Skip to main content
版本:1.10.0

Doris

概述

Doris Extract 节点 支持从 Doris 中读取数据。本章节介绍如何设置 Doris Extract 节点以对 Doris 数据库运行 SQL 查询。

支持的版本

Extract 节点Doris 版本
Doris0.13+

依赖

为了设置 Doris Extract 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)所需要的依赖信息。

Maven 依赖

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-doris</artifactId>
<version>1.10.0-SNAPSHOT</version>
</dependency>

准备

创建 Doris Extract 表

先在 Doris 数据库中创建表 doris_extract_node, 命令如下:

[root@fe001 ~]# mysql -u root -h localhost -P 9030 -p000000
mysql> use test;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> CREATE TABLE `doris_extract_node` (
`id` int(11) NOT NULL COMMENT "用户id",
`name` varchar(50) NOT NULL COMMENT "昵称",
`dr` tinyint(4) NULL COMMENT "逻辑删除"
) ENGINE=OLAP
UNIQUE KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1",
"in_memory" = "false",
"storage_format" = "V2"
);
Query OK, 0 rows affected (0.03 sec)

mysql> insert into doris_extract_node values(1, 'zhangsan', 0),(2, 'lisi', 0),(3, 'wangwu', 0);
Query OK, 3 rows affected (0.07 sec)
{'label':'insert_29d973e9509a48d4-a20e9f0e2d510605', 'status':'VISIBLE', 'txnId':'1032'}

mysql> select * from doris_extract_node;
+------+---------+------+
| id | name | dr |
+------+---------+------+
| 1 | zhansan | 0 |
| 2 | lisi | 0 |
| 3 | wangwu | 0 |
+------+---------+------+
3 rows in set (0.02 sec)

如何创建 Doris Extract 节点

SQL API 用法

下面这个例子展示了如何用 Flink SQL 创建一个 Doris Extract 节点:

  • 连接器是 doris
# 启动flink sql-client, 加载 doris connector jar包
[root@tasknode001 flink-1.13.5]# ./bin/sql-client.sh -l ./opt/connectors/doris/

-- 使用 Flink SQL 创建 Doris 表 'doris_extract_node'
Flink SQL> CREATE TABLE doris_extract_node (
`id` INT,
`name` STRINTG,
`dr` TINYINT
) WITH (
'connector' = 'doris',
'fenodes' = 'localhost:8030',
'table.identifier' = 'test.doris_extract_node',
'username' = 'root',
'password' = '000000'
);

-- 读取数据
Flink SQL> SELECT * FROM doris_extract_node;

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

Doris Extract 节点参数

参数是否必选默认值数据类型描述
connector必选(none)string指定要使用的连接器 doris
fenodes必选(none)stringDoris FE http 地址
table.identifier必选(none)stringDoris 表名,如:db1.tbl1
username必选(none)string访问 Doris 的用户名
password必选(none)string访问 Doris 的密码
doris.request.retries可选3int向 Doris 发送请求的重试次数
doris.request.connect.timeout.ms可选30000int向 Doris 发送请求的连接超时时间
doris.request.read.timeout.ms可选30000int向 Doris 发送请求的读取超时时间
doris.request.query.timeout.s可选3600int查询 Doris 的超时时间,默认值为1小时,-1表示无超时限制
doris.request.tablet.size可选Integer.MAX_VALUEint一个 Partition 对应的 Doris Tablet 个数。
此数值设置越小,则会生成越多的 Partition。从而提升 Flink 侧的并行度,但同时会对 Doris 造成更大的压力。
doris.batch.size可选1024int一次从 BE 读取数据的最大行数。增大此数值可减少 Flink 与 Doris 之间建立连接的次数。
从而减轻网络延迟所带来的的额外时间开销。
doris.exec.mem.limit可选2147483648long单个查询的内存限制。默认为 2GB,单位为字节
doris.deserialize.arrow.async可选falseboolean是否支持异步转换 Arrow 格式到 flink-doris-connector 迭代所需的 RowBatch
doris.deserialize.queue.size可选64int异步转换 Arrow 格式的内部处理队列,当 doris.deserialize.arrow.async 为 true 时生效
doris.read.field可选(none)string读取 Doris 表的列名列表,多列之间使用逗号分隔
doris.filter.query可选(none)string过滤读取数据的表达式,此表达式透传给 Doris。Doris 使用此表达式完成源端数据过滤。

数据类型映射

Doris TypeFlink Type
NULL_TYPENULL
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTINT
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DATESTRING
DATETIMESTRING
DECIMALDECIMAL
CHARSTRING
LARGEINTSTRING
VARCHARSTRING
DECIMALV2DECIMAL
TIMEDOUBLE
HLLUnsupported datatype

请参阅 flink-doris-connector 页面以获取更多细节。