Version: 1.5.0

Elasticsearch Example

Get the module archive

The module archive is in the directory inlong-sort-standalone/sort-standalone-dist/target/. The archive file is apache-inlong-sort-standalone-${project.version}-bin.tar.gz.

Modify the configuration files

First, decompress the archive file, then copy the three files in the directory "conf/es/" to the directory "conf/".

  • conf/common.properties, common configuration of all components.
  • conf/SortClusterConfig.conf, sink configuration of all sort tasks.
  • conf/sid_es_v3.conf, example data source configuration of a sort task. The file name must match the sort task name in SortClusterConfig.conf.
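
The copy step above can be scripted. The following is a minimal sketch, assuming the archive has already been decompressed; the function name copy_es_configs and the install_dir argument are hypothetical and not part of InLong.

```python
import shutil
from pathlib import Path

# File names taken from the list above.
CONFIG_FILES = ["common.properties", "SortClusterConfig.conf", "sid_es_v3.conf"]


def copy_es_configs(install_dir):
    """Copy the Elasticsearch example configs from conf/es/ into conf/."""
    conf_dir = Path(install_dir) / "conf"
    copied = []
    for name in CONFIG_FILES:
        src = conf_dir / "es" / name
        dst = conf_dir / name
        shutil.copy2(src, dst)  # overwrites an existing file of the same name
        copied.append(dst)
    return copied
```

Calling copy_es_configs with the path of the decompressed directory returns the three destination paths. It raises FileNotFoundError if one of the example files is missing.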

Example: conf/common.properties

clusterId=esv3-sz-sz1
nodeId=nodeId
metricDomains=Sort
metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
metricDomains.Sort.snapshotInterval=60000
sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource

sortClusterConfig.type=file
sortClusterConfig.file=SortClusterConfig.conf
sortSourceConfig.QueryConsumeConfigType=file

# manager config example
#sortClusterConfig.type=manager
#sortSourceConfig.QueryConsumeConfigType=manager
#managerUrlLoaderType=org.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoader
#sortClusterConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getClusterConfig
#sortSourceConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getSortSource

Example: conf/SortClusterConfig.conf

{
  "clusterName": "esv3-gz-gz1",
  "sortTasks": [{
    "name": "sid_es_v3",
    "type": "ES",
    "idParams": [{
      "indexNamePattern": "inlong0fc00000046_{yyyyMMdd}",
      "contentOffset": "0",
      "inlongGroupId": "testgroup",
      "fieldOffset": "0",
      "fieldNames": "ftime extinfo t1 t2 t3 t4",
      "inlongStreamId": "0fc00000046",
      "separator": "|"
    }],
    "sinkParams": {
      "httpHosts": "ip:port",
      "password": "password",
      "bulkSizeMb": "10",
      "flushInterval": "60",
      "keywordMaxLength": "32767",
      "bulkAction": "4000",
      "concurrentRequests": "5",
      "maxConnect": "10",
      "isUseIndexId": "false",
      "username": "elastic"
    }
  }]
}

Example: conf/sid_es_v3.conf

{
  "sortClusterName": "esv3-gz-gz1",
  "sortTaskId": "sid_es_v3",
  "cacheZones": {
    "pc_atta6th_sz1": {
      "zoneName": "${PULSAR_CLUSTER_NAME}",
      "serviceUrl": "http://${PULSAR_IP}:${PULSAR_PORT}",
      "authentication": "${PULSAR_AUTH}",
      "topics": [{
        "topic": "${TENANT/NAMESPACE/TOPIC}",
        "partitionCnt": 10,
        "topicProperties": {}
      }],
      "cacheZoneProperties": {},
      "zoneType": "pulsar"
    }
  }
}

Modify configuration: idParams of Elasticsearch sort task

| Parameter | Required | DefaultValue | Remark |
| --- | --- | --- | --- |
| inlongGroupId | Y | NA | inlongGroupId |
| inlongStreamId | Y | NA | inlongStreamId |
| separator | Y | NA | Separator of the InLong data stream in the data source |
| fieldNames | Y | NA | Field name list of the Elasticsearch index, separated by spaces |
| indexNamePattern | Y | NA | Index name pattern of Elasticsearch. Supported date-time variables are {yyyyMMdd}, {yyyyMMddHH}, and {yyyyMMddHHmm}. |
| contentOffset | Y | NA | Field index offset of the source content |
| fieldOffset | Y | NA | Offset into the Elasticsearch index field name list |
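
To illustrate how these parameters fit together, here is a rough sketch, not the InLong implementation. It assumes that contentOffset skips leading fields of the separated source line, that fieldOffset skips leading entries of fieldNames, and that the caller supplies the time used for the index name. The function names resolve_index_name and map_fields are hypothetical.

```python
from datetime import datetime

# Date-time variables from the table, mapped to strftime formats.
DATE_VARIABLES = {
    "{yyyyMMdd}": "%Y%m%d",
    "{yyyyMMddHH}": "%Y%m%d%H",
    "{yyyyMMddHHmm}": "%Y%m%d%H%M",
}


def resolve_index_name(pattern, when):
    """Replace each date-time variable in the pattern with the formatted time."""
    for variable, fmt in DATE_VARIABLES.items():
        pattern = pattern.replace(variable, when.strftime(fmt))
    return pattern


def map_fields(line, id_params):
    """Pair separated values with field names (offset semantics are assumed)."""
    values = line.split(id_params["separator"])
    names = id_params["fieldNames"].split(" ")
    values = values[int(id_params["contentOffset"]):]
    names = names[int(id_params["fieldOffset"]):]
    return dict(zip(names, values))
```

With the sample idParams from SortClusterConfig.conf, resolve_index_name("inlong0fc00000046_{yyyyMMdd}", datetime(2023, 1, 15)) yields "inlong0fc00000046_20230115".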

Modify configuration: sinkParams of Elasticsearch sort task

| Parameter | Required | DefaultValue | Remark |
| --- | --- | --- | --- |
| httpHosts | Y | NA | Hosts of Elasticsearch |
| username | Y | NA | Username of Elasticsearch |
| password | Y | NA | Password of Elasticsearch |
| isUseIndexId | N | false | Whether to create an index id |
| bulkSizeMb | N | 10 | Max content size per bulk (MB) |
| flushInterval | N | 60 | Max interval between flush operations (seconds) |
| keywordMaxLength | N | 32767 | Max keyword length (bytes) |
| bulkAction | N | 4000 | Max index requests per bulk |
| maxConnect | N | 10 | Max open HTTP connections |
| concurrentRequests | N | 5 | Max concurrent requests per HTTP connection |
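
Before deploying, it can help to check a sinkParams object against this table. The sketch below is a hypothetical helper, not InLong code. It rejects a configuration that lacks a required parameter and fills in the documented defaults for the optional ones, keeping values as strings as in the example file.

```python
# Required parameters and defaults copied from the table above.
REQUIRED = ("httpHosts", "username", "password")
DEFAULTS = {
    "isUseIndexId": "false",
    "bulkSizeMb": "10",
    "flushInterval": "60",
    "keywordMaxLength": "32767",
    "bulkAction": "4000",
    "maxConnect": "10",
    "concurrentRequests": "5",
}


def resolve_sink_params(sink_params):
    """Return sinkParams with defaults applied; raise if a required key is absent or empty."""
    missing = [key for key in REQUIRED if not sink_params.get(key)]
    if missing:
        raise ValueError("missing required sinkParams: " + ", ".join(missing))
    # Explicit values override the defaults.
    return {**DEFAULTS, **sink_params}
```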

Modify configuration: sid_es_v3.conf

  • The file name is the sort task name plus the suffix ".conf".

| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| sortClusterName | Y | String | NA | inlong-sort-standalone cluster id |
| sortTaskId | Y | String | NA | Sort task name |
| cacheZones | Y | JsonObject<String, JsonObject> | NA | Cache cluster list, Map<cacheClusterName, CacheCluster> |
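
The file naming rule can be checked mechanically. The following sketch assumes both files live in the same conf directory; the function name missing_task_files is hypothetical. It lists every sort task in SortClusterConfig.conf that has no matching "<name>.conf" file.

```python
import json
from pathlib import Path


def missing_task_files(conf_dir):
    """Return the expected data source file names that are absent from conf_dir."""
    conf_dir = Path(conf_dir)
    cluster_file = conf_dir / "SortClusterConfig.conf"
    cluster = json.loads(cluster_file.read_text(encoding="utf-8"))
    missing = []
    for task in cluster.get("sortTasks", []):
        # Each task expects a file named after the task, plus ".conf".
        expected = conf_dir / (task["name"] + ".conf")
        if not expected.is_file():
            missing.append(expected.name)
    return missing
```

An empty result means every sort task has a data source file with the expected name.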

Modify configuration: CacheCluster

| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| zoneName | Y | String | NA | Cache cluster name |
| zoneType | Y | String | NA | [pulsar,tube,kafka] |
| serviceUrl | Y | String | NA | Pulsar serviceUrl or Kafka broker list |
| authentication | Y | String | NA | Pulsar authentication |
| cacheZoneProperties | N | Map<String,String> | NA | Cache consumer configuration |
| topics | N | List<Topic> | NA | Topic list of the cache consumer |

Modify configuration: Topic

| Parameter | Required | Type | DefaultValue | Remark |
| --- | --- | --- | --- | --- |
| topic | Y | String | NA | Cache topic name |
| partitionCnt | Y | Integer | NA | Cache topic partition count |
| topicProperties | N | Map<String,String> | NA | Cache topic configuration |
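
The required fields from the sid_es_v3.conf, CacheCluster, and Topic tables can be checked together on a parsed data source configuration. This is a minimal sketch with a hypothetical function name, validate_source_config. It only checks the presence of required keys and the zoneType value, not whether the cluster is reachable.

```python
# Required keys and allowed zone types, taken from the tables above.
ROOT_REQUIRED = ("sortClusterName", "sortTaskId", "cacheZones")
ZONE_REQUIRED = ("zoneName", "zoneType", "serviceUrl", "authentication")
TOPIC_REQUIRED = ("topic", "partitionCnt")
ZONE_TYPES = ("pulsar", "tube", "kafka")


def validate_source_config(config):
    """Return a list of problems found in a parsed data source config (empty if none)."""
    errors = ["missing " + key for key in ROOT_REQUIRED if key not in config]
    for zone_key, zone in config.get("cacheZones", {}).items():
        for key in ZONE_REQUIRED:
            if key not in zone:
                errors.append(f"{zone_key}: missing {key}")
        if "zoneType" in zone and zone["zoneType"] not in ZONE_TYPES:
            errors.append(f"{zone_key}: unknown zoneType {zone['zoneType']}")
        for index, topic in enumerate(zone.get("topics", [])):
            for key in TOPIC_REQUIRED:
                if key not in topic:
                    errors.append(f"{zone_key}.topics[{index}]: missing {key}")
    return errors
```

Passing the result of json.load on conf/sid_es_v3.conf to this function returns an empty list when all required fields are present.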