SQLServer-CDC
概述
SQLServer 提取节点从 SQLServer 数据库中读取数据和增量数据。下面将介绍如何配置 SQLServer 抽取节点。
支持的版本
Extract Node | Version |
---|---|
SQLServer-cdc | SQLServer: 2014、2016、2017、2019、2022 |
依赖配置
通过 Maven 引入 sort-connector-sqlserver-cdc 构建自己的项目。 当然,你也可以直接使用 INLONG 提供的 jar 包。(sort-connector-sqlserver-cdc)
Maven依赖配置
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-sqlserver-cdc</artifactId>
<version>1.13.0-SNAPSHOT</version>
</dependency>
配置 SQLServer 加载节点
SQLServer 加载节点需要开启库和表的 CDC 功能,配置步骤如下:
- 开启数据库 CDC 能力。
if exists(select 1 from sys.databases where name='dbName' and is_cdc_enabled=0)
begin
exec sys.sp_cdc_enable_db
end
- 检查数据库 CDC 是否开启。
select is_cdc_enabled from sys.databases where name='dbName'
备注: "1"表示数据库 CDC 开启
- 开启表的 CDC 能力。
IF EXISTS(SELECT 1 FROM sys.tables WHERE name='tableName' AND is_tracked_by_cdc = 0)
BEGIN
EXEC sys.sp_cdc_enable_table
@source_schema = 'dbo', -- source_schema
@source_name = 'tableName', -- table_name
@capture_instance = NULL, -- capture_instance
@supports_net_changes = 1, -- supports_net_changes
@role_name = NULL, -- role_name
@index_name = NULL, -- index_name
@captured_column_list = NULL, -- captured_column_list
@filegroup_name = 'PRIMARY' -- filegroup_name
END
备注: 表必须有主键或者唯一索引。
- 检查表 CDC 是否开启。
SELECT is_tracked_by_cdc FROM sys.tables WHERE name='tableName'
备注: "1"表示表 CDC 开启
如何创建一个 SQLServer 抽取节点
SQL API 的使用
使用 Flink SQL Cli
:
-- Set checkpoint every 3000 milliseconds
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
-- Create a SqlServer table 'sqlserver_extract_node' in Flink SQL Cli
Flink SQL> CREATE TABLE sqlserver_extract_node (
order_id INT,
order_date TIMESTAMP(0),
customer_name STRING,
price DECIMAL(10, 5),
product_id INT,
order_status BOOLEAN,
PRIMARY KEY(order_id) NOT ENFORCED
) WITH (
'connector' = 'sqlserver-cdc-inlong',
'hostname' = 'YourHostname',
'port' = 'port', --default:1433
'username' = 'YourUsername',
'password' = 'YourPassword',
'database-name' = 'YourDatabaseName',
'schema-name' = 'YourSchemaName' -- default:dbo
'table-name' = 'YourTableName');
-- Read snapshot and binlog from sqlserver_extract_node
Flink SQL> SELECT * FROM sqlserver_extract_node;
InLong Dashboard 方式
TODO
InLong Manager Client 方式
TODO
SQLServer 抽取节点参数信息
参数 | 是否必须 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
connector | 必须 | (none) | String | 指定使用什么连接器,这里应该是 'sqlserver-cdc-inlong'。 |
hostname | 必须 | (none) | String | SQLServer 数据库 IP 地址或者 hostname。 |
username | 必须 | (none) | String | SQLServer 数据库用户名。 |
password | 必须 | (none) | String | SQLServer 数据库用户密码。 |
database-name | 必须 | (none) | String | SQLServer 数据库监控的数据库名称。 |
schema-name | 必须 | dbo | String | SQLServer 数据库监控的 schema 名称。 |
table-name | 必须 | (none) | String | SQLServer 数据库监控的表名称。 |
port | 可选 | 1433 | Integer | SQLServer 数据库端口。 |
server-time-zone | 可选 | UTC | String | SQLServer 数据库连接配置时区。 例如: "Asia/Shanghai"。 |
inlong.metric.labels | 可选 | (none) | String | inlong metric 的标签值,该值的构成为 groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]。 |
可用的元数据字段
以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。
字段名称 | 数据类型 | 描述 |
---|---|---|
table_name | STRING NOT NULL | 包含该行的表的名称。 |
schema_name | STRING NOT NULL | 包含该行 schema 的名称。 |
database_name | STRING NOT NULL | 包含该行数据库的名称。 |
op_ts | TIMESTAMP_LTZ(3) NOT NULL | 它表示在数据库中进行更改的时间。如果记录是从表的快照而不是 binlog 中读取的,则该值始终为 0。 |
使用元数据字段的例子:
CREATE TABLE sqlserver_extract_node (
table_name STRING METADATA FROM 'table_name' VIRTUAL,
schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
db_name STRING METADATA FROM 'database_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
id INT NOT NULL
) WITH (
'connector' = 'sqlserver-cdc',
'hostname' = 'localhost',
'port' = '1433',
'username' = 'sa',
'password' = 'password',
'database-name' = 'test',
'schema-name' = 'dbo',
'table-name' = 'worker'
);
数据类型映射
SQLServer type | Flink SQL type |
---|---|
char(n) | CHAR(n) |
varchar(n) nvarchar(n) nchar(n) | VARCHAR(n) |
text ntext xml | STRING |
decimal(p, s) money smallmoney | DECIMAL(p, s) |
numeric | NUMERIC |
REAL FLOAT | FLOAT |
bit | BOOLEAN |
int | INT |
tinyint | TINYINT |
smallint | SMALLINT |
time (n) | TIME (n) |
bigint | BIGINT |
date | DATE |
datetime2 datetime smalldatetime | TIMESTAMP(n) |
datetimeoffset | TIMESTAMP_LTZ(3) |