Skip to main content
版本:1.12.0

MongoDB-CDC

概述

MongoDB CDC 连接器允许从 MongoDB 读取快照数据和增量数据。本文档介绍如何设置 MongoDB CDC 连接器以对 MongoDB 运行 SQL 查询。

支持的版本

Extract 节点版本
MongoDB-CDCMongoDB: >= 3.6

依赖项

I.为了设置 MongoDB CDC 连接器,下表提供了使用构建自动化工具(例如 Maven 或 SBT)的依赖关系信息

Maven依赖

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-mongodb-cdc</artifactId>
<version>1.11.0-SNAPSHOT</version>
</dependency>

设置 MongoDB

可用性

  • MongoDB 版本

    MongoDB 版本 >= 3.6 我们使用 更改流功能(3.6 版中的新功能)来捕获更改数据。

  • 集群部署

    需要 副本集分片集群

  • 存储引擎

    需要 WiredTiger存储引擎。

  • 副本集协议版本

    需要副本集协议版本 1 (pv1)。 从版本 4.0 开始,MongoDB 仅支持 pv1。pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。

  • 特权

    changeStream MongoDB Kafka 连接器 read 需要权限。

    您可以使用以下示例进行简单授权。 更详细的授权请参考 MongoDB 数据库用户角色

    use admin;
    db.createUser({
    user: "flinkuser",
    pwd: "flinkpw",
    roles: [
    { role: "read", db: "admin" }, // read role includes changeStream privilege
    { role: "readAnyDatabase", db: "admin" } // for snapshot reading
    ]
    });

如何创建 MongoDB Extract 节点

SQL API 用法

这个例子展示了如何使用 Flink SQL 创建一个 MongoDB Extract 节点:

-- Set checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';

-- Create a MySQL table 'mongodb_extract_node' in Flink SQL
Flink SQL> CREATE TABLE mongodb_extract_node (
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc-inlong',
'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database' = 'inventory',
'collection' = 'mongodb_extract_node'
);

-- Read snapshot and binlogs from mongodb_extract_node
Flink SQL> SELECT * FROM mongodb_extract_node;

注意

MongoDB 的更改事件记录在消息之前没有更新。所以,我们只能将其转换为 Flink 的 UPSERT 变更日志流。UPSERT 流需要唯一键,因此我们必须声明 _id 为主键。我们不能将其他列声明为主键,因为删除操作不包含除 _idsharding key 之外的键和值。

InLong Dashboard 用法

TODO: 未来会支持

InLong Manager 用法

TODO: 未来会支持

MongoDB Extract 节点参数

选项是否必须默认类型描述
connector必须(none)String指定要使用的连接器,这里应该是mongodb-cdc-inlong.
hosts必须(none)StringMongoDB 服务器的主机名和端口对的逗号分隔列表。例如。localhost:27017,localhost:27018
username可选(none)String连接到 MongoDB 时要使用的数据库用户的名称。仅当 MongoDB 配置为使用身份验证时才需要这样做。
password可选(none)String连接 MongoDB 时使用的密码。仅当 MongoDB 配置为使用身份验证时才需要这样做。
database必须(none)String要监视更改的数据库的名称。
collection必须(none)String数据库中要监视更改的集合的名称。
connection.options可选(none)StringMongoDB的 & 分隔连接选项。例如。replicaSet=test&connectTimeoutMS=300000
copy.existing可选trueBoolean是否从源集合中复制现有数据。
copy.existing.queue.size可选10240Integer执行数据复制时使用的线程数。
batch.size可选1024IntegerCursor 批次大小。
poll.max.batch.size可选1024Integer轮询新数据时,单个批次中包含的最大更改流文档数。
poll.await.time.ms可选1000Integer在更改流上检查新结果之前等待的时间量。
heartbeat.interval.ms可选0Integer发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}
scan.incremental.snapshot.enabled可选falseBoolean是否启用增量快照。增量快照功能仅支持 MongoDB 4.0之后的版本。
scan.incremental.snapshot.chunk.size.mb可选64Integer增量快照的块大小,单位: mb。
chunk-meta.group.size可选1000Integerchunk meta 的组大小,如果 meta 大小超过组大小,则 meta 将被分成多个组。

可用元数据

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

Key数据类型描述
database_nameSTRING NOT NULL包含该行的数据库的名称。
collection_nameSTRING NOT NULL包含该行的集合的名称。
op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。如果记录是从表的快照而不是更改流中读取的,则该值始终为 0。

扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:

CREATE TABLE `mysql_extract_node` (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
collection_name STRING METADATA FROM 'collection_name' VIRTUAL,
operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
_id STRING, // must be declared
name STRING,
weight DECIMAL(10,3),
tags ARRAY<STRING>, -- array
price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
PRIMARY KEY(_id) NOT ENFORCED
) WITH (
'connector' = 'mongodb-cdc-inlong',
'hostname' = 'YourHostname',
'username' = 'YourUsername',
'password' = 'YourPassword',
'database' = 'YourDatabase',
'collection' = 'YourTable'
);

数据类型映射

BSON 类型Flink SQL 类型
TINYINT
SMALLINT
IntINT
LongBIGINT
FLOAT
DoubleDOUBLE
Decimal128DECIMAL(p, s)
BooleanBOOLEAN
Date TimestampDATE
Date TimestampTIME
DateTIMESTAMP(3) TIMESTAMP_LTZ(3)
TimestampTIMESTAMP(0) TIMESTAMP_LTZ(0)
String ObjectId UUID Symbol MD5 JavaScript RegexSTRING
BinDataBYTES
ObjectROW
ArrayARRAY
DBPointerROW\<\$ref STRING, \$id STRING>
GeoJSONPoint : ROW\<type STRING, coordinates ARRAY\<DOUBLE>> Line : ROW\<type STRING, coordinates ARRAY\<ARRAY\< DOUBLE>>> ...