Consumer Example
1 Consumer Example
TubeMQ provides two ways to consume messages, PullConsumer and PushConsumer:
1.1 PullConsumer
/**
 * Minimal pull-mode consumer example.
 *
 * <p>Connects to the master at {@code localhost:8000}, subscribes consumer group
 * {@code test-group} to topic {@code test}, then polls for messages in an endless
 * loop, printing each payload and confirming every successfully fetched batch.
 */
public class PullConsumerExample {
    /**
     * Runs the pull-consumer loop. The loop never terminates on its own; the method
     * only returns by throwing.
     *
     * @param args unused
     * @throws Throwable if any client call fails
     */
    public static void main(String[] args) throws Throwable {
        final String masterHostAndPort = "localhost:8000";
        final String topic = "test";
        final String group = "test-group";
        final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
        final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
        try {
            final PullMessageConsumer messagePullConsumer =
                    messageSessionFactory.createPullConsumer(consumerConfig);
            try {
                // NOTE(review): the null second argument presumably means "no filter" - confirm against the client API.
                messagePullConsumer.subscribe(topic, null);
                messagePullConsumer.completeSubscribe();
                // wait for client to join the exact consumer queue that consumer group allocated
                while (!messagePullConsumer.isPartitionsReady(1000)) {
                    ThreadUtils.sleep(1000);
                }
                while (true) {
                    ConsumerResult result = messagePullConsumer.getMessage();
                    if (result.isSuccess()) {
                        List<Message> messageList = result.getMessageList();
                        for (Message message : messageList) {
                            // Print the payload rather than the Message object, with an explicit charset
                            // so the output does not depend on the platform default.
                            // NOTE(review): assumes producers encode payloads as UTF-8 - confirm.
                            System.out.println("received message : "
                                    + new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8));
                        }
                        // Acknowledge the batch only after every message in it has been handled.
                        messagePullConsumer.confirmConsume(result.getConfirmContext(), true);
                    }
                }
            } finally {
                // Release the consumer if the loop is left through an exception.
                messagePullConsumer.shutdown();
            }
        } finally {
            messageSessionFactory.shutdown();
        }
    }
}
1.2 PushConsumer
/**
 * Minimal push-mode consumer example.
 *
 * <p>Connects to the master at {@code localhost:8000}, subscribes consumer group
 * {@code test-group} to topic {@code test} with a listener that prints each message
 * payload, then keeps the calling thread parked for up to ten minutes while messages
 * are delivered to the listener.
 */
public class PushConsumerExample {
    /**
     * Command-line entry point; delegates to {@link #test(String[])}.
     *
     * @param args passed through unchanged
     * @throws Throwable if any client call fails
     */
    public static void main(String[] args) throws Throwable {
        test(args);
    }

    /**
     * Sets up the push consumer, waits up to ten minutes, then shuts the client down.
     *
     * @param args unused
     * @throws Throwable if any client call fails
     */
    public static void test(String[] args) throws Throwable {
        final String masterHostAndPort = "localhost:8000";
        final String topic = "test";
        final String group = "test-group";
        final ConsumerConfig consumerConfig = new ConsumerConfig(masterHostAndPort, group);
        consumerConfig.setConsumePosition(ConsumePosition.CONSUMER_FROM_LATEST_OFFSET);
        final MessageSessionFactory messageSessionFactory = new TubeSingleSessionFactory(consumerConfig);
        try {
            final PushMessageConsumer pushConsumer = messageSessionFactory.createPushConsumer(consumerConfig);
            try {
                // NOTE(review): the null second argument presumably means "no filter" - confirm against the client API.
                pushConsumer.subscribe(topic, null, new MessageListener() {
                    @Override
                    public void receiveMessages(PeerInfo peerInfo, List<Message> messages)
                            throws InterruptedException {
                        for (Message message : messages) {
                            // Explicit charset so decoding does not depend on the platform default.
                            // NOTE(review): assumes producers encode payloads as UTF-8 - confirm.
                            System.out.println("received message : "
                                    + new String(message.getData(), java.nio.charset.StandardCharsets.UTF_8));
                        }
                    }

                    @Override
                    public Executor getExecutor() {
                        // No dedicated executor is supplied by this example.
                        return null;
                    }

                    @Override
                    public void stop() {
                        // Nothing to release in this example.
                    }
                });
                pushConsumer.completeSubscribe();
                // Nothing ever counts this latch down; it only keeps the thread parked for
                // up to ten minutes.
                CountDownLatch latch = new CountDownLatch(1);
                latch.await(10, TimeUnit.MINUTES);
            } finally {
                pushConsumer.shutdown();
            }
        } finally {
            messageSessionFactory.shutdown();
        }
    }
}