MongoDB-CDC
概述
MongoDB CDC 连接器允许从 MongoDB 读取快照数据和增量数据。本文档介绍如何设置 MongoDB CDC 连接器以对 MongoDB 运行 SQL 查询。
支持的版本
| Extract 节点 | 版本 | 
|---|---|
| MongoDB-CDC | MongoDB: >= 3.6 | 
依赖项
I.为了设置 MongoDB CDC 连接器,下表提供了使用构建自动化工具(例如 Maven 或 SBT)的依赖关系信息
Maven依赖
<dependency>
    <groupId>org.apache.inlong</groupId>
    <artifactId>sort-connector-mongodb-cdc</artifactId>
    <version>1.10.0-SNAPSHOT</version>
</dependency>
设置 MongoDB
可用性
MongoDB 版本
MongoDB 版本 >= 3.6 我们使用 更改流功能(3.6 版中的新功能)来捕获更改数据。
集群部署
存储引擎
需要 WiredTiger存储引擎。
需要副本集协议版本 1 (pv1)。 从版本 4.0 开始,MongoDB 仅支持 pv1。pv1 是使用 MongoDB 3.2 或更高版本创建的所有新副本集的默认值。
特权
changeStreamMongoDB Kafka 连接器read需要权限。您可以使用以下示例进行简单授权。 更详细的授权请参考 MongoDB 数据库用户角色。
use admin;
db.createUser({
user: "flinkuser",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" }, // read role includes changeStream privilege
{ role: "readAnyDatabase", db: "admin" } // for snapshot reading
]
});
如何创建 MongoDB Extract 节点
SQL API 用法
这个例子展示了如何使用 Flink SQL 创建一个 MongoDB Extract 节点:
-- Set checkpoint every 3000 milliseconds                       
Flink SQL> SET 'execution.checkpointing.interval' = '3s';   
-- Create a MySQL table 'mongodb_extract_node' in Flink SQL
Flink SQL> CREATE TABLE mongodb_extract_node (
  _id STRING, // must be declared
  name STRING,
  weight DECIMAL(10,3),
  tags ARRAY<STRING>, -- array
  price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
  suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
  PRIMARY KEY(_id) NOT ENFORCED
) WITH (
  'connector' = 'mongodb-cdc-inlong',
  'hosts' = 'localhost:27017,localhost:27018,localhost:27019',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database' = 'inventory',
  'collection' = 'mongodb_extract_node'
);
-- Read snapshot and binlogs from mongodb_extract_node
Flink SQL> SELECT * FROM mongodb_extract_node;
注意
MongoDB 的更改事件记录在消息之前没有更新。所以,我们只能将其转换为 Flink 的 UPSERT 变更日志流。UPSERT 流需要唯一键,因此我们必须声明 _id 为主键。我们不能将其他列声明为主键,因为删除操作不包含除 _id 和 sharding key 之外的键和值。
InLong Dashboard 用法
TODO: 未来会支持
InLong Manager 用法
TODO: 未来会支持
MongoDB Extract 节点参数
| 选项 | 是否必须 | 默认 | 类型 | 描述 | 
|---|---|---|---|---|
| connector | 必须 | (none) | String | 指定要使用的连接器,这里应该是mongodb-cdc-inlong. | 
| hosts | 必须 | (none) | String | MongoDB 服务器的主机名和端口对的逗号分隔列表。例如。localhost:27017,localhost:27018 | 
| username | 可选 | (none) | String | 连接到 MongoDB 时要使用的数据库用户的名称。仅当 MongoDB 配置为使用身份验证时才需要这样做。 | 
| password | 可选 | (none) | String | 连接 MongoDB 时使用的密码。仅当 MongoDB 配置为使用身份验证时才需要这样做。 | 
| database | 必须 | (none) | String | 要监视更改的数据库的名称。 | 
| collection | 必须 | (none) | String | 数据库中要监视更改的集合的名称。 | 
| connection.options | 可选 | (none) | String | MongoDB的 & 分隔连接选项。例如。replicaSet=test&connectTimeoutMS=300000 | 
| copy.existing | 可选 | true | Boolean | 是否从源集合中复制现有数据。 | 
| copy.existing.queue.size | 可选 | 10240 | Integer | 执行数据复制时使用的线程数。 | 
| batch.size | 可选 | 1024 | Integer | Cursor 批次大小。 | 
| poll.max.batch.size | 可选 | 1024 | Integer | 轮询新数据时,单个批次中包含的最大更改流文档数。 | 
| poll.await.time.ms | 可选 | 1000 | Integer | 在更改流上检查新结果之前等待的时间量。 | 
| heartbeat.interval.ms | 可选 | 0 | Integer | 发送心跳消息之间的时间长度(以毫秒为单位)。使用 0 禁用。 | 
| inlong.metric.labels | 可选 | (none) | String | inlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}。 | 
| scan.incremental.snapshot.enabled | 可选 | false | Boolean | 是否启用增量快照。增量快照功能仅支持 MongoDB 4.0之后的版本。 | 
| scan.incremental.snapshot.chunk.size.mb | 可选 | 64 | Integer | 增量快照的块大小,单位: mb。 | 
| chunk-meta.group.size | 可选 | 1000 | Integer | chunk meta 的组大小,如果 meta 大小超过组大小,则 meta 将被分成多个组。 | 
可用元数据
以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。
| Key | 数据类型 | 描述 | 
|---|---|---|
| database_name | STRING NOT NULL | 包含该行的数据库的名称。 | 
| collection_name | STRING NOT NULL | 包含该行的集合的名称。 | 
| op_ts | TIMESTAMP_LTZ(3) NOT NULL | 它指示在数据库中进行更改的时间。如果记录是从表的快照而不是更改流中读取的,则该值始终为 0。 | 
扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:
CREATE TABLE `mysql_extract_node` (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    collection_name STRING METADATA  FROM 'collection_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    _id STRING, // must be declared
    name STRING,
    weight DECIMAL(10,3),
    tags ARRAY<STRING>, -- array
    price ROW<amount DECIMAL(10,2), currency STRING>, -- embedded document
    suppliers ARRAY<ROW<name STRING, address STRING>>, -- embedded documents
    PRIMARY KEY(_id) NOT ENFORCED
) WITH (
      'connector' = 'mongodb-cdc-inlong', 
      'hostname' = 'YourHostname',
      'username' = 'YourUsername',
      'password' = 'YourPassword',
      'database' = 'YourDatabase',
      'collection' = 'YourTable' 
);
数据类型映射
| BSON 类型 | Flink SQL 类型 | 
|---|---|
| TINYINT | |
| SMALLINT | |
| Int | INT | 
| Long | BIGINT | 
| FLOAT | |
| Double | DOUBLE | 
| Decimal128 | DECIMAL(p, s) | 
| Boolean | BOOLEAN | 
| Date Timestamp | DATE | 
| Date Timestamp | TIME | 
| Date | TIMESTAMP(3) TIMESTAMP_LTZ(3) | 
| Timestamp | TIMESTAMP(0) TIMESTAMP_LTZ(0) | 
| String ObjectId UUID Symbol MD5 JavaScript Regex | STRING | 
| BinData | BYTES | 
| Object | ROW | 
| Array | ARRAY | 
| DBPointer | ROW\<\$ref STRING, \$id STRING> | 
| GeoJSON | Point : ROW\<type STRING, coordinates ARRAY\<DOUBLE>> Line : ROW\<type STRING, coordinates ARRAY\<ARRAY\< DOUBLE>>> ... |