Elasticsearch Example
Get the module archive
The module archive is in the directory inlong-sort-standalone/sort-standalone-dist/target/; the archive file is apache-inlong-sort-standalone-${project.version}-bin.tar.gz.
Prepare to modify the configuration files
First, decompress the archive file, then copy the three files in the directory "conf/es/" to the directory "conf/".
- conf/common.properties, common configuration of all components.
- conf/SortClusterConfig.conf, sink configuration of all sort tasks.
- conf/sid_es_v3.conf, data source configuration example of a sort task; the file name is the sort task name in SortClusterConfig.conf plus the suffix ".conf".
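The decompress-and-copy step above can be sketched in Python as follows. This is a minimal sketch, not part of the InLong distribution: the function names `extract_archive` and `copy_es_conf` are hypothetical, and `home` is assumed to be the decompressed directory that contains `conf/`.

```python
import shutil
import tarfile
from pathlib import Path


def extract_archive(archive: Path, dest: Path) -> None:
    """Decompress the *-bin.tar.gz archive into dest."""
    with tarfile.open(archive, "r:gz") as tar:
        tar.extractall(dest)


def copy_es_conf(home: Path) -> list[str]:
    """Copy every file in home/conf/es/ to home/conf/ and return the names."""
    es_dir = home / "conf" / "es"
    conf_dir = home / "conf"
    copied = []
    for src in es_dir.iterdir():
        if src.is_file():
            # Overwrites a file of the same name in conf/, if one exists.
            shutil.copy2(src, conf_dir / src.name)
            copied.append(src.name)
    return copied
```

After the copy, conf/ should contain common.properties, SortClusterConfig.conf, and sid_es_v3.conf.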
Example: conf/common.properties
clusterId=esv3-sz-sz1
nodeId=nodeId
metricDomains=Sort
metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
metricDomains.Sort.snapshotInterval=60000
sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
sortSink.type=org.apache.inlong.sort.standalone.sink.elasticsearch.EsSink
sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
sortClusterConfig.type=file
sortClusterConfig.file=SortClusterConfig.conf
sortSourceConfig.QueryConsumeConfigType=file
# manager config example
#sortClusterConfig.type=manager
#sortSourceConfig.QueryConsumeConfigType=manager
#managerUrlLoaderType=org.apache.inlong.sort.standalone.config.loader.CommonPropertiesManagerUrlLoader
#sortClusterConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getClusterConfig
#sortSourceConfig.managerUrl=http://${manager_ip:port}/api/inlong/manager/openapi/sort/getSortSource
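To check an edited common.properties before starting the service, a small parser can help. The sketch below is an illustration only: `load_properties` and `missing_keys` are hypothetical helpers, and the rule that file mode needs `sortClusterConfig.file` while manager mode needs the `managerUrl` keys is an assumption inferred from the example above, not a documented contract.

```python
def load_properties(text: str) -> dict[str, str]:
    """Parse key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so URLs in values stay intact.
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def missing_keys(props: dict[str, str]) -> list[str]:
    """List keys the chosen mode appears to need (assumed from the example)."""
    needed = []
    cluster_type = props.get("sortClusterConfig.type")
    if cluster_type == "file":
        needed.append("sortClusterConfig.file")
    elif cluster_type == "manager":
        needed.append("sortClusterConfig.managerUrl")
    if props.get("sortSourceConfig.QueryConsumeConfigType") == "manager":
        needed.append("sortSourceConfig.managerUrl")
    return [key for key in needed if key not in props]
```

Because commented lines are skipped, the "manager config example" entries stay inactive until the leading `#` is removed.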
Example: conf/SortClusterConfig.conf
{
  "clusterName": "esv3-gz-gz1",
  "sortTasks": [{
    "name": "sid_es_v3",
    "type": "ES",
    "idParams": [{
      "indexNamePattern": "inlong0fc00000046_{yyyyMMdd}",
      "contentOffset": "0",
      "inlongGroupId": "testgroup",
      "fieldOffset": "0",
      "fieldNames": "ftime extinfo t1 t2 t3 t4",
      "inlongStreamId": "0fc00000046",
      "separator": "|"
    }],
    "sinkParams": {
      "httpHosts": "ip:port",
      "password": "password",
      "bulkSizeMb": "10",
      "flushInterval": "60",
      "keywordMaxLength": "32767",
      "bulkAction": "4000",
      "concurrentRequests": "5",
      "maxConnect": "10",
      "isUseIndexId": "false",
      "username": "elastic"
    }
  }]
}
Example: conf/sid_es_v3.conf
{
  "sortClusterName": "esv3-gz-gz1",
  "sortTaskId": "sid_es_v3",
  "cacheZones": {
    "pc_atta6th_sz1": {
      "zoneName": "${PULSAR_CLUSTER_NAME}",
      "serviceUrl": "http://${PULSAR_IP}:${PULSAR_PORT}",
      "authentication": "${PULSAR_AUTH}",
      "topics": [{
        "topic": "${TENANT/NAMESPACE/TOPIC}",
        "partitionCnt": 10,
        "topicProperties": {}
      }],
      "cacheZoneProperties": {},
      "zoneType": "pulsar"
    }
  }
}
Modify configuration: idParams of Elasticsearch sort task
Parameter | Required | DefaultValue | Remark |
---|---|---|---|
inlongGroupId | Y | NA | inlongGroupId |
inlongStreamId | Y | NA | inlongStreamId |
separator | Y | NA | Separator of the InLong data stream in the data source |
fieldNames | Y | NA | Field names of the Elasticsearch index, separated by spaces |
indexNamePattern | Y | NA | Index name pattern of Elasticsearch; supported date-time variables are {yyyyMMdd}, {yyyyMMddHH}, and {yyyyMMddHHmm} |
contentOffset | Y | NA | field index offset of source content |
fieldOffset | Y | NA | offset of Elasticsearch index field name list |
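To make the idParams concrete, the sketch below shows one plausible reading of how they combine. It is an illustration, not the EsSink implementation: `resolve_index_name` and `map_fields` are hypothetical names, the timestamp that drives the index name is passed in by the caller, and treating `contentOffset` and `fieldOffset` as the number of leading values and leading field names to skip is an assumption based on the remarks in the table.

```python
from datetime import datetime

# Longest variable first, although the three names do not overlap once braced.
_DATE_VARIABLES = {
    "{yyyyMMddHHmm}": "%Y%m%d%H%M",
    "{yyyyMMddHH}": "%Y%m%d%H",
    "{yyyyMMdd}": "%Y%m%d",
}


def resolve_index_name(pattern: str, when: datetime) -> str:
    """Replace each supported date-time variable with the formatted time."""
    for variable, fmt in _DATE_VARIABLES.items():
        pattern = pattern.replace(variable, when.strftime(fmt))
    return pattern


def map_fields(line: str, id_params: dict[str, str]) -> dict[str, str]:
    """Split a record and pair the values with the index field names."""
    # Assumption: each offset skips that many leading entries.
    values = line.split(id_params["separator"])[int(id_params["contentOffset"]):]
    names = id_params["fieldNames"].split(" ")[int(id_params["fieldOffset"]):]
    return dict(zip(names, values))
```

With the example configuration, a record timestamped 2 January 2024 would go to the index inlong0fc00000046_20240102, and a six-value record separated by "|" would map onto ftime, extinfo, t1, t2, t3, and t4 in order.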
Modify configuration: sinkParams of Elasticsearch sort task
Parameter | Required | DefaultValue | Remark |
---|---|---|---|
httpHosts | Y | NA | Hosts of Elasticsearch |
username | Y | NA | Username of Elasticsearch |
password | Y | NA | Password of Elasticsearch |
isUseIndexId | N | false | Whether to create an index id |
bulkSizeMb | N | 10 | Max content size per bulk (MB) |
flushInterval | N | 60 | Max interval between flush operations (seconds) |
keywordMaxLength | N | 32767 | Max keyword length (bytes) |
bulkAction | N | 4000 | Max number of index requests per bulk |
maxConnect | N | 10 | Max number of open HTTP connections |
concurrentRequests | N | 5 | Max concurrent requests per HTTP connection |
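The required and optional sinkParams can be checked with a short script before deploying. This is a minimal sketch: `resolve_sink_params` is a hypothetical helper, and the defaults are copied from the table above rather than read from the EsSink source.

```python
# Defaults as listed in the sinkParams table; values are strings, as in the JSON example.
SINK_DEFAULTS = {
    "isUseIndexId": "false",
    "bulkSizeMb": "10",
    "flushInterval": "60",
    "keywordMaxLength": "32767",
    "bulkAction": "4000",
    "maxConnect": "10",
    "concurrentRequests": "5",
}
SINK_REQUIRED = ("httpHosts", "username", "password")


def resolve_sink_params(sink_params: dict[str, str]) -> dict[str, str]:
    """Fail on missing required keys, then fill in the documented defaults."""
    missing = [key for key in SINK_REQUIRED if not sink_params.get(key)]
    if missing:
        raise ValueError(f"missing required sinkParams: {', '.join(missing)}")
    # Explicit settings win over defaults.
    return {**SINK_DEFAULTS, **sink_params}
```

A sinkParams object that sets only the three required keys would therefore behave like the full example in SortClusterConfig.conf, under the assumption that the sink applies the same defaults.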
Modify configuration: sid_es_v3.conf
- The file name is the sort task name plus the suffix ".conf".
Parameter | Required | Type | DefaultValue | Remark |
---|---|---|---|---|
sortClusterName | Y | String | NA | inlong-sort-standalone cluster id |
sortTaskId | Y | String | NA | Sort task name |
cacheZones | Y | JsonObject<String, JsonObject> | NA | Cache cluster list, Map<cacheClusterName, CacheCluster> |
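Since the data source file is located by task name, a quick consistency check across the two JSON files can catch naming mistakes. The sketch below is illustrative: `check_task_source` is a hypothetical helper, and the expectation that `sortClusterName` equals `clusterName` in SortClusterConfig.conf is an assumption drawn from the example files, where both are "esv3-gz-gz1".

```python
import json
from pathlib import Path


def check_task_source(conf_dir: Path, task_name: str) -> list[str]:
    """Return a list of problems found for one sort task; empty means consistent."""
    problems = []
    cluster = json.loads((conf_dir / "SortClusterConfig.conf").read_text())
    task_names = [task.get("name") for task in cluster.get("sortTasks", [])]
    if task_name not in task_names:
        problems.append(f"{task_name} is not a sort task in SortClusterConfig.conf")

    source_path = conf_dir / f"{task_name}.conf"
    if not source_path.is_file():
        problems.append(f"{source_path.name} does not exist")
        return problems

    source = json.loads(source_path.read_text())
    if source.get("sortTaskId") != task_name:
        problems.append("sortTaskId does not match the file name")
    # Assumption: the cluster names in both files are expected to agree.
    if source.get("sortClusterName") != cluster.get("clusterName"):
        problems.append("sortClusterName differs from clusterName")
    return problems
```

For the example files, calling `check_task_source(Path("conf"), "sid_es_v3")` would be the natural usage.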
Modify configuration: CacheCluster
Parameter | Required | Type | DefaultValue | Remark |
---|---|---|---|---|
zoneName | Y | String | NA | cache cluster name |
zoneType | Y | String | NA | [pulsar,tube,kafka] |
serviceUrl | Y | String | NA | Pulsar serviceUrl or Kafka broker list |
authentication | Y | String | NA | Pulsar authentication |
cacheZoneProperties | N | Map<String,String> | NA | Cache consumer configuration |
topics | N | List<Topic> | NA | Topic list of Cache consumer |
Modify configuration: Topic
Parameter | Required | Type | DefaultValue | Remark |
---|---|---|---|---|
topic | Y | String | NA | cache topic name |
partitionCnt | Y | Integer | NA | cache topic partition count |
topicProperties | N | Map<String,String> | NA | Cache topic configuration |
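The CacheCluster and Topic tables translate directly into a structural check on the `cacheZones` object. This is a rough sketch: `validate_cache_zones` is a hypothetical helper that only tests for the presence of required keys and the listed zone types, and it does not contact Pulsar, TubeMQ, or Kafka.

```python
ZONE_REQUIRED = ("zoneName", "zoneType", "serviceUrl", "authentication")
TOPIC_REQUIRED = ("topic", "partitionCnt")
ZONE_TYPES = {"pulsar", "tube", "kafka"}


def validate_cache_zones(cache_zones: dict) -> list[str]:
    """Return human-readable issues; an empty list means the structure looks valid."""
    issues = []
    for zone_key, zone in cache_zones.items():
        for key in ZONE_REQUIRED:
            if key not in zone:
                issues.append(f"{zone_key}: missing {key}")
        if "zoneType" in zone and zone["zoneType"] not in ZONE_TYPES:
            issues.append(f"{zone_key}: unknown zoneType {zone['zoneType']}")
        # topics is optional, so a missing list is treated as empty.
        for index, topic in enumerate(zone.get("topics", [])):
            for key in TOPIC_REQUIRED:
                if key not in topic:
                    issues.append(f"{zone_key}: topics[{index}] missing {key}")
    return issues
```

Running it on the `cacheZones` value from conf/sid_es_v3.conf should report no issues, since the example sets every required key.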