Pulsar To Elasticsearch Example

Prepare To Get Module Archive

The module archive is in the directory inlong-sort-standalone/sort-standalone-dist/target/; the archive file is named apache-inlong-sort-standalone-${project.version}-bin.tar.gz.

Prepare To Modify Configuration File

First, decompress the archive file, then copy the following three files from the directory conf/es/ to the directory conf/:

  • conf/common.properties, common configuration of all components.
  • conf/SortClusterConfig.conf, sink configuration of all sort tasks.
  • conf/sid_es_v3.conf, data source configuration example of a sort task; the file name matches the sort task name in SortClusterConfig.conf.
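
The copy step above can be scripted. The following is a minimal sketch and not part of the InLong distribution: the function name `copy_es_conf` and its `home` argument (the directory the archive was decompressed into) are hypothetical, and the only facts taken from this page are the three file names and the conf/es/ and conf/ directories.

```python
import shutil
from pathlib import Path

# File names taken from the list above.
ES_CONF_FILES = ("common.properties", "SortClusterConfig.conf", "sid_es_v3.conf")


def copy_es_conf(home):
    """Copy the Elasticsearch example files from <home>/conf/es/ to <home>/conf/.

    `home` is assumed to be the decompressed archive directory.
    """
    conf_dir = Path(home) / "conf"
    copied = []
    for name in ES_CONF_FILES:
        src = conf_dir / "es" / name
        if not src.is_file():
            raise FileNotFoundError(f"expected example file is missing: {src}")
        dst = conf_dir / name
        shutil.copy2(src, dst)  # overwrites an existing file of the same name
        copied.append(dst)
    return copied
```

Calling `copy_es_conf("/path/to/decompressed/archive")` returns the three destination paths, or raises if one of the example files is not where this page says it should be.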

Example: conf/common.properties

# inlong-sort-standalone cluster id
clusterId=esv3-sz-sz1
# Current node ID
nodeId=nodeId
# Domain name of metric
metricDomains=Sort
# Class name list of metric listener, separated by space
metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
# Interval for snapshotting metric data, in milliseconds
metricDomains.Sort.snapshotInterval=60000
# Channel class name
sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
# Sink class name. Different distribution types use different Sink classes
sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
# Source class name
sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
# There are three ways to load cluster configuration data: [file, Manager, custom class].
sortClusterConfig.type=file
# When the cluster configuration data is loaded from a file, the name of the configuration file in the classpath
sortClusterConfig.file=SortClusterConfig.conf
# There are three ways to load the Sort task configuration data: [file, Manager, custom class]
sortSourceConfig.QueryConsumeConfigType=file

Example: conf/SortClusterConfig.conf

{
  "clusterName": "esv3-gz-gz1",
  "sortTasks": [
    {
      "name": "sid_es_v3",
      "type": "ES",
      "idParams": [
        {
          "indexNamePattern": "inlong0fc00000046_{yyyyMMdd}",
          "contentOffset": "0",
          "inlongGroupId": "testgroup",
          "fieldOffset": "0",
          "fieldNames": "ftime extinfo t1 t2 t3 t4",
          "inlongStreamId": "0fc00000046",
          "separator": "|"
        }
      ],
      "sinkParams": {
        "httpHosts": "ip:port",
        "password": "password",
        "bulkSizeMb": "10",
        "flushInterval": "60",
        "keywordMaxLength": "32767",
        "bulkAction": "4000",
        "concurrentRequests": "5",
        "maxConnect": "10",
        "isUseIndexId": "false",
        "username": "elastic"
      }
    }
  ]
}

Example: conf/sid_es_v3.conf

{
  "sortClusterName": "esv3-gz-gz1",
  "sortTaskId": "sid_es_v3",
  "cacheZones": {
    "pc_inlong6th_sz1": {
      "zoneName": "${PULSAR_CLUSTER_NAME}",
      "serviceUrl": "http://${PULSAR_IP}:${PULSAR_PORT}",
      "authentication": "${PULSAR_AUTH}",
      "topics": [
        {
          "topic": "${TENANT/NAMESPACE/TOPIC}",
          "partitionCnt": 10,
          "topicProperties": {}
        }
      ],
      "cacheZoneProperties": {},
      "zoneType": "pulsar"
    }
  }
}
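
In the two JSON examples, the cluster name is the same in both files and the sort task name matches both `sortTaskId` and the source file name. The sketch below checks those relationships before startup. It assumes the names are meant to agree the way they do in the examples; `check_consistency` is a hypothetical helper, not an InLong API.

```python
import json


def check_consistency(cluster_conf_text, source_conf_text, source_file_name):
    """Return a list of mismatches between the two files (empty if none found).

    Assumption: names should agree as they do in the examples on this page.
    """
    cluster = json.loads(cluster_conf_text)
    source = json.loads(source_conf_text)
    problems = []

    if source.get("sortClusterName") != cluster.get("clusterName"):
        problems.append("sortClusterName differs from clusterName")

    task_id = source.get("sortTaskId")
    task_names = [task.get("name") for task in cluster.get("sortTasks", [])]
    if task_id not in task_names:
        problems.append(f"sortTaskId {task_id!r} is not listed in sortTasks")

    if source_file_name != f"{task_id}.conf":
        problems.append(f"file name should be {task_id}.conf")

    return problems
```

For the example files on this page, `check_consistency(cluster_text, source_text, "sid_es_v3.conf")` returns an empty list.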

Configuration for conf/common.properties

| Parameter | Required | DefaultValue | Remark |
| --- | --- | --- | --- |
| clusterId | Y | NA | Unique identifier of the inlong-sort-standalone cluster |
| nodeId | N | Localhost IP | Current node ID |
| metricDomains | N | Sort | Domain name of metric |
| metricDomains.Sort.domainListeners | N | org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener | Class name list of metric listeners, separated by spaces |
| metricDomains.Sort.snapshotInterval | N | 60000 | Interval for snapshotting metric data, in ms |
| prometheusHttpPort | N | 8080 | Parameter of org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener: HttpServer port of Prometheus |
| sortChannel.type | N | org.apache.inlong.sort.standalone.channel.BufferQueueChannel | Channel class name |
| sortSink.type | Y | NA | Sink class name. Different distribution types use different Sink classes. |
| sortSource.type | N | org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource | Source class name |
| sortClusterConfig.type | N | manager | There are three ways to load cluster configuration data: [file, Manager, custom class]. |
| sortClusterConfig.file | N | SortClusterConfig.conf | When the cluster configuration data is loaded from a file, the name of the configuration file in the classpath |
| sortClusterConfig.managerUrl | N | NA | When the cluster configuration data is loaded from the manager, define the URL of InlongManager here. For example: http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getClusterConfig |
| sortSourceConfig.QueryConsumeConfigType | N | manager | There are three ways to load the Sort task configuration data: [file, Manager, custom class]. If it is loaded from a file, the Sort task configuration file is in the classpath, and the file name format is: ${sortTaskId}.conf. |
| sortSourceConfig.managerUrl | N | NA | When the Sort task configuration data is loaded from the manager, define the URL of InlongManager here. For example: http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getSortSource |
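
To see how the required flags and default values in this table interact with a concrete file, the sketch below loads `key=value` lines, fills in a few of the documented defaults, and rejects a file that lacks a required key. It is illustrative only: `load_common_properties` is a hypothetical helper, it handles just the simple line format used in the example (not the full Java properties syntax), and it does not claim to reproduce how sort-standalone itself resolves defaults.

```python
# Keys marked "Y" in the Required column of the table.
REQUIRED = ("clusterId", "sortSink.type")

# A subset of the defaults listed in the table.
DEFAULTS = {
    "metricDomains": "Sort",
    "metricDomains.Sort.snapshotInterval": "60000",
    "prometheusHttpPort": "8080",
    "sortClusterConfig.type": "manager",
    "sortClusterConfig.file": "SortClusterConfig.conf",
    "sortSourceConfig.QueryConsumeConfigType": "manager",
}


def load_common_properties(text):
    """Parse simple key=value lines and apply the documented defaults."""
    props = dict(DEFAULTS)
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and comments
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    missing = [key for key in REQUIRED if key not in props]
    if missing:
        raise ValueError("missing required keys: " + ", ".join(missing))
    return props
```

With the example file from this page, `sortClusterConfig.type` resolves to `file` because the file overrides the `manager` default, while `prometheusHttpPort` falls back to `8080`.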

Configuration For SortClusterConfig.conf

  • Can be loaded from the SortClusterConfig.conf file in the classpath; this does not support real-time updates.
  • Can be obtained from the HTTP interface of Inlong Manager, which supports real-time updates.

| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| clusterName | Y | String | NA | inlong-sort-standalone cluster unique identifier |
| sortTasks | Y | JsonArray<SortTaskConfig> | NA | Sort task list |

Configuration For SortTaskConfig

| Parameter | Required | DefaultValue | Remark |
| --- | --- | --- | --- |
| name | Y | NA | Sort task name |
| type | Y | NA | Sort task type, such as: HIVE("hive"), TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), ElasticSearch("ElasticSearch"), UNKNOWN("n") |
| idParams | Y | NA | Inlong data stream parameter list |
| sinkParams | Y | NA | Sort task parameters |

IdParams Configuration For Sort-Elasticsearch

| Parameter | Required | DefaultValue | Remark |
| --- | --- | --- | --- |
| inlongGroupId | Y | NA | inlongGroupId |
| inlongStreamId | Y | NA | inlongStreamId |
| separator | Y | NA | Delimiter |
| fieldNames | Y | NA | Elasticsearch Index field list, separated by spaces |
| indexNamePattern | Y | NA | Index name template; supports three date and time format variables: {yyyyMMdd}, {yyyyMMddHH}, {yyyyMMddHHmm} |
| contentOffset | Y | NA | The valid field start offset of the source data, starting from 0 |
| fieldOffset | Y | NA | The starting offset of the Elasticsearch Index field list |
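
As a rough illustration of these parameters, the sketch below expands an `indexNamePattern` for a given timestamp and maps one delimited record onto `fieldNames`. It is one reading of the table, not the sort-standalone implementation: the function names are hypothetical, which timestamp InLong uses for the index name is not stated here (so it is passed in explicitly), and the offset handling simply skips that many leading entries on each side.

```python
from datetime import datetime

# The three documented pattern variables, mapped to strftime formats.
PATTERN_FORMATS = {
    "{yyyyMMddHHmm}": "%Y%m%d%H%M",
    "{yyyyMMddHH}": "%Y%m%d%H",
    "{yyyyMMdd}": "%Y%m%d",
}


def resolve_index_name(pattern, when):
    """Replace each date/time variable in the pattern with the formatted time."""
    name = pattern
    for variable, fmt in PATTERN_FORMATS.items():
        name = name.replace(variable, when.strftime(fmt))
    return name


def map_fields(line, separator, field_names, content_offset=0, field_offset=0):
    """Pair record values with field names, skipping the given offsets.

    Assumption: contentOffset skips leading values in the record and
    fieldOffset skips leading names in the space-separated field list.
    """
    values = line.split(separator)[content_offset:]
    names = field_names.split()[field_offset:]
    return dict(zip(names, values))  # extra values or names are dropped
```

For example, with the pattern from SortClusterConfig.conf, `resolve_index_name("inlong0fc00000046_{yyyyMMdd}", datetime(2024, 1, 2))` gives `inlong0fc00000046_20240102`.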

SinkParams Configuration For Sort-Elasticsearch

| Parameter | Required | DefaultValue | Remark |
| --- | --- | --- | --- |
| httpHosts | Y | NA | Elasticsearch host IP and port |
| username | Y | NA | Elasticsearch username |
| password | Y | NA | Elasticsearch password |
| isUseIndexId | N | false | Whether to create an IndexId; this affects the distribution of index shards |
| bulkSizeMb | N | 10 | The maximum size of a single bulk, in MB |
| flushInterval | N | 60 | The interval between disk flushes, in seconds |
| keywordMaxLength | N | 32767 | The maximum length of a single keyword, in bytes |
| bulkAction | N | 4000 | Maximum number of IndexRequests for a single bulk |
| maxConnect | N | 10 | Maximum number of HTTP connections |
| concurrentRequests | N | 5 | The maximum number of pending requests for a single HTTP connection |
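
The three bulk-related parameters can be read as thresholds that each trigger a flush. The sketch below shows that reading with the default values from the table. It assumes the usual bulk-batching behavior (send when the request count, the payload size, or the elapsed time reaches its limit); this page does not confirm that sort-standalone works exactly this way, and `should_flush` is a hypothetical function.

```python
def should_flush(pending_actions, pending_bytes, seconds_since_flush,
                 bulk_action=4000, bulk_size_mb=10, flush_interval=60):
    """Return True when any of the three assumed bulk thresholds is reached.

    Defaults mirror bulkAction, bulkSizeMb and flushInterval in the table.
    """
    size_limit_bytes = bulk_size_mb * 1024 * 1024
    return (
        pending_actions >= bulk_action
        or pending_bytes >= size_limit_bytes
        or seconds_since_flush >= flush_interval
    )
```

Under this reading, a lightly loaded task still sends its buffered requests at least once every `flushInterval` seconds, while a busy task is bounded by `bulkAction` and `bulkSizeMb`.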

sid_es_v3.conf Configuration For Sort-Elasticsearch Task

  • File name format: Sort task name + .conf.
  • Can be read from the ${sortTaskId}.conf file in the classpath; this does not support real-time updates.
  • Can be obtained from the HTTP interface of Inlong Manager, which supports real-time updates.

Configuration For sid_es_v3.conf

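| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| sortClusterName | Y | String | NA | inlong-sort-standalone cluster unique identifier |
| sortTaskId | Y | String | NA | Sort task name |
| cacheZones | Y | JsonObject<String, JsonObject> | NA | Cache layer cluster list, format: Map<cacheClusterName, CacheCluster> |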

Configuration For CacheCluster

| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| zoneName | Y | String | NA | Cache layer cluster name |
| zoneType | Y | String | NA | Cache type: [pulsar, tube, kafka] |
| serviceUrl | Y | String | NA | Pulsar's serviceUrl parameter, or Kafka's broker list |
| authentication | Y | String | NA | Pulsar authentication |
| cacheZoneProperties | N | Map<String,String> | NA | Cache layer Consumer parameters |
| topics | N | List<Topic> | NA | List of topics consumed by the cache layer |

Configuration For Topic

| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| topic | Y | String | NA | Topic full name, Pulsar: tenant/namespace/topic |
| partitionCnt | Y | Integer | NA | Number of topic partitions |
| topicProperties | N | Map<String,String> | NA | Consumer parameters of cache layer topics |
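
Putting the sort task, CacheCluster and Topic tables together, a source configuration for a single Pulsar zone with one topic could be generated as sketched below. `build_source_config` and its parameter names are hypothetical. The JSON keys come from the example and tables on this page, and the only validation is the documented tenant/namespace/topic form of the Pulsar topic name.

```python
import json


def build_source_config(cluster_name, task_id, zone_key, zone_name,
                        service_url, authentication, topic, partition_cnt):
    """Build the structure shown in the sid_es_v3.conf example for one topic."""
    if len(topic.split("/")) != 3 or not all(topic.split("/")):
        raise ValueError("Pulsar topic must look like tenant/namespace/topic")
    return {
        "sortClusterName": cluster_name,
        "sortTaskId": task_id,
        "cacheZones": {
            zone_key: {
                "zoneName": zone_name,
                "serviceUrl": service_url,
                "authentication": authentication,
                "topics": [
                    {
                        "topic": topic,
                        "partitionCnt": partition_cnt,
                        "topicProperties": {},
                    }
                ],
                "cacheZoneProperties": {},
                "zoneType": "pulsar",
            }
        },
    }


def to_conf_text(config):
    """Serialize the configuration as indented JSON."""
    return json.dumps(config, indent=2)
```

The resulting text would be saved as the sort task name plus `.conf`, for example `sid_es_v3.conf`, following the file name format described above.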

Start inlong-sort-standalone Application

Finally, execute the script ./bin/sort-start.sh to start the sort-standalone application. You can then check the log file sort.log to confirm the startup status.