Skip to main content
版本:1.9.0

SQLServer

概览

SQLServer Load 节点支持将数据写入 SQLServer 数据库。 本文档介绍如何设置 SQLServer Load 节点以对 SQLServer 数据库运行 SQL 查询。

支持的版本

Load 节点DriverGroup IdArtifact IdJAR
SQLServerSQL Servercom.microsoft.sqlservermssql-jdbc下载

依赖

为了设置 SQLServer Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven 依赖

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-jdbc</artifactId>
<version>1.9.0</version>
</dependency>

如何创建 SQLServer Load 节点

SQL API 用法


-- SQLServer extract node
CREATE TABLE `sqlserver_extract_table`(
PRIMARY KEY (`id`) NOT ENFORCED,
`id` BIGINT,
`name` STRING,
`age` INT
) WITH (
'connector' = 'sqlserver-cdc-inlong',
'url' = 'jdbc:sqlserver://localhost:1433/read',
'username' = 'inlong',
'password' = 'inlong',
'table-name' = 'dbo.user'
)

-- SQLServer Load 节点
CREATE TABLE `sqlserver_load_table`(
PRIMARY KEY (`id`) NOT ENFORCED,
`id` BIGINT,
`name` STRING,
`age` INT
) WITH (
'connector' = 'jdbc-inlong',
'url' = 'jdbc:sqlserver://localhost:1433;databaseName=column_type_test',
'username' = 'inlong',
'password' = 'inlong',
'table-name' = 'dbo.work1'
)

-- 写数据到 SQLServer
INSERT INTO sqlserver_load_table
SELECT id, name , age FROM mysql_extract_table;

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

SQLServer Load 节点参数

参数是否必选默认值数据类型描述
connector必选(none)String指定使用什么类型的连接器,这里应该是 'jdbc-inlong'。
url必选(none)StringJDBC 数据库 url。
table-name必选(none)String连接到 JDBC 表的名称。
driver可选(none)String用于连接到此 URL 的 JDBC 驱动类名,如果不设置,将自动从 URL 中推导。
username可选(none)StringJDBC 用户名。如果指定了 'username' 和 'password' 中的任一参数,则两者必须都被指定。
password可选(none)StringJDBC 密码。
connection.max-retry-timeout可选60sDuration最大重试超时时间,以秒为单位且不应该小于 1 秒。
sink.buffer-flush.max-rows可选100Integerflush 前缓存记录的最大值,可以设置为 '0' 来禁用它。
sink.buffer-flush.interval可选1sDurationflush 间隔时间,超过该时间后异步线程将 flush 数据。可以设置为 '0' 来禁用它。注意, 为了完全异步地处理缓存的 flush 事件,可以将 'sink.buffer-flush.max-rows' 设置为 '0' 并配置适当的 flush 时间间隔。
sink.max-retries可选3Integer写入记录到数据库失败后的最大重试次数。
sink.parallelism可选(none)Integer用于定义 JDBC sink 算子的并行度。默认情况下,并行度是由框架决定:使用与上游链式算子相同的并行度。
sink.ignore.changelog可选falseBoolean忽略所有 RowKind 类型的变更日志,将它们当作 INSERT 的数据来采集。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为 groupId={groupId}&streamId={streamId}&nodeId={nodeId}

数据映射

SQLServer 类型Flink SQL 类型
char(n)CHAR(n)
varchar(n)
nvarchar(n)
nchar(n)
VARCHAR(n)
text
ntext
xml
STRING
BIGINT
BIGSERIAL
BIGINT
decimal(p, s)
money
smallmoney
DECIMAL(p, s)
numericNUMERIC
float
real
FLOAT
bitBOOLEAN
intINT
tinyintTINYINT
smallintSMALLINT
bigintBIGINT
time(n)TIME(n)
datetime2
datetime
smalldatetime
TIMESTAMP(n)
datetimeoffsetTIMESTAMP_LTZ(3)