消费者示例
1 Consumer 示例
TubeMQ 提供了两种方式来消费消息：PullConsumer 和 PushConsumer。
1.1 PullConsumer
/**
 * Pull-mode consumer example.
 *
 * <p>Builds a pull consumer for a single topic, waits until the client reports
 * that its partitions are ready, and then polls for messages forever, printing
 * every message of a successful fetch and confirming the batch afterwards.
 */
public class PullConsumerExample {
    /**
     * Runs the example. This method never returns normally because the
     * polling loop has no exit condition.
     *
     * @param args command-line arguments (unused)
     * @throws Throwable if any client call fails
     */
    public static void main(String[] args) throws Throwable {
        final String masterAddress = "localhost:8000";
        final String topicName = "test";
        final String groupName = "test-group";

        final ConsumerConfig config = new ConsumerConfig(masterAddress, groupName);
        config.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);

        final MessageSessionFactory sessionFactory = new TubeSingleSessionFactory(config);
        final PullMessageConsumer consumer = sessionFactory.createPullConsumer(config);
        // The second argument is passed as null here; presumably "no filter" -- confirm against the client API.
        consumer.subscribe(topicName, null);
        consumer.completeSubscribe();

        // Wait until the client has joined the consumer group and been given
        // its partitions; re-check after every ThreadUtils.sleep(1000).
        for (;;) {
            if (consumer.isPartitionsReady(1000)) {
                break;
            }
            ThreadUtils.sleep(1000);
        }

        for (;;) {
            final ConsumerResult fetched = consumer.getMessage();
            if (!fetched.isSuccess()) {
                // Unsuccessful fetch: nothing to print or confirm, poll again.
                continue;
            }
            for (Message message : fetched.getMessageList()) {
                System.out.println("received message : " + message);
            }
            // Confirm the batch once every message in it has been printed.
            consumer.confirmConsume(fetched.getConfirmContext(), true);
        }
    }
}
1.2 PushConsumer
  /**
   * Push-mode consumer example.
   *
   * <p>Registers a {@code MessageListener} for a single topic, lets it print
   * incoming message payloads for up to ten minutes, and then shuts the
   * consumer and the session factory down.
   */
  public class PushConsumerExample {

      /**
       * Runs the example: subscribes, waits up to ten minutes, then releases
       * the client resources.
       *
       * @param args command-line arguments (unused)
       * @throws Throwable if any client call fails or the wait is interrupted
       */
      public static void test(String[] args) throws Throwable {
          final String masterHostAndPort = "localhost:8000";
          final String topic = "test";
          final String group = "test-group";
          final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
          consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
          final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
          try {
              final PushMessageConsumer pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
              try {
                  pushConsumer.subscribe(topic, null, new MessageListener() {
                      @Override
                      public void receiveMessages(PeerInfo peerInfo, List<Message> messages) throws InterruptedException {
                          for (Message message : messages) {
                              // Decode with an explicit charset so the output does not depend on
                              // the JVM's platform default. Assumes the producer wrote UTF-8 -- confirm.
                              System.out.println("received message : "
                                      + new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8));
                          }
                      }

                      @Override
                      public Executor getExecutor() {
                          // No dedicated executor is supplied; presumably the client then
                          // uses its own threads for callbacks -- confirm against the client API.
                          return null;
                      }

                      @Override
                      public void stop() {
                          // Nothing to release in this example.
                      }
                  });
                  pushConsumer.completeSubscribe();
                  // Nothing ever counts this latch down, so await() just parks the calling
                  // thread for up to ten minutes while the listener handles messages.
                  CountDownLatch latch = new CountDownLatch(1);
                  latch.await(10, TimeUnit.MINUTES);
              } finally {
                  // Release the consumer even if subscribing or waiting failed.
                  pushConsumer.shutdown();
              }
          } finally {
              // Release the session factory last, after the consumer created from it.
              messageSessionFactory.shutdown();
          }
      }
  }
Back to top