Pulsar To Hive Example

Prepare To Get Module Archive

The module archive is in the directory inlong-sort-standalone/sort-standalone-dist/target/; the archive file is apache-inlong-sort-standalone-${project.version}-bin.tar.gz.

Prepare To Modify Configuration File

First, decompress the archive file, then copy the three files in the directory conf/hive/ to the directory conf/.

  • conf/common.properties: common configuration of all components.
  • conf/SortClusterConfig.conf: sink configuration of all sort tasks.
  • conf/sid_hive_inlong6th_v3.conf: example data source configuration of a sort task; the file name is the same as the sort task name in SortClusterConfig.conf.
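
The decompress-and-copy step above can be scripted. The following is a minimal Python sketch, not part of the InLong distribution: the function names are hypothetical, and `install_dir` is assumed to be whichever directory contains `conf/` after extraction (the top-level directory name inside the archive is not stated here, so the caller supplies it).

```python
import shutil
from pathlib import Path


def unpack(archive: Path, dest: Path) -> None:
    """Extract the *-bin.tar.gz archive into dest (format is inferred from the name)."""
    shutil.unpack_archive(str(archive), str(dest))


def copy_hive_conf(install_dir: Path) -> list[str]:
    """Copy every regular file from conf/hive/ into conf/ and return the file names."""
    conf_dir = install_dir / "conf"
    copied = []
    for src in sorted((conf_dir / "hive").iterdir()):
        if src.is_file():
            # Existing files with the same name in conf/ are overwritten.
            shutil.copy2(src, conf_dir / src.name)
            copied.append(src.name)
    return copied
```

Calling `copy_hive_conf(Path("<extracted directory>"))` after `unpack(...)` should leave the three files listed above directly under conf/.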

Example: conf/common.properties

# inlong-sort-standalone cluster id
clusterId=hivev3-sz-sz1
# Current node ID
nodeId=nodeId
# Domain name of metric
metricDomains=Sort
# Class name list of metric listener, separated by space
metricDomains.Sort.domainListeners=org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener
# Interval for snapshotting metric data (milliseconds)
metricDomains.Sort.snapshotInterval=60000
# Channel class name
sortChannel.type=org.apache.inlong.sort.standalone.channel.BufferQueueChannel
# Sink class name. Different distribution types use different Sink classes
sortSink.type=org.apache.inlong.sort.standalone.sink.hive.HiveSink
# Source class name
sortSource.type=org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource
# There are three ways to load cluster configuration data: [file, Manager, custom class].
sortClusterConfig.type=file
# When the cluster configuration data is loaded from a file, the name of the configuration file in the classpath
sortClusterConfig.file=SortClusterConfig.conf
# There are three ways to load the Sort task configuration data: [file, Manager, custom class]
sortSourceConfig.QueryConsumeConfigType=file

Example: conf/SortClusterConfig.conf

{
  "data": {
    "clusterName": "hivev3-sz-sz1",
    "sortTasks": [
      {
        "idParams": [
          {
            "inlongGroupId": "0fc00000046",
            "inlongStreamId": "",
            "separator": "|",
            "partitionIntervalMs": 3600000,
            "idRootPath": "/user/hive/warehouse/t_inlong_v1_0fc00000046",
            "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
            "hiveTableName": "t_inlong_v1_0fc00000046",
            "partitionFieldName": "dt",
            "partitionFieldPattern": "yyyyMMddHH",
            "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
            "maxPartitionOpenDelayHour": 8
          },
          {
            "inlongGroupId": "03600000045",
            "inlongStreamId": "",
            "separator": "|",
            "partitionIntervalMs": 3600000,
            "idRootPath": "/user/hive/warehouse/t_inlong_v1_03600000045",
            "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
            "hiveTableName": "t_inlong_v1_03600000045",
            "partitionFieldName": "dt",
            "partitionFieldPattern": "yyyyMMddHH",
            "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
            "maxPartitionOpenDelayHour": 8
          },
          {
            "inlongGroupId": "05100054990",
            "inlongStreamId": "",
            "separator": "|",
            "partitionIntervalMs": 3600000,
            "idRootPath": "/user/hive/warehouse/t_inlong_v1_05100054990",
            "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
            "hiveTableName": "t_inlong_v1_05100054990",
            "partitionFieldName": "dt",
            "partitionFieldPattern": "yyyyMMddHH",
            "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
            "maxPartitionOpenDelayHour": 8
          },
          {
            "inlongGroupId": "09c00014434",
            "inlongStreamId": "",
            "separator": "|",
            "partitionIntervalMs": 3600000,
            "idRootPath": "/user/hive/warehouse/t_inlong_v1_09c00014434",
            "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
            "hiveTableName": "t_inlong_v1_09c00014434",
            "partitionFieldName": "dt",
            "partitionFieldPattern": "yyyyMMddHH",
            "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
            "maxPartitionOpenDelayHour": 8
          },
          {
            "inlongGroupId": "0c900035509",
            "inlongStreamId": "",
            "separator": "|",
            "partitionIntervalMs": 3600000,
            "idRootPath": "/user/hive/warehouse/t_inlong_v1_0c900035509",
            "partitionSubPath": "/{yyyyMMdd}/{yyyyMMddHH}",
            "hiveTableName": "t_inlong_v1_0c900035509",
            "partitionFieldName": "dt",
            "partitionFieldPattern": "yyyyMMddHH",
            "msgTimeFieldPattern": "yyyy-MM-dd HH:mm:ss",
            "maxPartitionOpenDelayHour": 8
          }
        ],
        "name": "sid_hive_inlong6th_v3",
        "sinkParams": {
          "hdfsPath": "hdfs://127.0.0.1:9000",
          "maxFileOpenDelayMinute": "5",
          "tokenOvertimeMinute": "60",
          "maxOutputFileSizeGb": "2",
          "hiveJdbcUrl": "jdbc:hive2://127.0.0.2:10000",
          "hiveDatabase": "default",
          "hiveUsername": "hive",
          "hivePassword": "hive"
        },
        "type": "HIVE"
      }
    ]
  },
  "errCode": 0,
  "md5": "md5",
  "result": true
}

Example: conf/sid_hive_inlong6th_v3.conf

{
  "sortClusterName": "hivev3-sz-sz1",
  "sortTaskId": "sid_hive_inlong6th_v3",
  "cacheZones": {
    "pc_inlong6th_sz1": {
      "zoneName": "${PULSAR_CLUSTER_NAME}",
      "serviceUrl": "http://${PULSAR_IP}:${PULSAR_PORT}",
      "authentication": "${PULSAR_AUTH}",
      "topics": [
        {
          "topic": "${TENANT/NAMESPACE/TOPIC}",
          "partitionCnt": 10,
          "topicProperties": {}
        }
      ],
      "cacheZoneProperties": {},
      "zoneType": "pulsar"
    }
  }
}
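
Because the per-task file name must match the sort task name, a quick cross-check of the two JSON files can catch naming slips before startup. The sketch below is illustrative only; the function names are hypothetical, and the check that `sortClusterName` equals `clusterName` is an assumption drawn from the examples above, where both carry the same value.

```python
import json


def expected_source_files(cluster_config: dict) -> list[str]:
    """Return the per-task file names implied by the sort task names."""
    return [task["name"] + ".conf" for task in cluster_config["data"]["sortTasks"]]


def find_mismatches(cluster_config: dict, source_config: dict) -> list[str]:
    """Compare one source config against the cluster config; return problems found."""
    problems = []
    data = cluster_config["data"]
    # Assumption: both files are expected to name the same cluster.
    if source_config["sortClusterName"] != data["clusterName"]:
        problems.append("sortClusterName differs from clusterName")
    task_names = {task["name"] for task in data["sortTasks"]}
    if source_config["sortTaskId"] not in task_names:
        problems.append("sortTaskId is not a task name in SortClusterConfig.conf")
    return problems


# Trimmed-down versions of the two example files above.
cluster = json.loads("""
{"data": {"clusterName": "hivev3-sz-sz1",
          "sortTasks": [{"name": "sid_hive_inlong6th_v3", "type": "HIVE"}]}}
""")
source = json.loads("""
{"sortClusterName": "hivev3-sz-sz1", "sortTaskId": "sid_hive_inlong6th_v3"}
""")

print(expected_source_files(cluster))     # ['sid_hive_inlong6th_v3.conf']
print(find_mismatches(cluster, source))   # []
```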

Configuration For conf/common.properties

| Parameter | Required | Default Value | Remark |
| --- | --- | --- | --- |
| clusterId | Y | NA | Unique identifier of the inlong-sort-standalone cluster |
| nodeId | N | Localhost IP | Current node ID |
| metricDomains | N | Sort | Domain name of metric |
| metricDomains.Sort.domainListeners | N | org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener | Class name list of metric listeners, separated by spaces |
| metricDomains.Sort.snapshotInterval | N | 60000 | Interval for snapshotting metric data, in milliseconds |
| prometheusHttpPort | N | 8080 | Parameter of org.apache.inlong.sort.standalone.metrics.prometheus.PrometheusMetricListener: HttpServer port of Prometheus |
| sortChannel.type | N | org.apache.inlong.sort.standalone.channel.BufferQueueChannel | Channel class name |
| sortSink.type | Y | NA | Sink class name. Different distribution types use different Sink classes. |
| sortSource.type | N | org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource | Source class name |
| sortClusterConfig.type | N | manager | There are three ways to load cluster configuration data: [file, Manager, custom class]. |
| sortClusterConfig.file | N | SortClusterConfig.conf | When the cluster configuration data is loaded from a file, the name of the configuration file in the classpath |
| sortClusterConfig.managerUrl | N | NA | When the cluster configuration data is loaded from the manager, define the URL of InlongManager here. For example: http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getClusterConfig |
| sortSourceConfig.QueryConsumeConfigType | N | manager | There are three ways to load the Sort task configuration data: [file, Manager, custom class]. If the loading type is file, the Sort task configuration file is in the classpath, and the file name format is: ${sortTaskId}.conf. |
| sortSourceConfig.managerUrl | N | NA | When the Sort task configuration data is loaded from the manager, define the URL of InlongManager here. For example: http://${manager ip:port}/api/inlong/manager/openapi/sort/standalone/getSortSource |
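
To see how the defaults and required parameters in this table interact, the following Python sketch parses a common.properties text and fills in the documented defaults. It is a simplified illustration under stated assumptions: the parser handles only plain `key=value` lines and `#` comments (not the full Java properties syntax), the function names are hypothetical, and defaults that depend on the host (such as nodeId) are left out.

```python
# Defaults copied from the table above; host-dependent ones are omitted.
DEFAULTS = {
    "metricDomains": "Sort",
    "sortChannel.type": "org.apache.inlong.sort.standalone.channel.BufferQueueChannel",
    "sortSource.type": "org.apache.inlong.sort.standalone.source.sortsdk.SortSdkSource",
    "sortClusterConfig.type": "manager",
    "sortClusterConfig.file": "SortClusterConfig.conf",
    "sortSourceConfig.QueryConsumeConfigType": "manager",
}
REQUIRED = ("clusterId", "sortSink.type")


def parse_properties(text: str) -> dict:
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def effective_config(text: str) -> dict:
    """Overlay the file's values on the defaults and check required keys."""
    props = {**DEFAULTS, **parse_properties(text)}
    missing = [key for key in REQUIRED if key not in props]
    if missing:
        raise ValueError("missing required parameters: " + ", ".join(missing))
    return props


example = """
# inlong-sort-standalone cluster id
clusterId=hivev3-sz-sz1
sortSink.type=org.apache.inlong.sort.standalone.sink.hive.HiveSink
sortClusterConfig.type=file
"""
config = effective_config(example)
print(config["sortClusterConfig.type"])                    # file
print(config["sortSourceConfig.QueryConsumeConfigType"])   # manager
```

The second printed value shows why the example file above sets both loading types to file explicitly: any key left out falls back to manager.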

Configuration For SortClusterConfig.conf

  • Can be read from the SortClusterConfig.conf source file in the classpath, but this does not support real-time updates.
  • Can be obtained from the HTTP interface of InLong Manager, which supports real-time updates.

| Parameter | Required | Type | Default Value | Remark |
| --- | --- | --- | --- | --- |
| clusterName | Y | String | NA | Unique identifier of the inlong-sort-standalone cluster |
| sortTasks | Y | JsonArray<SortTaskConfig> | NA | Sort task list |

Configuration For SortTaskConfig

| Parameter | Required | Default Value | Remark |
| --- | --- | --- | --- |
| name | Y | NA | Sort task name |
| type | Y | NA | Sort task type, such as: HIVE("hive"), TUBE("tube"), KAFKA("kafka"), PULSAR("pulsar"), ElasticSearch("ElasticSearch"), UNKNOWN("n") |
| idParams | Y | NA | Inlong data stream parameter list |
| sinkParams | Y | NA | Sort task parameters |

IdParams Configuration For Sort-Hive Task

| Parameter | Required | Default Value | Remark |
| --- | --- | --- | --- |
| inlongGroupId | Y | NA | inlongGroupId |
| inlongStreamId | Y | NA | inlongStreamId |
| separator | Y | NA | Delimiter |
| partitionIntervalMs | N | 3600000 | Partition interval, in milliseconds |
| idRootPath | Y | NA | HDFS root directory of Inlong data stream |
| partitionSubPath | Y | NA | Partition subdirectories for inlong data streams |
| hiveTableName | Y | NA | Hive table name of the Inlong data stream |
| partitionFieldName | N | dt | Partition field name of the Inlong data stream |
| partitionFieldPattern | Y | NA | The partition field value format of the Inlong data stream, such as {yyyyMMdd}, {yyyyMMddHH}, {yyyyMMddHHmm} |
| msgTimeFieldPattern | Y | NA | The field value format of the message generation time, Java time format |
| maxPartitionOpenDelayHour | N | 8 | Maximum opening delay time of the partition, in hours |
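
As an illustration of how idRootPath, partitionSubPath, and msgTimeFieldPattern might combine, the Python sketch below expands the `{...}` placeholders from a message time. This is one reading of the parameters, not InLong's actual implementation: the function names are hypothetical, only the Java pattern letters used in the examples (yyyy, MM, dd, HH, mm, ss) are translated, and the sample timestamp is invented.

```python
import re
from datetime import datetime

# Only the Java date pattern tokens that appear in the examples are supported.
_JAVA_TO_STRFTIME = {"yyyy": "%Y", "MM": "%m", "dd": "%d",
                     "HH": "%H", "mm": "%M", "ss": "%S"}
_TOKEN = re.compile("|".join(_JAVA_TO_STRFTIME))


def to_strftime(java_pattern: str) -> str:
    """Translate a small subset of Java date patterns to strftime syntax."""
    return _TOKEN.sub(lambda m: _JAVA_TO_STRFTIME[m.group(0)], java_pattern)


def partition_dir(id_root_path: str, partition_sub_path: str,
                  msg_time: str, msg_time_pattern: str) -> str:
    """Fill each {pattern} in partition_sub_path from the parsed message time."""
    parsed = datetime.strptime(msg_time, to_strftime(msg_time_pattern))
    sub_path = re.sub(r"\{([^}]*)\}",
                      lambda m: parsed.strftime(to_strftime(m.group(1))),
                      partition_sub_path)
    return id_root_path + sub_path


print(partition_dir("/user/hive/warehouse/t_inlong_v1_0fc00000046",
                    "/{yyyyMMdd}/{yyyyMMddHH}",
                    "2023-05-17 09:41:07",
                    "yyyy-MM-dd HH:mm:ss"))
# /user/hive/warehouse/t_inlong_v1_0fc00000046/20230517/2023051709
```

With partitionIntervalMs set to 3600000 (one hour), the hourly `{yyyyMMddHH}` subdirectory lines up with one partition per interval.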

SinkParams Configuration For Sort-Hive Task

| Parameter | Required | Default Value | Remark |
| --- | --- | --- | --- |
| hdfsPath | Y | NA | HDFS nameNode |
| maxFileOpenDelayMinute | N | 5 | Maximum write time of a single HDFS file, in minutes |
| tokenOvertimeMinute | N | 60 | The maximum time it takes to create a token for a partition of a single Inlong data stream, in minutes |
| maxOutputFileSizeGb | N | 2 | Maximum size of a single HDFS file, in GB |
| hiveJdbcUrl | Y | NA | Hive JDBC Path |
| hiveDatabase | Y | NA | Hive Database |
| hiveUsername | Y | NA | Hive Username |
| hivePassword | Y | NA | Hive Password |

sid_hive_inlong6th_v3.conf Configuration For Sort-Hive Task

  • File name format: Sort task name + .conf.
  • Can be read from the sid_hive_inlong6th_v3.conf source file in the classpath, but this does not support real-time updates.
  • Can be obtained from the HTTP interface of Inlong Manager, which supports real-time updates.

Configuration For sid_hive_inlong6th_v3.conf

| Parameter | Required | Type | Default Value | Remark |
| --- | --- | --- | --- | --- |
| sortClusterName | Y | String | NA | Unique identifier of the inlong-sort-standalone cluster |
| sortTaskId | Y | String | NA | Sort task name |
| cacheZones | Y | JsonObject<String, JsonObject> | NA | Cache layer cluster list, format: Map<cacheClusterName, CacheCluster> |

Configuration For CacheCluster

| Parameter | Required | Type | Default Value | Remark |
| --- | --- | --- | --- | --- |
| zoneName | Y | String | NA | Cache layer cluster name |
| zoneType | Y | String | NA | Cache type: [pulsar,tube,kafka] |
| serviceUrl | Y | String | NA | Pulsar's serviceUrl parameter, or Kafka's Broker list |
| authentication | Y | String | NA | Pulsar Authentication |
| cacheZoneProperties | N | Map<String,String> | NA | Cache layer Consumer parameters |
| topics | N | List<Topic> | NA | List of topics consumed by the cache layer |

Configuration For Topic

| Parameter | Required | Type | Default Value | Remark |
| --- | --- | --- | --- | --- |
| topic | Y | String | NA | Topic full name, Pulsar: tenant/namespace/topic |
| partitionCnt | Y | Integer | NA | Number of Topic Partitions |
| topicProperties | N | Map<String,String> | NA | Consumer parameters of cache layer topics |
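
The required columns of the three tables above (task source file, CacheCluster, Topic) can be turned into a simple pre-start check. The following Python sketch is a hypothetical helper, not an InLong API; it only reports absent keys and does not validate values.

```python
# Required parameter names, taken from the tables above.
TOP_REQUIRED = ("sortClusterName", "sortTaskId", "cacheZones")
ZONE_REQUIRED = ("zoneName", "zoneType", "serviceUrl", "authentication")
TOPIC_REQUIRED = ("topic", "partitionCnt")


def missing_required(source_config: dict) -> list[str]:
    """Return dotted paths of required parameters that are absent."""
    missing = [key for key in TOP_REQUIRED if key not in source_config]
    for zone_key, zone in source_config.get("cacheZones", {}).items():
        prefix = f"cacheZones.{zone_key}"
        missing += [f"{prefix}.{key}" for key in ZONE_REQUIRED if key not in zone]
        for index, topic in enumerate(zone.get("topics", [])):
            missing += [f"{prefix}.topics[{index}].{key}"
                        for key in TOPIC_REQUIRED if key not in topic]
    return missing


# Same shape as the example file, with serviceUrl and partitionCnt left out.
incomplete = {
    "sortClusterName": "hivev3-sz-sz1",
    "sortTaskId": "sid_hive_inlong6th_v3",
    "cacheZones": {
        "pc_inlong6th_sz1": {
            "zoneName": "${PULSAR_CLUSTER_NAME}",
            "authentication": "${PULSAR_AUTH}",
            "topics": [{"topic": "${TENANT/NAMESPACE/TOPIC}"}],
            "zoneType": "pulsar",
        }
    },
}
print(missing_required(incomplete))
# ['cacheZones.pc_inlong6th_sz1.serviceUrl', 'cacheZones.pc_inlong6th_sz1.topics[0].partitionCnt']
```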

Start inlong-sort-standalone Application

Finally, execute the script ./bin/sort-start.sh to start the sort-standalone application. You can then check the log file sort.log to confirm the startup status.