Skip to main content
版本:Next

ClickHouse

概览

ClickHouse Load 节点支持将数据写入 ClickHouse 数据库。 本文档介绍如何设置 ClickHouse Load 节点以对 ClickHouse 数据库运行 SQL 查询。

支持的版本

Load 节点DriverGroup IdArtifact IdJAR
ClickHouseClickHouseru.yandex.clickhouseclickhouse-jdbc下载

依赖

为了设置 ClickHouse Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven 依赖

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-jdbc</artifactId>
<version>2.1.0-SNAPSHOT</version>
</dependency>

如何创建 ClickHouse Load 节点

SQL API 用法


-- MySQL Extract 节点
CREATE TABLE `mysql_extract_table`(
PRIMARY KEY (`id`) NOT ENFORCED,
`id` BIGINT,
`name` STRING,
`age` INT
) WITH (
'connector' = 'mysql-cdc-inlong',
'url' = 'jdbc:mysql://localhost:3306/read',
'username' = 'inlong',
'password' = 'inlong',
'table-name' = 'user'
)

-- ClickHouse Load 节点
CREATE TABLE `clickhouse_load_table`(
PRIMARY KEY (`id`) NOT ENFORCED,
`id` BIGINT,
`name` STRING,
`age` INT
) WITH (
'connector' = 'jdbc-inlong',
'dialect-impl' = 'org.apache.inlong.sort.jdbc.dialect.ClickHouseDialect',
'url' = 'jdbc:clickhouse://localhost:8123/demo',
'username' = 'inlong',
'password' = 'inlong',
'table-name' = 'demo.user'
)

-- 写数据到 ClickHouse
INSERT INTO clickhouse_load_table
SELECT id, name , age FROM mysql_extract_table;

InLong Dashboard 用法

创建数据流时,数据流方向选择ClickHouse,点击“添加”进行配置。

ClickHouse Configuration

InLong Manager Client 用法

TODO: 将在未来支持此功能。

ClickHouse Load 节点参数

参数是否必选默认值数据类型描述
connector必选(none)String指定使用什么类型的连接器,这里应该是 'jdbc-inlong'。
url必选(none)StringJDBC 数据库 url。
dialect-impl必选(none)Stringorg.apache.inlong.sort.jdbc.dialect.ClickHouseDialect
table-name必选(none)String连接到 JDBC 表的名称。例子:database.tableName
driver可选(none)String用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。
username可选(none)StringJDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。
password可选(none)StringJDBC 密码。
connection.max-retry-timeout可选60sDuration最大重试超时时间,以秒为单位且不应该小于 1 秒。
sink.buffer-flush.max-rows可选100Integerflush 前缓存记录的最大值,可以设置为 '0' 来禁用它。
sink.buffer-flush.interval可选1sDurationflush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。
sink.max-retries可选3Integer写入记录到数据库失败后的最大重试次数。
sink.parallelism可选(none)Integer用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。
sink.ignore.changelog可选falseBoolean忽略所有 RowKind 类型的变更日志,将它们当作 INSERT 的数据来采集。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}

数据类型映射

ClickHouse typeFlink SQL type
StringCHAR
String
IP
UUID
VARCHAR
String
EnumL
STRING
UInt8BOOLEAN
FixedStringBYTES
Decimal
Int128
Int256
UInt64
UInt128
UInt256
DECIMAL
Int8TINYINT
Int16
UInt8
SMALLINT
Int32
UInt16
Interval
INTEGER
Int64
UInt32
BIGINT
Float32FLOAT
DateDATE
DateTimeTIME
DateTimeTIMESTAMP
DateTimeTIMESTAMP_LTZ
Int32INTERVAL_YEAR_MONTH
Int64INTERVAL_DAY_TIME
ArrayARRAY
MapMAP
Not supportedROW
Not supportedMULTISET
Not supportedRAW