Version: 1.9.0

DataProxy Plugin


DataProxy implements an abstract, unified MQ (Message Queue) sink model so that developers can easily extend MQ sink types under the standard MessageQueueZoneSink. By default, Apache Pulsar, Apache Kafka, and InLong TubeMQ are already integrated. This article guides developers through extending DataProxy with new MQ types.

Concepts and Models

DataProxy is a message flow architecture based on Apache Flume with its Source + Channel + Sink components. Here we focus on the sink layer alone.

  • MessageQueueZoneSink: The standard MQ sink provided by DataProxy, intended to support all MQ types.
  • MessageQueueHandler: The abstract MQ handler interface that deals with connecting to, sending data to, and disconnecting from the MQ cluster.
  • EventHandler: The interface that converts the MQ message header and body when required, for example to convert the data protocol.

When a new MQ cluster type needs to be integrated, developers should at least implement the MessageQueueHandler interface as a plugin. Optionally, they can also implement the EventHandler interface to transform data as required. The plugin classes can be specified in, and pulled from, the manager as configuration information, so a new MQ type can be added on the fly.
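The idea of naming a plugin class in configuration and loading it at runtime can be illustrated with plain Java reflection. The following is a minimal sketch, not DataProxy's actual loading code: the SimpleHandler interface, the InMemoryHandler class, and the "handler.class" key are all hypothetical names chosen for illustration.

```java
import java.util.Map;

class PluginLoaderSketch {

    /** Hypothetical stand-in for a handler interface; not the real MessageQueueHandler. */
    public interface SimpleHandler {
        String name();
    }

    /** A sample plugin. Reflective loading needs an accessible no-argument constructor. */
    public static class InMemoryHandler implements SimpleHandler {
        @Override
        public String name() {
            return "in-memory";
        }
    }

    /**
     * Instantiates the class named under the hypothetical "handler.class" key.
     * Fails fast if the key is missing, the class cannot be found,
     * or the class does not implement SimpleHandler.
     */
    public static SimpleHandler load(Map<String, String> config) {
        String className = config.get("handler.class");
        if (className == null) {
            throw new IllegalArgumentException("handler.class is not configured");
        }
        try {
            Class<?> clazz = Class.forName(className);
            return clazz.asSubclass(SimpleHandler.class)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            throw new IllegalStateException("Cannot load handler " + className, e);
        }
    }
}
```

Because the class name is ordinary configuration data, switching to another handler in this sketch only requires changing the configured value, provided the class is on the classpath.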


The concepts introduced above can be represented by the following figure:

Development Process

In the rest of the article we use the Kafka MQ with ProtoBuffer message format as an example. Here's what to do:

  • Implement MessageQueueHandler as a plugin class, namely KafkaHandler, with its init / start / stop / send methods.
  • Implement the EventHandler interface as ProtoBufferEventHandler, with its parseHeader / parseBody methods.

Plugin Implementation


public class KafkaHandler implements MessageQueueHandler {

    private EventHandler handler;

    public void init(CacheClusterConfig config, MessageQueueZoneSinkContext sinkContext) {
        // initialize configuration and event handler
    }

    public void start() {
        // create Kafka Producer
    }

    public void stop() {
        // close Kafka Producer
    }

    public boolean send(BatchPackProfile event) {
        // process and send data
    }
}

public class ProtoBufferEventHandler implements EventHandler {

    public Map<String, String> parseHeader(IdTopicConfig idConfig, BatchPackProfile profile, String nodeId,
            ...) {
        // retrieve, process and convert event header
    }

    public byte[] parseBody(IdTopicConfig idConfig, BatchPackProfile profile, INLONG_COMPRESSED_TYPE compressType)
            throws IOException {
        // retrieve and repack event to ProtoBuffer message
    }
}
(See the full implementation in the inlong-dataproxy module.)
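To show how the two plugin roles fit together at runtime, here is a self-contained sketch that uses simplified stand-in types instead of the real DataProxy classes. SimpleQueueHandler, SimpleEventHandler, Utf8EventHandler, InMemoryQueueHandler, and the "topic" key are hypothetical; a list stands in for a Kafka producer, and a UTF-8 conversion stands in for ProtoBuffer packing. It only illustrates the init / start / send / stop order and the delegation from the queue handler to the event handler.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class HandlerLifecycleSketch {

    /** Hypothetical, simplified stand-in for EventHandler. */
    interface SimpleEventHandler {
        Map<String, String> parseHeader(String topic, String payload);
        byte[] parseBody(String payload);
    }

    /** Hypothetical, simplified stand-in for MessageQueueHandler. */
    interface SimpleQueueHandler {
        void init(Map<String, String> config, SimpleEventHandler eventHandler);
        void start();
        void stop();
        boolean send(String payload);
    }

    /** Tags each message with its topic and encodes the body as UTF-8 bytes. */
    static class Utf8EventHandler implements SimpleEventHandler {
        @Override
        public Map<String, String> parseHeader(String topic, String payload) {
            Map<String, String> header = new HashMap<>();
            header.put("topic", topic);
            return header;
        }

        @Override
        public byte[] parseBody(String payload) {
            return payload.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Collects converted messages in a list instead of sending them to a real MQ. */
    static class InMemoryQueueHandler implements SimpleQueueHandler {
        private final List<byte[]> delivered = new ArrayList<>();
        private String topic;
        private SimpleEventHandler eventHandler;
        private boolean started;

        @Override
        public void init(Map<String, String> config, SimpleEventHandler eventHandler) {
            this.topic = config.get("topic");
            this.eventHandler = eventHandler;
        }

        @Override
        public void start() {
            started = true; // a real handler would create its producer here
        }

        @Override
        public void stop() {
            started = false; // a real handler would close its producer here
        }

        @Override
        public boolean send(String payload) {
            if (!started) {
                return false; // reject sends outside the start/stop window
            }
            Map<String, String> header = eventHandler.parseHeader(topic, payload);
            if (header.get("topic") == null) {
                return false; // no topic configured, nothing to send to
            }
            delivered.add(eventHandler.parseBody(payload));
            return true;
        }

        List<byte[]> delivered() {
            return delivered;
        }
    }
}
```

In this sketch, send reports failure through its boolean return value rather than an exception, mirroring the boolean send signature in the skeleton above; how the real sink reacts to a false result is not covered here.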

Plugin Configuration


For the sink configuration, please refer to the Sink Configuration Example.