Skip to main content
Version: 1.7.0

Java SDK

Create real-time synchronization task

Create a task on the Dashboard or through the command line, and use Auto Push (autonomous push) as the data source type.

Import Java SDK

The library of the SDK need to be imported into the project before using the SDK. The library can be obtained in the following two ways:

  • Get the source code and compile it yourself and deploy the SDK package to the local warehouse, see How to Build.
  • Imported through maven dependency like this:
    <dependency>
    <groupId>org.apache.inlong</groupId>
    <artifactId>dataproxy-sdk</artifactId>
    <version>1.7.0</version>
    </dependency>

Data report process

After import the SDK, you can instantiate a MessageSender object, call sync(sendMessage()) or async(asyncSendMessage()) interface to report single or multiple(batch) data. see Send Demo. The overall process includes the following three steps:

Initialize SDK

From the demo code, we can see that the client initialization is mainly done in the getMessageSender() function:

public DefaultMessageSender getMessageSender(String localIP, String inLongManagerAddr, String inLongManagerPort, String netTag, String dataProxyGroup, boolean isLocalVisit, boolean isReadProxyIPFromLocal, String configBasePath, int msgType) {
ProxyClientConfig dataProxyConfig = null;
DefaultMessageSender messageSender = null;
try {
// Initialize client configuration. 'test', '123456' is username and password, which need to be replaced according to the environment configuration in actual use.
dataProxyConfig = new ProxyClientConfig(localIP, isLocalVisit, inLongManagerAddr, Integer.valueOf(inLongManagerPort), dataProxyGroup, netTag, "test", "123456");
// Set the local save path of the configuration. This setting is optional. By default, the SDK will create a "/.inlong/" directory under the current user's working directory to store the configuration.
if (StringUtils.isNotEmpty(configBasePath)) {
dataProxyConfig.setConfStoreBasePath(configBasePath);
}
// Set whether to use the local saved configuration or not. This setting is optional. By default, do not use.
dataProxyConfig.setReadProxyIPFromLocal(isReadProxyIPFromLocal);
// Initialize MessageSender object, if there is an exception, an exception will be thrown.
messageSender = DefaultMessageSender.generateSenderByClusterId(dataProxyConfig);
// Set message type to send. This setting is optional. By default, send data in binary format.
messageSender.setMsgtype(msgType);
} catch (Exception e) {
logger.error("getMessageSender has exception e = {}", e);
}
// Return initialization result.
return messageSender;
}

Call the send interface to report data

The SDK data send interface is thread safe, support send single or multiple messages by sync and async two ways. The following demo uses a single sync way to send, and the message does not contain property information:

public void sendTcpMessage(DefaultMessageSender sender, String inlongGroupId, String inlongStreamId, String messageBody, long dt) {
SendResult result = null;
try {
// Sends a single message in sync mode, and does not contain property information
result = sender.sendMessage(messageBody.getBytes("utf8"), inlongGroupId, inlongStreamId,
0, String.valueOf(dt), 20, TimeUnit.SECONDS);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
logger.info("messageSender {}", result);
}

You can also choose different send interfaces to report data according to your business needs. For the details of the interface, please refer to the definition in the MessageSender interface file, which has a detailed introduction, no additional explanation here.

Close SDK

In Demo, there is no close operation. When in use, we can call the close() function of the MessageSender interface object to stop data reporting.

Warning

  • The MessageSender interface object is initialized based on the GroupID, so each MessageSender object can be used differently based on the GroupID, and multiple MessageSender objects can be created in the same process.
  • The SDK provides three different network interaction ways: TCP, HTTP, and UDP. Examples of these three ways are given in the example (refer to TcpClientExample.java, HttpClientExample.java, UdpClientExample.java), and the business can be customized according to its own needs to initialize different MessageSender object.
  • The SDK contains complex network interactions, MessageSender should be used as a resident object. Avoid frequent initialization and shutdown of MessageSender (frequent initialization and shutdown will have a large resource overhead and will affect the timeliness of data reporting).
  • The SDK does not resend the failed message. When using the SDK to report data, if send fails, you need to decide whether to resend according to your own needs.

Error Code Introduction

Common error codes are as follows.

CodeExplainRemarks
SendResult.OKSuccessfully sent
SendResult.TIMEOUTRequest response timeout
SendResult.CONNECTION_BREAKConnection is breaked
SendResult.THREAD_INTERRUPTInterrupt
SendResult.ASYNC_CALLBACK_BUFFER_FULLAsync callback buffer fullIn this case, generally, it is caused by the speed of production data exceeding the response speed of the server. It is recommended to properly sleep when send to avoid blocking.
SendResult.NO_CONNECTIONNo available connectionIn this case, it is recommended to increase the number of available connections.
SendResult.INVALID_DATAInvalid data, failed to report data to DataProxy via HTTP
SendResult.INVALID_ATTRIBUTESThe packets sent are incorrect, such as empty packets or packets containing predefined attributes of the system
SendResult.UNKOWN_ERRORUnknown error

ProxyClientConfig Configuration Introduction

ParameterExplainAdjustment Suggestion
setAliveConnections(int aliveConnections)Set the number of DataProxy connections. Default: 3.1) If the amount of data is large or sensitive to delay, increase this parameter appropriately; 2) According to the size of the DataProxy cluster, adjust this parameter appropriately. For example, if the cluster size is 30, this value can be set to 5 ~ 10; 3) Experience value 15 ~ 20.
setTotalAsyncCallbackSize(int asyncCallbackSize)Set the size of SDK internal buffer queue during async send. The buffer is used to store packets that have been sent but have not received an Ack from the dataProxy. When the buffer reaches this threshold, continue to send data, and will receive an ASYNC_CALLBACK_BUFFER_FULL exception. Default: 50000.1) Normally, there is no need to adjust this parameter; 2) When the amount of data is very large or the load of DataProxy is high, it can be increased appropriately. Be careful not to be too large, which may cause OOM.
setConnectTimeoutMillis(long connectTimeoutMillis)Set the connection timeout interval. Unit: ms, Default: 40000.Set according to the actual environment.
setRequestTimeoutMillis(long requestTimeoutMillis)Set request timeout interval. Unit: ms, Default: 40000.Adjust settings as needed.
setMaxTimeoutCnt(int maxTimeoutCnt)Set the number of timeout disconnections of a single DataProxy connection. The SDK will internally count the DataProxy connections that have timed out and have not received an Ack. If the timeout times of a connection reach the value within a short period of time, the SDK automatically disconnects the connection and selects another DataProxy to create a new connection for data reporting. Default value: 3.If the size of the DataProxy cluster is small, you can appropriately increase this parameter to avoid frequent disconnection in a short time.
setManagerConnectionTimeout(int managerConnectionTimeout)Set the timeout interval for SDK connection to InLong Manager. Unit: ms, Default: 10000.1) When the network environment is not good, the value can be increased appropriately; 2) When the client takes a long time to resolve the domain name, the value can be increased appropriately.
setManagerSocketTimeout(int managerSocketTimeout)Sets the timeout for the SDK to get the DataProxy list from the InLong Manager connection, Unit: ms, Default: 30000.When the network environment is not good, the value can be increased appropriately.