Kafka

Overview

The Kafka Extract Node reads data from Kafka topics. It supports two modes: reading in the normal fashion with the kafka-inlong connector, and reading in the upsert fashion with the upsert-kafka-inlong connector. The upsert connector produces a changelog stream, where each data record represents an update or delete event. The kafka-inlong connector can read both data and metadata.

Supported Version

| Extract Node | Kafka version |
| --- | --- |
| Kafka | 0.10+ |

Dependencies

To set up the Kafka Extract Node, the following provides dependency information for projects using a build automation tool (such as Maven or SBT) and for the SQL Client with Sort Connectors JAR bundles.

Maven dependency

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-kafka</artifactId>
<version>2.1.0-SNAPSHOT</version>
</dependency>

How to create a Kafka Extract Node

Usage for SQL API

The examples below show how to create a Kafka Extract Node with Flink SQL:

  • connector is kafka-inlong
-- Set checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- Create a Kafka table 'kafka_extract_node' in Flink SQL
Flink SQL> CREATE TABLE kafka_extract_node (
`id` INT,
`name` STRING
) WITH (
'connector' = 'kafka-inlong',
'topic' = 'user',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
);

-- Read data
Flink SQL> SELECT * FROM kafka_extract_node;
  • connector is upsert-kafka-inlong
-- Set checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- Create a Kafka table 'kafka_extract_node' in Flink SQL
Flink SQL> CREATE TABLE kafka_extract_node (
`id` INT,
`name` STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka-inlong',
'topic' = 'user',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'key.format' = 'csv',
'value.format' = 'csv'
);

-- Read data
Flink SQL> SELECT * FROM kafka_extract_node;
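
Because the upsert table is read as a changelog keyed by id, downstream queries see the current state per key rather than every raw message. As a minimal sketch, assuming the upsert-kafka-inlong connector follows the changelog semantics described in the overview, the result of the aggregation below would be revised as update and delete events arrive for existing keys:

```sql
-- Count the current rows per name.
-- The counts are updated when a record for an existing id is changed or deleted.
SELECT `name`, COUNT(*) AS user_count
FROM kafka_extract_node
GROUP BY `name`;
```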

Usage for InLong Dashboard

TODO: It will be supported in the future.

Usage for InLong Manager Client

TODO: It will be supported in the future.

Kafka Extract Node Options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specify which connector to use. Valid values are: 1. for Upsert Kafka use: upsert-kafka-inlong 2. for normal Kafka use: kafka-inlong |
| topic | optional | (none) | String | Topic name(s) to read data from when the table is used as a source. A topic list is also supported for sources by separating topics with semicolons, like topic-1;topic-2. Note that only one of topic-pattern and topic can be specified for sources. |
| topic-pattern | optional | (none) | String | The regular expression for a pattern of topic names to read from. All topics with names that match the specified regular expression will be subscribed to by the consumer when the job starts running. Note that only one of topic-pattern and topic can be specified for sources. |
| properties.bootstrap.servers | required | (none) | String | Comma-separated list of Kafka brokers. |
| properties.group.id | required | (none) | String | The id of the consumer group for the Kafka source. |
| properties.* | optional | (none) | String | Sets and passes arbitrary Kafka configurations. Suffix names must match the configuration keys defined in the Kafka Configuration documentation. Flink will remove the properties. key prefix and pass the transformed keys and values to the underlying KafkaClient. For example, you can disable automatic topic creation via properties.allow.auto.create.topics = false. Some configurations cannot be set because Flink will override them, e.g. key.deserializer and value.deserializer. |
| format | required for normal Kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. Note: either this option or the value.format option is required. |
| key.format | optional | (none) | String | The format used to deserialize and serialize the key part of Kafka messages. Please refer to the formats page for more details and more format options. Note: if a key format is defined, the 'key.fields' option is required as well. Otherwise the Kafka records will have an empty key. |
| key.fields | optional | [] | List<String> | Defines an explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and thus a key is undefined. The list should look like 'field1;field2'. |
| key.fields-prefix | optional | (none) | String | Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and 'key.fields' will work with prefixed names. When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Please note that this option requires 'value.fields-include' to be set to 'EXCEPT_KEY'. |
| value.format | required for upsert Kafka | (none) | String | The format used to deserialize and serialize the value part of Kafka messages. Please refer to the formats page for more details and more format options. |
| value.fields-include | optional | ALL | Enum. Possible values: [ALL, EXCEPT_KEY] | Defines a strategy for how to deal with key columns in the data type of the value format. By default, 'ALL' physical columns of the table schema will be included in the value format, which means that key columns appear in the data type for both the key and value format. |
| scan.startup.mode | optional | group-offsets | String | Startup mode for the Kafka consumer. Valid values are 'earliest-offset', 'latest-offset', 'group-offsets', 'timestamp' and 'specific-offsets'. See the following Start Reading Position for more details. |
| scan.startup.specific-offsets | optional | (none) | String | Specifies offsets for each partition in case of 'specific-offsets' startup mode, e.g. 'partition:0,offset:42;partition:1,offset:300'. |
| scan.startup.timestamp-millis | optional | (none) | Long | Start from the specified epoch timestamp (milliseconds), used in case of 'timestamp' startup mode. |
| scan.topic-partition-discovery.interval | optional | (none) | Duration | Interval at which the consumer periodically discovers dynamically created Kafka topics and partitions. |
| inlong.metric.labels | optional | (none) | String | InLong metric label. The format of the value is groupId={groupId}&streamId={streamId}&nodeId={nodeId}. |
| sink.ignore.changelog | optional | false | Boolean | Ingest data of all changelog modes into Kafka. |
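
Several of the options above interact, so two short sketches may help. Both assume that the kafka-inlong connector honors the options exactly as the table describes. The table names, the topic pattern user-.*, the timestamp value, the 30s discovery interval, and the k_ prefix are illustrative assumptions, not values taken from InLong.

```sql
-- Sketch 1: subscribe by topic pattern and start reading from a timestamp.
CREATE TABLE kafka_pattern_source (
  `id` INT,
  `name` STRING
) WITH (
  'connector' = 'kafka-inlong',
  -- Used instead of 'topic'; only one of the two may be specified.
  'topic-pattern' = 'user-.*',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  -- Passed to the Kafka client with the 'properties.' prefix removed.
  'properties.allow.auto.create.topics' = 'false',
  'scan.startup.mode' = 'timestamp',
  -- Epoch milliseconds; only used in 'timestamp' startup mode.
  'scan.startup.timestamp-millis' = '1700000000000',
  -- Periodically look for newly created topics and partitions.
  'scan.topic-partition-discovery.interval' = '30s',
  'format' = 'csv'
);

-- Sketch 2: read the message key into a prefixed column.
CREATE TABLE kafka_keyed_source (
  `k_id` INT,     -- comes from the key; seen as 'id' by the key format
  `id` INT,       -- comes from the value
  `name` STRING   -- comes from the value
) WITH (
  'connector' = 'kafka-inlong',
  'topic' = 'user',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'key.format' = 'csv',
  'key.fields-prefix' = 'k_',
  -- 'key.fields' uses the prefixed column names.
  'key.fields' = 'k_id',
  'value.format' = 'csv',
  -- Required when 'key.fields-prefix' is set.
  'value.fields-include' = 'EXCEPT_KEY'
);
```

In the second sketch the prefix keeps the key column k_id from clashing with the value column id, which is the situation key.fields-prefix is meant for.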

Available Metadata

The following format metadata can be exposed as read-only (VIRTUAL) columns in a table definition. Reading metadata is supported for the canal-json-inlong format.

| Key | Data Type | Description |
| --- | --- | --- |
| value.table_name | STRING | Name of the table that contains the row |
| value.database_name | STRING | Name of the database that contains the row |
| value.op_ts | TIMESTAMP(3) | The time at which the change was made in the database. If the record is read from a snapshot of the table instead of the binlog, the value is always 0 |
| value.op_type | STRING | Operation type, INSERT/UPDATE/DELETE |
| value.batch_id | BIGINT | Not important, a simple increment counter |
| value.is_ddl | BOOLEAN | Source does not emit DDL data, value is false |
| value.update_before | ARRAY<MAP<STRING, STRING>> | The update-before data for an UPDATE record |
| value.mysql_type | MAP<STRING, STRING> | MySQL field type |
| value.pk_names | ARRAY<STRING> | Primary key |
| value.sql_type | MAP<STRING, INT> | SQL field type |
| value.ts | TIMESTAMP_LTZ(3) | The ts_ms field is used to store the local time at which the connector processed/generated the event |

The extended CREATE TABLE example demonstrates the syntax for exposing these metadata fields:

CREATE TABLE `kafka_extract_node` (
`id` INT,
`name` STRING,
`database_name` string METADATA FROM 'value.database_name',
`table_name` string METADATA FROM 'value.table_name',
`op_ts` timestamp(3) METADATA FROM 'value.op_ts',
`op_type` string METADATA FROM 'value.op_type',
`batch_id` bigint METADATA FROM 'value.batch_id',
`is_ddl` boolean METADATA FROM 'value.is_ddl',
`update_before` ARRAY<MAP<STRING, STRING>> METADATA FROM 'value.update_before',
`mysql_type` MAP<STRING, STRING> METADATA FROM 'value.mysql_type',
`pk_names` ARRAY<STRING> METADATA FROM 'value.pk_names',
`data` STRING METADATA FROM 'value.data',
`sql_type` MAP<STRING, INT> METADATA FROM 'value.sql_type',
`ingestion_ts` TIMESTAMP(3) METADATA FROM 'value.ts'
) WITH (
'connector' = 'kafka-inlong',
'topic' = 'user',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'canal-json-inlong'
);
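
Once declared, metadata columns can be queried like any other column. As a minimal sketch, assuming the table above and assuming op_type carries the literal values listed in the metadata table (INSERT/UPDATE/DELETE), the following query keeps only delete events and shows where and when they happened:

```sql
-- Select delete events together with their origin and change time.
SELECT `id`, `name`, `database_name`, `table_name`, `op_ts`
FROM kafka_extract_node
WHERE `op_type` = 'DELETE';
```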

Data Type Mapping

Kafka stores message keys and values as bytes, so Kafka itself has no schema or data types. Kafka messages are deserialized and serialized by formats, e.g. csv, json, avro. Thus, the data type mapping is determined by the specific format. Please refer to the Formats page for more details.
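
To illustrate that the declared column types are interpreted by the format rather than by Kafka, here is a minimal sketch of a table reading JSON messages. It assumes the json format is available on the classpath and usable with kafka-inlong. The table name and the columns score and tags are hypothetical.

```sql
-- The json format decides how each declared type is read from the message bytes.
CREATE TABLE kafka_json_source (
  `id` INT,              -- read from a JSON number
  `name` STRING,         -- read from a JSON string
  `score` DOUBLE,        -- read from a JSON number
  `tags` ARRAY<STRING>   -- read from a JSON array of strings
) WITH (
  'connector' = 'kafka-inlong',
  'topic' = 'user',
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
```

Switching 'format' to another value, such as csv, changes how the same bytes are parsed, and not every format supports every type, so the mapping should be checked against the documentation of the chosen format.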