Skip to main content
版本:1.5.0

Oracle-CDC

概述

Oracle Extract 节点允许从 Oracle 数据库中读取快照数据和增量数据。本文档介绍如何设置 Oracle Extract 节点以对 Oracle 数据库运行 SQL 查询。

支持的版本

Extract 节点版本Driver
Oracle-CDCOracle: 11, 12, 19Oracle Driver: 19.3.0.0

依赖

为了设置 Oracle Extract 节点,下表提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connectors JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven 依赖

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-oracle-cdc</artifactId>
<version>1.5.0</version>
</dependency>

连接 Oracle 数据库还需要 Oracle 驱动程序依赖项。请下载ojdbc8-19.3.0.0.jar 并将其放入 FLINK_HOME/lib/

设置 Oracle

你必须为 Oracle 数据库启用日志归档,并定义一个对 Debezium Oracle 连接器监控的所有数据库具有适当权限的 Oracle 用户。

对于非 CDB 数据库

  • 启用日志归档

    (1.1). 以 DBA 身份连接到数据库

    ORACLE_SID=SID
    export ORACLE_SID
    sqlplus /nolog
    CONNECT sys/password AS SYSDBA

    (1.2). 启用日志归档

    alter system set db_recovery_file_dest_size = 10G;
    alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    shutdown immediate;
    startup mount;
    alter database archivelog;
    alter database open;

    Note:

    • Enable log archiving requires database restart, pay attention when try to do it
    • The archived logs will occupy a large amount of disk space, so consider clean the expired logs the periodically

    (1.3). 检查是否启用了日志归档

    -- Should now "Database log mode: Archive Mode"
    archive log list;

    注意:

    必须为捕获的表或数据库启用补充日志记录,以便数据更改能够捕获已更改数据库行的之前状态。 下面说明了如何在表/数据库级别进行配置。

    -- 为特定表启用补充日志记录:
    ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- 为数据库启用补充日志记录:
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  • 创建具有权限的 Oracle 用户

    (2.1). 创建表空间

    sqlplus sys/password@host:port/SID AS SYSDBA;
    CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/SID/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    exit;

    (2.2). 创建用户并授予权限

    sqlplus sys/password@host:port/SID AS SYSDBA;
    CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE LOGMINER_TBS QUOTA UNLIMITED ON LOGMINER_TBS;
    GRANT CREATE SESSION TO flinkuser;
    GRANT SET CONTAINER TO flinkuser;
    GRANT SELECT ON V_$DATABASE to flinkuser;
    GRANT FLASHBACK ANY TABLE TO flinkuser;
    GRANT SELECT ANY TABLE TO flinkuser;
    GRANT SELECT_CATALOG_ROLE TO flinkuser;
    GRANT EXECUTE_CATALOG_ROLE TO flinkuser;
    GRANT SELECT ANY TRANSACTION TO flinkuser;
    GRANT LOGMINING TO flinkuser;

    GRANT CREATE TABLE TO flinkuser;
    GRANT LOCK ANY TABLE TO flinkuser;
    GRANT ALTER ANY TABLE TO flinkuser;
    GRANT CREATE SEQUENCE TO flinkuser;

    GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser;

    GRANT SELECT ON V_$LOG TO flinkuser;
    GRANT SELECT ON V_$LOG_HISTORY TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser;
    GRANT SELECT ON V_$LOGFILE TO flinkuser;
    GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser;
    exit;

对于 CDB 数据库

