Iceberg
Overview
Apache Iceberg is a high-performance format for huge analytic tables.
Version
Extract Node | Version |
---|---|
Iceberg | Iceberg: 0.12.x, 0.13.x |
Dependencies
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-iceberg</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>
Configure the Iceberg Extract Node
- Download Apache Hadoop.
- Modify jobmanager.sh and taskmanager.sh to add the Hadoop environment variable. For the startup commands, refer to Apache Flink:
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
- Modify docker-compose.yml in the docker/docker-compose directory to mount Hadoop and the Flink startup scripts into the containers:
jobmanager:
  image: apache/flink:1.13-scala_2.11
  container_name: jobmanager
  user: root
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: jobmanager
  volumes:
    # Mount Hadoop
    - HADOOP_HOME:HADOOP_HOME
    # Mount the modified jobmanager.sh which adds the HADOOP_HOME env correctly
    - /jobmanager.sh:/opt/flink/bin/jobmanager.sh
  ports:
    - "8081:8081"
  command: jobmanager
taskmanager:
  image: apache/flink:1.13-scala_2.11
  container_name: taskmanager
  environment:
    - |
      FLINK_PROPERTIES=
      jobmanager.rpc.address: jobmanager
      taskmanager.numberOfTaskSlots: 2
  volumes:
    # Mount Hadoop
    - HADOOP_HOME:HADOOP_HOME
    # Mount the modified taskmanager.sh which adds the HADOOP_HOME env correctly
    - /taskmanager.sh:/opt/flink/bin/taskmanager.sh
  command: taskmanager
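The export shown earlier runs `hadoop classpath` unconditionally, so a missing or wrongly mounted Hadoop directory only surfaces later as class-loading errors. As a minimal sketch, the modified jobmanager.sh and taskmanager.sh could guard that step. The helper name `set_hadoop_classpath` is hypothetical and not part of Flink or InLong, and the sketch assumes `HADOOP_HOME` inside the container points at the mounted Hadoop distribution.

```shell
#!/usr/bin/env bash
# Sketch only: set_hadoop_classpath is a hypothetical helper,
# not something shipped with Flink or InLong.
set_hadoop_classpath() {
  # Fail loudly when HADOOP_HOME is unset or has no hadoop launcher.
  if [ -z "${HADOOP_HOME:-}" ] || [ ! -x "$HADOOP_HOME/bin/hadoop" ]; then
    echo "HADOOP_HOME is unset or has no executable bin/hadoop" >&2
    return 1
  fi
  # Same command as the documented export, using $(...) instead of backticks.
  HADOOP_CLASSPATH="$("$HADOOP_HOME/bin/hadoop" classpath)" || return 1
  export HADOOP_CLASSPATH
}

# Intended use near the top of the startup script:
#   set_hadoop_classpath || exit 1
```

Calling the helper before the script launches the JVM keeps the behaviour of the plain export when Hadoop is present, and stops the container early with a readable message when it is not.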
Flink SQL API
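Before using the Flink SQL client, add the Hadoop environment variable to the sql-client.sh startup script as well, and mount the script into the container.
For the startup command, refer to Apache Flink: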
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
Using the Flink SQL CLI:
CREATE TABLE `iceberg_table_source`(
    `_id` STRING,
    `id` INT,
    `name` STRING,
    `age` INT,
    PRIMARY KEY (`_id`) NOT ENFORCED)
WITH (
    'connector' = 'iceberg-inlong',
    'catalog-database' = 'DATABASES',
    'catalog-table' = 'TABLE',
    'catalog-type' = 'HIVE',
    'catalog-name' = 'HIVE',
    'streaming' = 'true',
    'uri' = 'thrift://127.0.0.1:9083'
);
Dashboard
On the Dashboard page, click Data Sources → Create → Iceberg.
Manager Client
TODO
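Parameters
Option | Required | Type | Description |
---|---|---|---|
connector | required | String | Specifies the connector to use; here it should be 'iceberg-inlong' |
catalog-database | required | String | Name of the database managed in the Iceberg catalog |
catalog-table | required | String | Name of the table managed in the Iceberg catalog and database |
catalog-type | required | String | hive or hadoop for built-in catalogs |
catalog-name | required | String | Catalog name |
uri | required | String | Thrift URI of the Hive metastore, e.g. thrift://127.0.0.1:9083 |
warehouse | optional | String | For a Hive catalog, the Hive warehouse location. For a hadoop catalog, the HDFS directory that stores metadata files and data files |
inlong.metric.labels | optional | String | InLong metric label; the value has the format groupId=xxgroup&streamId=xxstream&nodeId=xxnode |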
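The inlong.metric.labels value is a single string of three key=value pairs joined by `&`, which is easy to mistype by hand. The following is a small sketch of a shell helper that assembles it from the three IDs. The function name `build_metric_labels` is hypothetical and introduced only for illustration, and the IDs in the usage note are the placeholders from the table.

```shell
# Sketch only: build_metric_labels is a hypothetical helper.
# It joins the three IDs into the groupId/streamId/nodeId label format.
build_metric_labels() {
  if [ "$#" -ne 3 ]; then
    echo "usage: build_metric_labels GROUP_ID STREAM_ID NODE_ID" >&2
    return 1
  fi
  printf 'groupId=%s&streamId=%s&nodeId=%s\n' "$1" "$2" "$3"
}
```

For example, `build_metric_labels xxgroup xxstream xxnode` prints `groupId=xxgroup&streamId=xxstream&nodeId=xxnode`, which can then be pasted as the option value in the WITH clause.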
Data Type Mapping
Flink SQL Type | Iceberg Type |
---|---|
CHAR | STRING |
VARCHAR | STRING |
STRING | STRING |
BOOLEAN | BOOLEAN |
BINARY | FIXED(L) |
VARBINARY | BINARY |
DECIMAL | DECIMAL(P,S) |
TINYINT | INT |
SMALLINT | INT |
INTEGER | INT |
BIGINT | LONG |
FLOAT | FLOAT |
DOUBLE | DOUBLE |
DATE | DATE |
TIME | TIME |
TIMESTAMP | TIMESTAMP |
TIMESTAMP_LTZ | TIMESTAMPTZ |
INTERVAL | - |
ARRAY | LIST |
MULTISET | MAP |
MAP | MAP |
ROW | STRUCT |
RAW | - |