Version: 1.9.0



Apache Pulsar is a distributed, open source pub-sub messaging and streaming platform for real-time workloads, managing hundreds of billions of events per day.


| Extract Node | Version |
| --- | --- |
| Pulsar | Pulsar: >= 2.8.x |




Usage for SQL API

Step.1 Prepare the SQL Client

The SQL Client is used to write SQL queries that manipulate data in Pulsar. Use the --jar option to add the sort-connector-pulsar-{{INLONG_VERSION}}.jar package.


./bin/ embedded --jar sort-connector-pulsar_{{INLONG_VERSION}}.jar

Note: If you have already placed the connector JAR package under $FLINK_HOME/lib, do not specify it again with --jar.

Step.2 Read data from Pulsar

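-- Table backed by a Pulsar topic.
-- The eventTime and properties METADATA columns are inferred from the rows
-- written below and from the Available Metadata section; adjust them to your schema.
CREATE TABLE `pulsar` (
  `physical_1` STRING,
  `physical_2` INT,
  `eventTime` TIMESTAMP(3) METADATA,
  `properties` MAP<STRING, STRING> METADATA,
  `key` STRING,
  `physical_3` BOOLEAN
) WITH (
  'connector' = 'pulsar-inlong',
  'topic' = 'persistent://public/default/topic82547611',
  'key.format' = 'raw',
  'key.fields' = 'key',
  'value.format' = 'avro',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'scan.startup.mode' = 'earliest'
);

-- Read from Pulsar into a table named sink_table, which must already exist.
INSERT INTO `sink_table`
SELECT
  `physical_1` AS `physical_1`,
  `physical_2` AS `physical_2`
FROM `pulsar`;

-- Write sample rows to the Pulsar table.
INSERT INTO `pulsar`
VALUES
  ('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k11', 'v11', 'k12', 'v12'], 'key1', TRUE),
  ('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', MAP['k21', 'v21', 'k22', 'v22'], 'key2', FALSE),
  ('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k31', 'v31', 'k32', 'v32'], 'key3', TRUE);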

Usage for InLong Dashboard


Usage for InLong Manager Client


Pulsar Extract Node Options

| Parameter | Required | Default value | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Set the connector type. The available option is pulsar-inlong. |
| topic | optional | (none) | String | Set the input or output topic. Separate multiple topics with a comma (,). Specify either topic or topic-pattern, not both. |
| topic-pattern | optional | (none) | String | Use a regular expression to match topic names. Specify either topic or topic-pattern, not both. |
| service-url | required | (none) | String | Set the Pulsar broker service address. |
| admin-url | optional | (none) | String | Set the Pulsar administration service address. When this parameter is not passed in, the startup mode only supports earliest and latest, and the offset in the Pulsar cluster cannot be updated. |
| scan.startup.mode | optional | latest | String | Configure the source's startup mode. Available options are earliest, latest, external-subscription, and specific-offsets. |
| scan.startup.specific-offsets | optional | (none) | String | Required when scan.startup.mode is set to specific-offsets. |
| scan.startup.sub-name | optional | (none) | String | Required when scan.startup.mode is set to external-subscription. |
| discovery topic interval | optional | (none) | Long | Set the time interval for partition discovery, in milliseconds. |
| sink.message-router | optional | key-hash | String | Set the routing method for writing messages to the Pulsar partition. Available options are key-hash, round-robin, and a custom MessageRouter. |
| sink.semantic | optional | at-least-once | String | Set the delivery guarantee the sink uses when writing messages. Available options are at-least-once, exactly-once, and none. |
| properties | optional | empty | Map | Set Pulsar's optional configurations, in the format properties.key='value'. For details, see Configuration parameters. |
| key.format | optional | (none) | String | Set the serialization format for Pulsar message keys. No key format is used by default; available options include raw, Avro, JSON, etc. |
| key.fields | optional | (none) | String | The SQL-defined fields used when serializing the key. Separate multiple fields with a comma (,). |
| key.fields-prefix | optional | (none) | String | Define a custom prefix for all fields in the key format to avoid name conflicts with fields in the value format. By default, the prefix is empty. If a custom prefix is defined, the table schema and key.fields both use the prefixed names. When constructing data types in the key format, the prefix is removed and non-prefixed names are used within the key format. |
| format or value.format | required | (none) | String | Set the serialization format for Pulsar message values. JSON, Avro, etc. are supported. For more information, see the Flink format. |
| value.fields-include | optional | ALL | Enum | Set the policy for which fields the Pulsar message value contains. Available options are ALL and EXCEPT_KEY. |
| inlong.metric.labels | optional | (none) | String | InLong metric label. The value format is groupId={groupId}&streamId={streamId}&nodeId={nodeId}. |
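
As an illustration of how several of these options combine, the sketch below subscribes to topics by pattern and starts reading from an external subscription. Only the option keys come from the table above; the table name, column names, topic pattern, subscription name, and label values are hypothetical placeholders.

```sql
-- Sketch only: names and values below are hypothetical placeholders.
CREATE TABLE `pulsar_by_pattern` (
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'pulsar-inlong',
  -- topic-pattern is used instead of topic; specify only one of the two.
  'topic-pattern' = 'persistent://public/default/orders-.*',
  'service-url' = 'pulsar://localhost:6650',
  -- admin-url is needed for startup modes other than earliest and latest.
  'admin-url' = 'http://localhost:8080',
  'scan.startup.mode' = 'external-subscription',
  -- Required because scan.startup.mode is external-subscription.
  'scan.startup.sub-name' = 'my-subscription',
  'format' = 'json',
  'inlong.metric.labels' = 'groupId=my_group&streamId=my_stream&nodeId=my_node'
);
```

Because scan.startup.mode is external-subscription here, scan.startup.sub-name is set as well, and admin-url is supplied since that mode is not available without it.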

Available Metadata

The METADATA flag is used to read and write metadata in Pulsar messages. The supported metadata fields are listed below.

Note: The R/W column defines whether a metadata field is readable (R) and/or writable (W). Read-only columns must be declared VIRTUAL to exclude them during an INSERT INTO operation.

| Key | Data Type | Description | R/W |
| --- | --- | --- | --- |
| topic | STRING NOT NULL | Topic name of the Pulsar message. | R |
| messageId | BYTES NOT NULL | Message ID of the Pulsar message. | R |
| sequenceId | BIGINT NOT NULL | Sequence ID of the Pulsar message. | R |
| publishTime | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Publishing time of the Pulsar message. | R |
| eventTime | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | Generation time of the Pulsar message. | R/W |
| properties | MAP<STRING, STRING> NOT NULL | Extension information of the Pulsar message. | R/W |
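
The R/W distinction can be sketched in a table definition as follows. This is a minimal example, assuming a hypothetical table name, topic, and physical column; the metadata keys and types are taken from the table above. Read-only keys are declared METADATA VIRTUAL, while the writable keys are declared with METADATA alone.

```sql
-- Sketch only: table name, topic, and the physical column are hypothetical.
CREATE TABLE `pulsar_with_metadata` (
  `payload` STRING,
  -- Writable metadata (R/W): may be set by INSERT INTO.
  `eventTime` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA,
  `properties` MAP<STRING, STRING> METADATA,
  -- Read-only metadata (R): VIRTUAL excludes these columns from INSERT INTO.
  `topic` STRING METADATA VIRTUAL,
  `sequenceId` BIGINT METADATA VIRTUAL,
  `publishTime` TIMESTAMP(3) WITH LOCAL TIME ZONE METADATA VIRTUAL
) WITH (
  'connector' = 'pulsar-inlong',
  'topic' = 'persistent://public/default/my-topic',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'format' = 'json'
);

-- Reading returns physical and metadata columns alike.
SELECT `payload`, `topic`, `sequenceId`, `publishTime`, `eventTime`
FROM `pulsar_with_metadata`;
```

An INSERT INTO this table would supply only payload, eventTime, and properties, because the VIRTUAL columns are not part of the write schema.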

Data Type Mapping

Pulsar stores message keys and values as bytes, so Pulsar doesn't have schema or data types. Pulsar messages are serialized and deserialized by formats such as csv, json, and avro, so the data type mapping is determined by the format in use. See the Formats pages for more details.
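
To make the role of formats concrete, the sketch below declares a table whose message key is handled by the raw format and whose message value is handled by the json format, so the column types are mapped according to each format's own rules. The table name, topic, and column names are hypothetical, and the use of EXCEPT_KEY to keep the key column out of the value is an assumption based on the option descriptions above.

```sql
-- Sketch only: table name, topic, and column names are hypothetical.
CREATE TABLE `pulsar_json_values` (
  -- Carried in the message key; the raw format maps the key bytes to STRING.
  `user_id` STRING,
  -- Carried in the message value; the json format decides how these types map.
  `amount` DECIMAL(10, 2),
  `tags` ARRAY<STRING>,
  `created_at` TIMESTAMP(3)
) WITH (
  'connector' = 'pulsar-inlong',
  'topic' = 'persistent://public/default/my-topic',
  'service-url' = 'pulsar://localhost:6650',
  'admin-url' = 'http://localhost:8080',
  'key.format' = 'raw',
  'key.fields' = 'user_id',
  'value.format' = 'json',
  -- Assumed behavior: keep the key field out of the JSON value.
  'value.fields-include' = 'EXCEPT_KEY'
);
```

Switching value.format to avro or csv would leave the column list unchanged while changing how each type is encoded in the message bytes.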