总的来说,配置 CDB 数据库的步骤与非 CDB 数据库非常相似,但命令可能会有所不同。

  • 启用日志归档

    ORACLE_SID=ORCLCDB
    export ORACLE_SID
    sqlplus /nolog
    CONNECT sys/password AS SYSDBA
    alter system set db_recovery_file_dest_size = 10G;
    -- should exist
    alter system set db_recovery_file_dest = '/opt/oracle/oradata/recovery_area' scope=spfile;
    shutdown immediate
    startup mount
    alter database archivelog;
    alter database open;
    -- Should show "Database log mode: Archive Mode"
    archive log list
    exit;

    注意: 您还可以使用以下命令启用补充日志记录:

    -- Enable supplemental logging for a specific table:
    ALTER TABLE inventory.customers ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
    -- Enable supplemental logging for database
    ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
  • 创建具有权限的 Oracle 用户

    sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
    CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    exit
    sqlplus sys/password@//localhost:1521/ORCLPDB1 as sysdba
    CREATE TABLESPACE logminer_tbs DATAFILE '/opt/oracle/oradata/ORCLCDB/ORCLPDB1/logminer_tbs.dbf' SIZE 25M REUSE AUTOEXTEND ON MAXSIZE UNLIMITED;
    exit
    sqlplus sys/password@//localhost:1521/ORCLCDB as sysdba
    CREATE USER flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
    GRANT CREATE SESSION TO flinkuser CONTAINER=ALL;
    GRANT SET CONTAINER TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$DATABASE to flinkuser CONTAINER=ALL;
    GRANT FLASHBACK ANY TABLE TO flinkuser CONTAINER=ALL;
    GRANT SELECT ANY TABLE TO flinkuser CONTAINER=ALL;
    GRANT SELECT_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
    GRANT EXECUTE_CATALOG_ROLE TO flinkuser CONTAINER=ALL;
    GRANT SELECT ANY TRANSACTION TO flinkuser CONTAINER=ALL;
    GRANT LOGMINING TO flinkuser CONTAINER=ALL;
    GRANT CREATE TABLE TO flinkuser CONTAINER=ALL;
    GRANT LOCK ANY TABLE TO flinkuser CONTAINER=ALL;
    GRANT CREATE SEQUENCE TO flinkuser CONTAINER=ALL;

    GRANT EXECUTE ON DBMS_LOGMNR TO flinkuser CONTAINER=ALL;
    GRANT EXECUTE ON DBMS_LOGMNR_D TO flinkuser CONTAINER=ALL;

    GRANT SELECT ON V_$LOG TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOG_HISTORY TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOGMNR_LOGS TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOGMNR_CONTENTS TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOGMNR_PARAMETERS TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$LOGFILE TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$ARCHIVED_LOG TO flinkuser CONTAINER=ALL;
    GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO flinkuser CONTAINER=ALL;
    exit

查看更多关于 设置 Oracle

如何创建 Oracle Extract 节点

SQL API 用法

Oracle Extract 节点可以定义如下:

-- 创建 an Oracle Extract 节点 'products' in Flink SQL
Flink SQL> CREATE TABLE products (
ID INT NOT NULL,
NAME STRING,
DESCRIPTION STRING,
WEIGHT DECIMAL(10, 3),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc-inlong',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'schema-name' = 'inlong',
'table-name' = 'user');

Flink SQL> SELECT * FROM products;

注意:

当使用 CDB + PDB 模型时,您需要在 Flink DDL 中添加一个额外的选项 'debezium.database.pdb.name' = 'xxx' 来指定要连接的 PDB 的名称。

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

Oracle Extact 节点参数

参数是否必须默认值数据类型描述
connectorrequired(none)String指定要使用的连接器,这里应该是 'oracle-cdc-inlong'
hostnamerequired(none)StringOracle 数据库服务器的 IP 地址或主机名。
usernamerequired(none)String连接到 Oracle 数据库服务器时要使用的 Oracle 数据库的名称。
passwordrequired(none)String连接到 Oracle 数据库服务器时使用的密码。
database-namerequired(none)String要监视的 Oracle 服务器的数据库名称。
schema-namerequired(none)String要监视的 Oracle 数据库的 Schema 名称。
table-namerequired(none)String要监视的 Oracle 数据库的表名。格式为<schema_name>.<table_name>
portoptional1521IntegerOracle 数据库服务器的整数端口号。
scan.startup.modeoptionalinitialStringOracle CDC 消费者的可选启动模式,有效枚举为"initial" 和"latest-offset"。 请参阅启动阅读位置部分了解更多详细信息。
debezium.*optional(none)String将 Debezium 的属性整合到用于从 Oracle 服务器捕获数据更改的 Debezium Embedded Engine。 例如:'debezium.snapshot.mode' = 'never'。 详细了解 Debezium 的 Oracle 连接器属性
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]。
source.multiple.enable可选falseBoolean是否开启多模式、表同步功能,如果为 'true',Oracle Extract Node 则将表的物理字段压缩成 'canal-json' 格式的特殊元字段 'data_canal'。

