Version: 1.5.0

Iceberg

Overview

Apache Iceberg is a high-performance format for huge analytic tables.

Version

| Load Node | Version |
| --- | --- |
| Iceberg | Iceberg: 0.12.x, 0.13.x |

Dependencies

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-iceberg</artifactId>
<version>1.5.0</version>
</dependency>

Usage

Usage for SQL API

To create an Iceberg table in Flink, we recommend using the Flink SQL Client because it makes the concepts easier to understand.

Step.1 Start a standalone Flink cluster within a Hadoop environment.

# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the Flink standalone cluster
./bin/start-cluster.sh

Step.2 Start the Flink SQL client.

The Iceberg project provides a separate flink-runtime module that generates a bundled jar, which the Flink SQL client can load directly.

To build the flink-runtime bundled jar manually, build the InLong project; the jar is generated under <inlong-root-dir>/inlong-sort/sort-connectors/iceberg/target.

By default, Iceberg includes the Hadoop jars needed for the Hadoop catalog. To use the Hive catalog, the Hive jars must be loaded when opening the Flink SQL client. InLong automatically packages a bundled Hive jar into the Iceberg connector, so the SQL client can be opened as follows:

# HADOOP_HOME is your Hadoop root directory after unpacking the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

./bin/sql-client.sh embedded -j <flink-runtime-directory>/sort-connector-iceberg-{inlong-version}.jar

Step.3 Create a table in the current Flink catalog

By default, there is no need to create a catalog; Flink's in-memory catalog is used. If catalog-database.catalog-table does not exist in the Iceberg catalog, it is created automatically. Here we just load data into it.
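
To confirm which catalog and database the SQL client is currently using, the two statements below can be run first. This is a minimal sketch; on a fresh session with no catalog configured, Flink reports its built-in in-memory catalog (default_catalog) and database (default_database).

```sql
-- Show the catalog and database that unqualified table names resolve to.
SHOW CURRENT CATALOG;
SHOW CURRENT DATABASE;
```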

Table managed in Hive catalog

The following SQL creates a Flink table in the current Flink catalog that maps to the Iceberg table default_database.flink_table managed in the Iceberg catalog. Because the default catalog type is hive, catalog-type does not need to be set here.

CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='hive_prod',
'uri'='thrift://localhost:9083',
'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

If you want to create a Flink table that maps to a different Iceberg table managed in the Hive catalog (such as hive_db.hive_iceberg_table in Hive), create the Flink table as follows:

CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='hive_prod',
'catalog-database'='hive_db',
'catalog-table'='hive_iceberg_table',
'uri'='thrift://localhost:9083',
'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

The underlying catalog database (hive_db in the above example) will be created automatically if it does not exist when writing records into the Flink table.

Table managed in Hadoop catalog

The following SQL creates a Flink table in the current Flink catalog that maps to the Iceberg table default_database.flink_table managed in the Hadoop catalog.

CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='hadoop_prod',
'catalog-type'='hadoop',
'warehouse'='hdfs://nn:8020/path/to/warehouse'
);

Table managed in custom catalog

The following SQL creates a Flink table in the current Flink catalog that maps to the Iceberg table default_database.flink_table managed in a custom catalog.

CREATE TABLE flink_table (
id BIGINT,
data STRING
) WITH (
'connector'='iceberg',
'catalog-name'='custom_prod',
'catalog-type'='custom',
'catalog-impl'='com.my.custom.CatalogImpl',
-- More table properties for the customized catalog
'my-additional-catalog-config'='my-value',
...
);

Please check sections under the Integrations tab for all custom catalogs.

Step.4 Insert data into the Iceberg table

INSERT INTO `flink_table` 
SELECT
`id` AS `id`,
`d` AS `name`
FROM `source_table`
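
The INSERT above assumes that a table named source_table with columns id and d already exists. For a quick local test, one way to provide it is Flink's built-in datagen connector, declared before running the INSERT. This is only a sketch: the row rate, sequence bounds, and string length are arbitrary values chosen for illustration.

```sql
-- Hypothetical test source for the INSERT statement above.
CREATE TABLE source_table (
  id BIGINT,
  d STRING
) WITH (
  'connector'='datagen',
  'rows-per-second'='5',
  -- Generate ids 1..100 in order, then finish.
  'fields.id.kind'='sequence',
  'fields.id.start'='1',
  'fields.id.end'='100',
  -- Random strings of 10 characters.
  'fields.d.length'='10'
);
```

Because the id sequence is bounded, the source stops after 100 rows, which keeps a trial run short.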

Usage for InLong Dashboard

TODO

Usage for InLong Manager Client

TODO

Feature

Multiple table sink

Iceberg currently supports sinking to multiple tables. To enable it, add 'sink.multiple.enable' = 'true' to the Flink SQL CREATE TABLE parameters. The target table schema can only be defined as BYTES or STRING. For example:

CREATE TABLE `table_2`(
`data` STRING)
WITH (
'connector'='iceberg-inlong',
'catalog-name'='hive_prod',
'uri'='thrift://localhost:9083',
'warehouse'='hdfs://localhost:8020/hive/warehouse',
'sink.multiple.enable' = 'true',
'sink.multiple.format' = 'canal-json',
'sink.multiple.add-column.policy' = 'TRY_IT_BEST',
'sink.multiple.database-pattern' = '${database}',
'sink.multiple.table-pattern' = 'test_${table}'
);

To support multiple sink, the serialization format of the upstream data must be set via the 'sink.multiple.format' option (currently only canal-json and debezium-json are supported).

Dynamic database/table extraction

Iceberg can apply custom mapping rules to database names and table names: a pattern can contain placeholders as well as fixed prefixes and suffixes that modify the mapped target name. The Iceberg Load Node resolves 'sink.multiple.database-pattern' to the target database name and 'sink.multiple.table-pattern' to the target table name. Placeholders are parsed from the data. A variable is written strictly as '${VARIABLE_NAME}', and its value comes from the data itself: it can be a metadata field of the format specified by 'sink.multiple.format', or a physical field in the data. Examples of patterns are as follows:

  • 'sink.multiple.format' is 'canal-json':

The upstream data is:

{
"data": [
{
"id": "111",
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": "5.18"
}
],
"database": "inventory",
"es": 1589373560000,
"id": 9,
"isDdl": false,
"mysqlType": {
"id": "INTEGER",
"name": "VARCHAR(255)",
"description": "VARCHAR(512)",
"weight": "FLOAT"
},
"old": [
{
"weight": "5.15"
}
],
"pkNames": [
"id"
],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"description": 12,
"weight": 7
},
"table": "products",
"ts": 1589373560798,
"type": "UPDATE"
}

With the pattern '${database}_${table}', the extracted name is 'inventory_products' ('database' and 'table' are metadata fields).

With the pattern '${database}_${table}_${id}', the extracted name is 'inventory_products_111' ('database' and 'table' are metadata fields, and 'id' is a physical field).
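
Applied to the Load Node options, the first pattern could be configured as in the sketch below. The Flink table name multi_sink is a placeholder, and the connection values are copied from the multiple-sink example earlier in this page; the comments describe the expected resolution for the sample canal-json record, not verified output.

```sql
-- Sketch only: multi_sink is a hypothetical Flink table name.
CREATE TABLE multi_sink (
  `data` STRING
) WITH (
  'connector'='iceberg-inlong',
  'catalog-name'='hive_prod',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://localhost:8020/hive/warehouse',
  'sink.multiple.enable'='true',
  'sink.multiple.format'='canal-json',
  -- For the sample record: target database "inventory".
  'sink.multiple.database-pattern'='${database}',
  -- For the sample record: target table "inventory_products".
  'sink.multiple.table-pattern'='${database}_${table}'
);
```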

Auto create database/table

In multiple sink scenarios, Iceberg can automatically create the database and the table if they do not exist, and it supports capturing new tables at runtime. Default Iceberg table parameters: 'format-version' = '2', 'write.upsert.enabled' = 'true', 'engine.hive.enabled' = 'true'.
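
As a rough illustration of what those defaults mean, the sketch below declares a comparable table by hand through Iceberg's own Flink catalog. It assumes the Iceberg Flink runtime is on the classpath and that the Iceberg version in use accepts these properties at creation time. The catalog name, target table name, column list, and primary key are illustrative (derived from the canal-json sample above) and are not output produced by InLong.

```sql
-- Assumption: a Hive-backed Iceberg catalog registered directly in Flink.
CREATE CATALOG hive_prod WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://localhost:8020/hive/warehouse'
);

CREATE DATABASE IF NOT EXISTS hive_prod.inventory;

-- Hypothetical hand-written equivalent of an auto-created table.
CREATE TABLE hive_prod.inventory.test_products (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight FLOAT,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'format-version'='2',
  'write.upsert.enabled'='true',
  'engine.hive.enabled'='true'
);
```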

Dynamic schema evolution

Iceberg supports schema evolution from the source table to the target table in multiple sink scenarios (DDL synchronization). Supported schema evolution:

| Schema evolution type | Supported |
| --- | --- |
| Column add | true |
| Column delete | false |
| Column reorder | false |
| Column rename | false |
| Column type update | false |
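
To make the table concrete, consider DDL issued on the upstream source table. The statements below are an illustrative sketch that assumes a MySQL source table named products (as in the canal-json sample) whose DDL events reach the sink. The new column name color and the renamed column product_name are hypothetical.

```sql
-- Supported: a column add is propagated to the target Iceberg table.
ALTER TABLE products ADD COLUMN color VARCHAR(64);

-- Listed as unsupported: column delete, rename, and type update.
ALTER TABLE products DROP COLUMN description;
ALTER TABLE products RENAME COLUMN name TO product_name;
ALTER TABLE products MODIFY COLUMN weight DOUBLE;
```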

Iceberg Load Node Options

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| connector | required | (none) | String | Specify what connector to use, here should be 'iceberg'. |
| catalog-type | required | hive | String | hive or hadoop for built-in catalogs, or left unset for custom catalog implementations using catalog-impl. |
| catalog-name | required | (none) | String | Catalog name. |
| catalog-database | required | (none) | String | Database name managed in the Iceberg catalog. |
| catalog-table | required | (none) | String | Table name managed in the underlying Iceberg catalog and database. |
| catalog-impl | optional for custom catalog | (none) | String | The fully-qualified class name of the custom catalog implementation; must be set if catalog-type is unset. |
| cache-enabled | optional | true | Boolean | Whether to enable the catalog cache. |
| uri | required for hive catalog | (none) | String | The Hive metastore's thrift URI. |
| clients | optional for hive catalog | 2 | Integer | The Hive metastore client pool size. |
| warehouse | optional for hadoop catalog or hive catalog | (none) | String | For the Hive catalog, this is the Hive warehouse location; specify this path if you neither set hive-conf-dir to a directory containing a hive-site.xml configuration file nor add a correct hive-site.xml to the classpath. For the Hadoop catalog, this is the HDFS directory that stores metadata files and data files. |
| hive-conf-dir | optional for hive catalog | (none) | String | Path to a directory containing a hive-site.xml configuration file, which is used to provide custom Hive configuration values. The value of hive.metastore.warehouse.dir from <hive-conf-dir>/hive-site.xml (or from the Hive configuration file on the classpath) is overwritten with the warehouse value if both hive-conf-dir and warehouse are set when creating the Iceberg catalog. |
| inlong.metric.labels | optional | (none) | String | InLong metric label; the value format is groupId={groupId}&streamId={streamId}&nodeId={nodeId}. |
| sink.multiple.enable | optional | false | Boolean | Whether to enable multiple sink. |
| sink.multiple.schema-update.policy | optional | TRY_IT_BEST | Enum | The policy for handling inconsistency between the schema in the data and the schema of the target table. TRY_IT_BEST: handle as much as possible and ignore what cannot be handled. IGNORE_WITH_LOG: ignore the inconsistency and log it; the table is ignored from then on. THROW_WITH_STOP: throw an exception and stop the job until the user resolves the schema conflict and restores the job. |
| sink.multiple.pk-auto-generated | optional | false | Boolean | Whether to auto-generate the primary key; in multiple sink scenarios, all fields combined are treated as the primary key. |
| sink.multiple.typemap-compatible-with-spark | optional | false | Boolean | Whether to adapt to the Spark type system when auto-generating tables. |
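
The sketch below shows how several of these options might be combined in one multiple-sink table definition. The Flink table name and the group, stream, and node identifiers in 'inlong.metric.labels' are hypothetical placeholders; the option names and policy values are taken from the table above, and the connection values from the earlier multiple-sink example.

```sql
-- Sketch only: identifiers such as test_group are made up for illustration.
CREATE TABLE multi_sink_with_options (
  `data` STRING
) WITH (
  'connector'='iceberg-inlong',
  'catalog-name'='hive_prod',
  'uri'='thrift://localhost:9083',
  'warehouse'='hdfs://localhost:8020/hive/warehouse',
  -- Format: groupId={groupId}&streamId={streamId}&nodeId={nodeId}
  'inlong.metric.labels'='groupId=test_group&streamId=test_stream&nodeId=test_node',
  'sink.multiple.enable'='true',
  'sink.multiple.format'='canal-json',
  'sink.multiple.database-pattern'='${database}',
  'sink.multiple.table-pattern'='${table}',
  -- Stop the job on a schema conflict instead of the default TRY_IT_BEST.
  'sink.multiple.schema-update.policy'='THROW_WITH_STOP',
  -- Treat all fields combined as the primary key.
  'sink.multiple.pk-auto-generated'='true'
);
```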

Data Type Mapping

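Iceberg data type details. The table below shows how Flink SQL types map to Iceberg types when loading data.

| Flink SQL Type | Iceberg Type |
| --- | --- |
| CHAR | STRING |
| VARCHAR | STRING |
| STRING | STRING |
| BOOLEAN | BOOLEAN |
| BINARY | FIXED(L) |
| VARBINARY | BINARY |
| DECIMAL | DECIMAL(P,S) |
| TINYINT | INT |
| SMALLINT | INT |
| INTEGER | INT |
| BIGINT | LONG |
| FLOAT | FLOAT |
| DOUBLE | DOUBLE |
| DATE | DATE |
| TIME | TIME |
| TIMESTAMP | TIMESTAMP |
| TIMESTAMP_LTZ | TIMESTAMPTZ |
| INTERVAL | - |
| ARRAY | LIST |
| MULTISET | MAP |
| MAP | MAP |
| ROW | STRUCT |
| RAW | - |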
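
As an illustration of the mapping, the following sketch declares a Flink table covering several of the listed types, with the corresponding Iceberg type noted beside each column. The table and column names are hypothetical, and the catalog settings are copied from the Hadoop catalog example above; whether every type round-trips on a given Iceberg version has not been verified here.

```sql
-- Sketch only: typed_table and its columns are made up for illustration.
CREATE TABLE typed_table (
  c_char CHAR(8),               -- Iceberg STRING
  c_varchar VARCHAR(32),        -- Iceberg STRING
  c_bool BOOLEAN,               -- Iceberg BOOLEAN
  c_binary BINARY(16),          -- Iceberg FIXED(16)
  c_varbinary VARBINARY(64),    -- Iceberg BINARY
  c_decimal DECIMAL(10, 2),     -- Iceberg DECIMAL(10,2)
  c_tinyint TINYINT,            -- Iceberg INT
  c_bigint BIGINT,              -- Iceberg LONG
  c_ts TIMESTAMP(6),            -- Iceberg TIMESTAMP
  c_ts_ltz TIMESTAMP_LTZ(6),    -- Iceberg TIMESTAMPTZ
  c_array ARRAY<INT>,           -- Iceberg LIST
  c_map MAP<STRING, BIGINT>,    -- Iceberg MAP
  c_row ROW<a INT, b STRING>    -- Iceberg STRUCT
) WITH (
  'connector'='iceberg',
  'catalog-name'='hadoop_prod',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://nn:8020/path/to/warehouse'
);
```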