C++ SDK
Build TubeMQ C++ SDK
The C++ SDK is built on standalone (non-Boost) Asio and uses CMake as its build system. The build commands are:
# clone the repository and enter the root directory of the C++ SDK source
git clone https://github.com/apache/inlong.git
cd inlong/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp
mkdir build && cd build
cmake ..
make -j8 # set the job count according to the number of CPU cores
The build can also be run in the Docker environment provided by InLong:
# pull image
docker pull inlong/tubemq-cpp
# start container and mount the source code
docker run -it --net=host -v ${REPLACE_BY_CPP_SOURCE_DIR_PATH}:/tubemq-cpp/ inlong/tubemq-cpp /bin/bash
# same build steps as on a physical machine
mkdir build && cd build
cmake ..
make -j8
C++ SDK API
Like other MQ systems, the C++ SDK is divided into Producer and Consumer. Their APIs are introduced in turn below.
First, regardless of role, start the global background TubeMQ SDK service (the default C++ namespace is tubemq):
bool StartTubeMQService(string& err_info, const TubeMQServiceConfig& serviceConfig);
Check the returned boolean value and err_info to determine whether the service started successfully.
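Many calls in this SDK report failure through a `bool` return value plus an `err_info` out-parameter, as `StartTubeMQService` does. The following is a minimal sketch of one way to check that convention uniformly. `RunStep` and `FakeStartService` are hypothetical names introduced only for illustration (they are not part of the SDK), and the stand-in function exists so that the sketch compiles without the SDK headers.

```cpp
#include <functional>
#include <stdexcept>
#include <string>

// Runs one setup step that reports failure through a bool return value and an
// err_info out-parameter, and converts a failure into an exception.
// RunStep is a hypothetical helper, not part of the TubeMQ SDK.
inline void RunStep(const std::string& step_name,
                    const std::function<bool(std::string&)>& step) {
  std::string err_info;
  if (!step(err_info)) {
    throw std::runtime_error(step_name + " failed: " + err_info);
  }
}

// Hypothetical stand-in for an SDK call such as StartTubeMQService, so that
// the sketch compiles without the SDK headers.
inline bool FakeStartService(std::string& err_info, bool healthy) {
  if (!healthy) {
    err_info = "config file not found";
    return false;
  }
  return true;
}
```

With the real SDK, the same helper could presumably wrap the call shown above, for example `RunStep("StartTubeMQService", [&](std::string& err) { return StartTubeMQService(err, serviceConfig); });`, assuming `serviceConfig` has already been populated.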
Producer
The user-level API class is TubeMQProducer. The first step is to create an instance of the class.
#include "tubemq/tubemq_client.h" // header file of TubeMQProducer
TubeMQProducer producer;
Set the producer configuration, then start the producer:
ProducerConfig producer_config;
producer_config.SetRpcReadTimeoutMs(20000);
producer_config.SetMasterAddrInfo(err_info, master_addr);
bool result = producer.start(); // start producer
Once register2Master succeeds, the producer periodically sends heartbeat requests to the master to update the topic metadata. Users can then publish topics. Multiple topics can be published at once, in which case the parameter type is std::set<std::string>.
std::set<std::string> topics{"topic_0", "topic_1"};
bool result = producer.Publish(err_info, topics);
After publishing, construct the message
#include "tubemq/tubemq_message.h" // header file of tubemq::Message
std::string topic_name = "demo";
std::string send_data = "hello_world";
Message message(topic_name, send_data.c_str(), send_data.size()); // the payload is passed as a const char* plus its length
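The constructor call above passes the payload as a pointer together with `send_data.size()`. A small sketch of why the explicit length matters in general C++: a `std::string` may contain embedded NUL bytes, which a pointer-only view would truncate. The two helper functions are hypothetical and only compare the two views of the same buffer. They say nothing about how the SDK handles the data internally.

```cpp
#include <cstddef>
#include <cstring>
#include <string>

// Length seen by code that only receives a const char* and assumes a
// NUL-terminated string.
inline std::size_t LengthFromPointerOnly(const std::string& payload) {
  return std::strlen(payload.c_str());
}

// Length seen by code that receives the pointer together with size().
inline std::size_t LengthFromPointerAndSize(const std::string& payload) {
  return payload.size();
}
```

For plain text such as "hello_world" both functions agree. For a binary payload built with `std::string("ab\0cd", 5)`, the pointer-only view stops at the first NUL byte while `size()` still reports all five bytes.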
There are two SendMessage overloads, one for synchronous sending and one for asynchronous sending.
// sync send
bool TubeMQProducer::SendMessage(string& err_info, const Message& message);
// async send
void TubeMQProducer::SendMessage(const Message& message, const std::function<void(const ErrorCode&)>& callback);
The two SendMessage overloads are used as follows:
// sync send
bool result = producer.SendMessage(err_info, message);
// async send
void callback(const ErrorCode&);
producer.SendMessage(message, callback); // callback can be any callable object
Synchronous sending blocks until the result is returned. Asynchronous sending returns immediately, and the result is delivered through the user-defined callback function.
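When a caller uses the asynchronous overload but needs the result at a specific point, a common pattern is to bridge the callback to a `std::future`. The sketch below illustrates that pattern under stated assumptions: `FakeErrorCode` and `FakeAsyncSender` are hypothetical stand-ins for `ErrorCode` and the asynchronous `SendMessage`, used so that the example compiles without the SDK, and the convention that 0 means success is an assumption of this sketch only.

```cpp
#include <functional>
#include <future>
#include <memory>
#include <thread>
#include <vector>

// Hypothetical stand-in for tubemq::ErrorCode; in this sketch 0 means success.
using FakeErrorCode = int;
using SendCallback = std::function<void(const FakeErrorCode&)>;

// Hypothetical stand-in for the asynchronous SendMessage overload: it returns
// immediately and invokes the callback later from a worker thread.
class FakeAsyncSender {
 public:
  ~FakeAsyncSender() {
    for (std::thread& worker : workers_) worker.join();
  }

  void SendMessage(FakeErrorCode simulated_result, const SendCallback& callback) {
    workers_.emplace_back(
        [simulated_result, callback] { callback(simulated_result); });
  }

 private:
  std::vector<std::thread> workers_;
};

// Blocks the caller until the callback has delivered the send result.
inline FakeErrorCode SendAndWait(FakeAsyncSender& sender,
                                 FakeErrorCode simulated_result) {
  auto done = std::make_shared<std::promise<FakeErrorCode>>();
  std::future<FakeErrorCode> result = done->get_future();
  sender.SendMessage(simulated_result, [done](const FakeErrorCode& code) {
    done->set_value(code);
  });
  return result.get();
}
```

The promise is held in a `std::shared_ptr` because `std::function` requires a copyable callable. The same lambda shape could be passed to the real asynchronous `SendMessage`, provided the callback is invoked exactly once per message, which this sketch assumes.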
Finally, close the producer.
producer.ShutDown();
Consumer
As with the producer, initialize the consumer object and set its configuration.
#include "tubemq/tubemq_client.h" // header file of TubeMQConsumer
TubeMQConsumer consumer;
// config of consumption
ConsumerConfig consumer_config;
consumer_config.SetRpcReadTimeoutMs(20000);
consumer_config.SetMasterAddrInfo(err_info, master_addr);
// set the consume group and the topic
consumer_config.SetGroupConsumeTarget(err_info, group_name, topic_list);
// start the consumer
consumer.start();
After starting the consumer, call GetMessage to fetch messages.
ConsumerResult get_result;
ConsumerResult confirm_result;
bool result = consumer.GetMessage(get_result);
if (result) {
list<Message> messages = get_result.GetMessageList();
consumer.Confirm(get_result.GetConfirmContext(), true, confirm_result);
}
// stop the consumer
consumer.ShutDown();
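The snippet above fetches and confirms a single batch. In practice a consumer usually repeats the fetch, process, confirm cycle. The following sketch shows that loop shape with hypothetical stand-ins (`FakeResult`, `FakeConsumer`, `ConsumeAll`) so that it compiles without the SDK. It assumes that the boolean passed to `Confirm` indicates whether the batch was processed successfully, and the stand-in's `GetMessage` returns false only when its scripted batches run out, whereas a real consumer would typically keep polling.

```cpp
#include <cstddef>
#include <deque>
#include <list>
#include <string>
#include <utility>

// Hypothetical stand-in for ConsumerResult: one batch plus its confirm context.
struct FakeResult {
  std::list<std::string> messages;
  std::string confirm_context;
};

// Hypothetical stand-in for TubeMQConsumer that serves pre-scripted batches.
class FakeConsumer {
 public:
  explicit FakeConsumer(std::deque<std::list<std::string>> batches)
      : batches_(std::move(batches)) {}

  // Returns false once the scripted batches are exhausted.
  bool GetMessage(FakeResult& result) {
    if (batches_.empty()) return false;
    result.messages = std::move(batches_.front());
    batches_.pop_front();
    result.confirm_context = "ctx-" + std::to_string(++served_);
    return true;
  }

  // Records the confirmation; consumed == true is assumed to mean success.
  void Confirm(const std::string& confirm_context, bool consumed) {
    last_confirmed_ = confirm_context;
    if (consumed) ++confirmed_ok_;
  }

  int confirmed_ok() const { return confirmed_ok_; }
  const std::string& last_confirmed() const { return last_confirmed_; }

 private:
  std::deque<std::list<std::string>> batches_;
  int served_ = 0;
  int confirmed_ok_ = 0;
  std::string last_confirmed_;
};

// Fetches batches until none are left, handles every message, and confirms
// each batch exactly once with the outcome of its handlers.
template <typename Handler>
std::size_t ConsumeAll(FakeConsumer& consumer, Handler handle) {
  std::size_t handled = 0;
  FakeResult result;
  while (consumer.GetMessage(result)) {
    bool batch_ok = true;
    for (const std::string& message : result.messages) {
      if (handle(message)) {
        ++handled;
      } else {
        batch_ok = false;
      }
    }
    consumer.Confirm(result.confirm_context, batch_ok);
  }
  return handled;
}
```

Confirming once per batch, after all of its messages have been handled, mirrors the order of `GetMessage` and `Confirm` in the snippet above.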
Example
More detailed examples of the C++ SDK are available in C++ SDK Example. After building, the compiled executables are located in build/example/producer and build/example/consumer.