Skip to main content
版本:Next

C++ SDK

构建 TubeMQ C++ SDK

C++ SDK 主要基于非 Boost 版本的 Asio 进行通信,构建时使用 CMake,具体操作步骤如下:

# 进入到 cpp sdk 的根路径
git clone https://github.com/apache/inlong.git
cd inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp

mkdir build && cd build

cmake ..

make -j8 # 可根据自己机器的核数决定编译时使用的线程数量

上述构建过程也可以在 InLong 提供的镜像中完成,具体操作如下

# 拉取镜像
docker pull inlong/tubemq-cpp

# 启动容器,将cpp sdk的源码挂载到容器中
docker run -it --net=host -v ${REPLACE_BY_CPP_SOURCE_DIR_PATH}:/tubemq-cpp/ inlong/tubemq-cpp /bin/bash

# 之后与在物理机上相同
mkdir build && cd build
cmake ..
make -j8

C++ SDK API

与其它的消息队列系统类似,C++ SDK 分为生产者 (Producer) 以及消费者 (Consumer),下面分别介绍 Producer 和 Consumer 的API使用。

首先,无论是生产者还是消费者,先启动 TubeMQ SDK 的后台全局服务 (以下例子都假定使用的默认 C++ namespacetubemq)。

bool StartTubeMQService(string& err_info, const TubeMQServiceConfig& serviceConfig);

通过返回值以及 err_info 判断启动是否成功。

Producer

对于用户来说,直接接触的类为 TubeMQProducer,在使用时需要首先创建实例 。

#include "tubemq/tubemq_client.h" // TubeMQProducer定义头文件

TubeMQProducer producer;

之后设置 producer 的配置,并启动。

ProducerConfig producer_config;
producer_config.SetRpcReadTimeoutMs(20000);
producer_config.SetMasterAddrInfo(err_info, master_addr);
bool result = producer.start(); // 启动 producer

当日志显示注册 master 成功后,producer 会以一定频率向 master 发送心跳,以获得最新的关于 topic 的信息,之后用户可以开始订阅想发送消息的 topic,支持订阅多个 topic,传入的 topics 参数类型为 std::set

std::set topics{"topic_0", "topic_1"};
bool result = producer.Publish(err_info, topics);

订阅之后就可以发送消息,首先对消息进行构造

#include "tubemq/tubemq_message.h" // tubemq::Message 定义头文件

std::string topic_name = "demo";
std::string send_data = "hello_tubemq";
Message message(topic_name, send_data.c_str(), send_data.size()); // 接受数据类型为 const char*

构造好消息后,即可发送,SDK 提供了同步和异步两种发送方式

// 同步发送
bool TubeMQProducer::SendMessage(string& err_info, const Message& message);
// 异步发送
void TubeMQProducer::SendMessage(const Message& message, const std::function<void(const ErrorCode&)>& callback);

分别通过如下方式使用

bool result = producer.SendMessage(err_info, message);

void callback(const ErrorCode&);
producer.SendMessage(message, callback); // callback 也可以为其它可调用的对象,如 lambda函数

同步发送在得到结果前将会阻塞,异步发送则不会,调用后通过用户自定义的回调函数接收返回的结果。

最后,关闭 producer

producer.ShutDown();

Consumer

Consumer 同 Producer 类似,都要先初始化 Consumer 实例,并设置配置,订阅 topic 等操作。

#include "tubemq/tubemq_client.h" // TubeMQConsumer 定义头文件

TubeMQConsumer consumer;

// 消费相关的配置
ConsumerConfig consumer_config;
consumer_config.SetRpcReadTimeoutMs(20000);
consumer_config.SetMasterAddrInfo(err_info, master_addr);
// consumer中除了订阅 topic,还要设置消费组
consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);

// 启动 consumer
consumer.start();

启动 consumer 后,调用 GetMessage 获取消息即可

ConsumerResult get_result;
ConsumerResult confirm_result;

bool result = consumer.GetMessage(get_result);
if (result) {
list<Message> messages = get_result.GetMessageList();
consumer.Confirm(get_result.GetConfirmContext(), true, confirm_result);
}

// stop the consumer
consumer.ShutDown();

Example

关于 C++ SDK 的更加详细的用例,可以参考 C++ SDK Example,编译好的可执行文件位置在 build/example/producerbuild/example/consumer