Skip to main content
版本:1.11.0

Iceberg

概览

Apache Iceberg 是一种用于大型分析表的高性能格式。

版本

提取节点版本
IcebergIceberg:0.12.x,0.13.x

依赖项

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-iceberg</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>

配置 Iceberg 数据抽取节点

  • 下载 Apache Hadoop
  • 修改 jobmanager.shtaskmanager.sh,加入 Hadoop 环境变量。启动命令可以参考 Apache Flink
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
  • 修改 docker/docker-compose 目录下的 docker-compose.yml,将 HadoopFlink 启动命令 挂载至容器中:
  jobmanager:
image: apache/flink:1.13-scala_2.11
container_name: jobmanager
user: root
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
volumes:
# Mount Hadoop
- HADOOP_HOME:HADOOP_HOME
# Mount the modified jobmanager.sh which adds the HADOOP_HOME env correctly
- /jobmanager.sh:/opt/flink/bin/jobmanager.sh
ports:
- "8081:8081"
command: jobmanager

taskmanager:
image: apache/flink:1.13-scala_2.11
container_name: taskmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: jobmanager
taskmanager.numberOfTaskSlots: 2
volumes:
# Mount Hadoop
- HADOOP_HOME:HADOOP_HOME
# Mount the modified taskmanager.sh which adds the HADOOP_HOME env correctly
- /taskmanager.sh:/opt/flink/bin/taskmanager.sh
command: taskmanager

使用 Flink sql client 之前,sql-client.sh 启动命令也需要添加 Hadoop 环境变量,并挂载至容器。 启动命令可以参考 Apache Flink

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

使用 Flink sql cli

CREATE TABLE `iceberg_table_source`(
PRIMARY KEY (`_id`) NOT ENFORCED,
`_id` STRING,
`id` INT,
`name` STRING,
`age` INT)
WITH (
'connector' = 'iceberg-inlong',
'catalog-database' = 'DATABASES',
'catalog-table' = 'TABLE',
'catalog-type' = 'HIVE',
'catalog-name' = 'HIVE',
'streaming' = 'true',
'uri' = 'thrift://127.0.0.1:9083'
);

Dashboard

页面点击 数据源 → 新建 → Iceberg

img.png

Manager Client

TODO

参数信息

选项必填类型描述
connector必填String指定要使用的 Connector,这里应该是 'iceberg-inlong'
catalog-database必填String在 Iceberg 目录中管理的数据库名称
catalog-table必填String在 Iceberg 目录和数据库中管理的表名
catalog-type必填Stringhivehadoop 用于内置目录
catalog-name必填String目录名称
uri必填StringHive 元存储的 thrift URI,如:thrift://127.0.0.1:9083
warehouse可选String对于 Hive 目录,是 Hive 仓库位置。对于 hadoop 目录,是 HDFS 目录存放元数据文件和数据文件
inlong.metric.labels可选String在 long metric label 中,value 的格式为 groupId=xxgroup&streamId=xxstream&nodeId=xxnode

数据类型映射

Flink SQL TypeIceberg Type
CHARSTRING
VARCHARSTRING
STRINGSTRING
BOOLEANBOOLEAN
BINARYFIXED(L)
VARBINARYBINARY
DECIMALDECIMAL(P,S)
TINYINTINT
SMALLINTINT
INTEGERINT
BIGINTLONG
FLOATFLOAT
DOUBLEDOUBLE
DATEDATE
TIMETIME
TIMESTAMPTIMESTAMP
TIMESTAMP_LTZTIMESTAMPTZ
INTERVAL-
ARRAYLIST
MULTISETMAP
MAPMAP
ROWSTRUCT
RAW-