局限性

在扫描表的快照期间无法执行 checkpoint

在扫描数据库表的快照时,由于没有可恢复的位置,我们无法执行检查点。为了不执行检查点,Oracle CDC 源将保持检查点等待超时。超时检查点将被识别为失败的检查点,默认情况下,这将触发 Flink 作业的故障转移。所以如果数据库表很大,建议添加以下 Flink 配置,避免因为超时检查点而导致故障转移:

execution.checkpointing.interval: 10min
execution.checkpointing.tolerable-failed-checkpoints: 100
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647

可用的元数据字段

以下格式元数据可以作为表定义中的只读 (VIRTUAL) 列公开。

字段名称数据类型描述
table_nameSTRING NOT NULL该行所属的表名。
schema_nameSTRING NOT NULL该行所属的模式名称。
database_nameSTRING NOT NULL该行所属的数据库名称。
op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。
如果记录从表的快照而不是change流中读取,则该值始终为0。
meta.table_nameSTRING NOT NULL该行所属的表名。
meta.schema_nameSTRING NOT NULL该行所属的模式名称。
meta.database_nameSTRING NOT NULL该行所属的数据库名称。
meta.op_tsTIMESTAMP_LTZ(3) NOT NULL它指示在数据库中进行更改的时间。
如果记录从表的快照而不是change流中读取,则该值始终为0。
meta.op_typeSTRING数据库操作的类型,如 INSERT/DELETE 等。
meta.data_canalSTRING/BYTES`canal-json` 格式化的行的数据只有在 `source.multiple.enable` 选项为 'true' 时才存在。
meta.is_ddlBOOLEAN是否是 DDL 语句。
meta.tsTIMESTAMP_LTZ(3) NOT NULL接收和处理行的当前时间。
meta.sql_typeMAP将 Sql_type 表字段映射到 Java 数据类型 Id。
meta.oracle_typeMAP表的结构。
meta.pk_namesARRAY表的主键名称。

扩展的 CREATE TABLE 示例演示了公开这些元数据字段的语法:

CREATE TABLE products (
db_name STRING METADATA FROM 'database_name' VIRTUAL,
schema_name STRING METADATA FROM 'schema_name' VIRTUAL,
table_name STRING METADATA FROM 'table_name' VIRTUAL,
op_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
meta_db_name STRING METADATA FROM 'meta.database_name' VIRTUAL,
meta_schema_name STRING METADATA FROM 'meta.schema_name' VIRTUAL,
meta_table_name STRING METADATA FROM 'meta.table_name' VIRTUAL,
meat_op_ts TIMESTAMP_LTZ(3) METADATA FROM 'meta.op_ts' VIRTUAL,
meta_op_type STRING METADATA FROM 'meta.op_type' VIRTUAL,
meta_data_canal STRING METADATA FROM 'meta.data_canal' VIRTUAL,
meta_is_ddl BOOLEAN METADATA FROM 'meta.is_ddl' VIRTUAL,
meta_ts TIMESTAMP_LTZ(3) METADATA FROM 'meta.ts' VIRTUAL,
meta_sql_type MAP<STRING, INT> METADATA FROM 'meta.sql_type' VIRTUAL,
meat_oracle_type MAP<STRING, STRING> METADATA FROM 'meta.oracle_type' VIRTUAL,
meta_pk_names ARRAY<STRING> METADATA FROM 'meta.pk_names' VIRTUAL
ID INT NOT NULL,
NAME STRING,
DESCRIPTION STRING,
WEIGHT DECIMAL(10, 3),
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'oracle-cdc-inlong',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'schema-name' = 'inventory',
'table-name' = 'inventory.products'
);

