生产者示例
1 Producer 示例
TubeMQ 提供了两种方式来初始化 session factory：TubeSingleSessionFactory 和 TubeMultiSessionFactory。
- TubeSingleSessionFactory 在整个生命周期只会创建一个 session
- TubeMultiSessionFactory 每次调用都会创建一个 session
1.1 TubeSingleSessionFactory
1.1.1 Send Message Synchronously
   
   /**
    * Sends a single message synchronously through a producer created from a
    * {@code TubeSingleSessionFactory} and prints the outcome.
    */
   public final class SyncProducerExample {

       /**
        * Publishes the topic, sends one message, and reports success or failure.
        *
        * @param args unused
        * @throws Throwable if client setup, publishing, sending, or shutdown fails
        */
       public static void main(String[] args) throws Throwable {
           final String masterHostAndPort = "localhost:8000";
           final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
           final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
           final MessageProducer messageProducer = messageSessionFactory.createProducer();
           final String topic = "test";
           final String body = "This is a test message from single-session-factory!";
           byte[] bodyData = StringUtils.getBytesUtf8(body);
           try {
               // The topic is published before any message is sent to it.
               messageProducer.publish(topic);
               Message message = new Message(topic, bodyData);
               MessageSentResult result = messageProducer.sendMessage(message);
               if (result.isSuccess()) {
                   System.out.println("sync send message : " + message);
               } else {
                   // Report a rejected send instead of ignoring it silently.
                   System.out.println("sync send message failed : " + result.getErrMsg());
               }
           } finally {
               // Shut the producer down even when publish or send throws.
               messageProducer.shutdown();
           }
       }
   }
1.1.2 Send Message Asynchronously
   public final class AsyncProducerExample {
   
      public static void main(String[] args) throws Throwable {
          final String masterHostAndPort = "localhost:8000";
          final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
          final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
          final MessageProducer messageProducer = messageSessionFactory.createProducer();
          final String topic = "test";
          final String body = "async send message from single-session-factory!";
          byte[] bodyData = StringUtils.getBytesUtf8(body);
          messageProducer.publish(topic);
          final Message message = new Message(topic, bodyData);
          messageProducer.sendMessage(message, new MessageSentCallback(){
              @Override
              public void onMessageSent(MessageSentResult result) {
                  if (result.isSuccess()) {
                      System.out.println("async send message : " + message);
                  } else {
                      System.out.println("async send message failed : " + result.getErrMsg());
                  }
              }
              @Override
              public void onException(Throwable e) {
                  System.out.println("async send message error : " + e);
              }
          });
          messageProducer.shutdown();
      }
1.1.3 Send Message With Attributes
   public final class ProducerWithAttributeExample {
   
      public static void main(String[] args) throws Throwable {
          final String masterHostAndPort = "localhost:8000";
          final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
          final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(clientConfig);
          final MessageProducer messageProducer = messageSessionFactory.createProducer();
          final String topic = "test";
          final String body = "send message with attribute from single-session-factory!";
          byte[] bodyData = StringUtils.getBytesUtf8(body);
          messageProducer.publish(topic);
          Message message = new Message(topic, bodyData);
          //set attribute
          message.setAttrKeyVal("test_key", "test value");
          //msgType is used for consumer filtering, and msgTime(accurate to minute) is used as the pipe to send and receive statistics
          SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmm");
          message.putSystemHeader("test", sdf.format(new Date()));
          messageProducer.sendMessage(message);
          messageProducer.shutdown();
      }
1.2 TubeMultiSessionFactory
   /**
    * Sends messages concurrently, creating a separate {@code TubeMultiSessionFactory}
    * (and one producer from it) for each sender task.
    */
   public class MultiSessionProducerExample {

       /**
        * Creates {@code SESSION_FACTORY_NUM} factories, submits one sender task per
        * factory, waits for all tasks to finish, and then releases every resource.
        *
        * @param args unused
        * @throws Throwable if factory/producer creation, waiting, or shutdown fails
        */
       public static void main(String[] args) throws Throwable {
           final int SESSION_FACTORY_NUM = 10;
           final String masterHostAndPort = "localhost:8000";
           final TubeClientConfig clientConfig = new TubeClientConfig(masterHostAndPort);
           final List<MessageSessionFactory> sessionFactoryList = new ArrayList<>(SESSION_FACTORY_NUM);
           final ExecutorService sendExecutorService = Executors.newFixedThreadPool(SESSION_FACTORY_NUM);
           final CountDownLatch latch = new CountDownLatch(SESSION_FACTORY_NUM);
           try {
               for (int i = 0; i < SESSION_FACTORY_NUM; i++) {
                   TubeMultiSessionFactory tubeMultiSessionFactory = new TubeMultiSessionFactory(clientConfig);
                   sessionFactoryList.add(tubeMultiSessionFactory);
                   MessageProducer producer = tubeMultiSessionFactory.createProducer();
                   Sender sender = new Sender(producer, latch);
                   sendExecutorService.submit(sender);
               }
               // Every Sender counts down exactly once, whether it succeeds or fails.
               latch.await();
           } finally {
               // Runs even when setup fails part-way, so the pool threads and the
               // factories created so far are not left behind.
               sendExecutorService.shutdownNow();
               for (MessageSessionFactory sessionFactory : sessionFactoryList) {
                   sessionFactory.shutdown();
               }
           }
       }

       /** Task that publishes the topic, sends one message, and shuts its producer down. */
       private static class Sender implements Runnable {

           private final MessageProducer producer;

           private final CountDownLatch latch;

           /**
            * @param producer producer used (and shut down) by this task
            * @param latch latch counted down once when this task finishes
            */
           public Sender(MessageProducer producer, CountDownLatch latch) {
               this.producer = producer;
               this.latch = latch;
           }

           @Override
           public void run() {
               final String topic = "test";
               try {
                   try {
                       producer.publish(topic);
                       final byte[] bodyData = StringUtils.getBytesUtf8("This is a test message from multi-session factory");
                       Message message = new Message(topic, bodyData);
                       producer.sendMessage(message);
                   } finally {
                       // Shut the producer down even when publish or send throws;
                       // a failure here is reported by the catch below.
                       producer.shutdown();
                   }
               } catch (Throwable ex) {
                   System.out.println("send message error : " + ex);
               } finally {
                   latch.countDown();
               }
           }
       }
   }
Back to top