注意:Oracle 方言是区分大小写的,如果字段名没有被引用,它会将字段名转换为大写,Flink SQL 不会转换字段名。因此对于 oracle 数据库中的物理列,我们在 Flink SQL 中定义 oracle-cdc 表时应该使用其在 Oracle 中转换后的字段名称。

特征

Exactly-Once 处理

Oracle Extract 节点是一个 Flink Source 连接器,它将首先读取数据库快照,然后通过exactly-once 处理继续读取更改事件,即使发生故障。请阅读 连接器的工作原理

启动读取位置

配置选项 scan.startup.mode 指定 Oracle Extract 节点消费者的启动模式。有效的枚举是:

  • initial (默认): 首次启动时对被监控的数据库表进行初始快照,并继续读取最新的 Binlog。
  • latest-offset: 永远不要在第一次启动时对受监控的数据库表执行快照,只需从自连接器启动以来的更改。

注意: scan.startup.mode 选项的机制依赖于 Debezium 的snapshot.mode 配置。所以请不要一起使用它们。如果您在 DDL 表中同时指定了 scan.startup.modedebezium.snapshot.mode 选项,可能会导致 scan.startup.mode 不起作用。

单线程读取

Oracle Extract 节点不能并行读取,因为只有一个任务可以接收更改事件。

整库、多模式、表同步

Oracle Extract 节点支持整库、多模式、多表同步。开启该功能后,Oracel Extract 节点会将表的物理字段压缩成 'canal-json' 格式的特殊元字段 'data_canal'。

配置参数:

参数是否必须默认值数据类型描述
source.multiple.enableoptionalfalseString指定'source.multiple.enable' = 'true'参数开启整库、多模式、多表同步功能
schema-namerequired(none)String要监视的 Oracle 数据库的 Schema 名称。如果要捕获多个模式,可以使用逗号分割它们。例如:'schema-name' = 'SCHEMA1,SCHEMA2'
table-namerequired(none)String要监视的 Oracle 数据库的表名。如果要捕获多个表,可以使用逗号分割它们。例如:'table-name' = 'SCHEMA1.TB.*, SCHEMA2.TB1'

CREATE TABLE 示例演示该功能语法:

CREATE TABLE node(
data STRING METADATA FROM 'meta.data_canal' VIRTUAL)
WITH (
'connector' = 'oracle-cdc-inlong',
'hostname' = 'localhost',
'port' = '1521',
'username' = 'flinkuser',
'password' = 'flinkpw',
'database-name' = 'XE',
'schema-name' = 'inventory',
'table-name' = 'inventory..*',
'source.multiple.enable' = 'true'
)

数据类型映射

Oracle typeFlink SQL type
NUMBER(p, s <= 0), p - s < 3TINYINT
NUMBER(p, s <= 0), p - s < 5SMALLINT
NUMBER(p, s <= 0), p - s < 10INT
NUMBER(p, s <= 0), p - s < 19BIGINT
NUMBER(p, s <= 0), 19 <= p - s <= 38
DECIMAL(p - s, 0)
NUMBER(p, s > 0)DECIMAL(p, s)
NUMBER(p, s <= 0), p - s > 38STRING
FLOAT
BINARY_FLOAT
FLOAT
DOUBLE PRECISION
BINARY_DOUBLE
DOUBLE
NUMBER(1)BOOLEAN
DATE
TIMESTAMP [(p)]
TIMESTAMP [(p)] [WITHOUT TIMEZONE]
TIMESTAMP [(p)] WITH TIME ZONETIMESTAMP [(p)] WITH TIME ZONE
TIMESTAMP [(p)] WITH LOCAL TIME ZONETIMESTAMP_LTZ [(p)]
CHAR(n)
NCHAR(n)
NVARCHAR2(n)
VARCHAR(n)
VARCHAR2(n)
CLOB
NCLOB
XMLType
STRING
BLOB
ROWID
BYTES
INTERVAL DAY TO SECOND
INTERVAL YEAR TO MONTH
BIGINT