脏数据归档
概览
脏数据为在数据抽取、转换、加载过程中,因数据本身的质量问题导致数据无法正确抽取、转换、加载的数据, 常见的脏数据可能有字段类型、长度、个数不匹配,数据序列化、反序列化异常,目标端库、表不存在等。 脏数据归档可以将 InLong 数据流运行中的脏数据转存到第三方存储,便于业务查找问题。 目前 Sort 支持了 S3、Log 两种目标端的脏数据归档,同时支持 Kafka、Doris、Iceberg、Hbase、Hive、Elasticsearch、JDBC 等常见数据源的脏数据归档。
支持的节点
类型 | 名称 | 归档目标端 |
---|---|---|
Extract 节点 | Kafka | Log, S3 |
Load 节点 | Hive | Log, S3 |
Kafka | Log, S3 | |
HBase | Log, S3 | |
ClickHouse | Log, S3 | |
Iceberg | Log, S3 | |
Elasticsearch | Log, S3 | |
PostgreSQL | Log, S3 | |
HDFS | Log, S3 | |
TDSQL Postgres | Log, S3 | |
Doris | Log, S3 | |
SQL Server | Log, S3 | |
Greenplum | Log, S3 |
脏数据格式化
在脏数据归档处理过程中,我们定义了下面的系统变量,用作脏数据的格式化:
- SYSTEM_TIME: 系统时间,格式为 'yyyy-MM-dd HH:mm:ss'
- DIRTY_TYPE:脏数据类型,常见的有 SerializeError、DeserializeError、DataTypeMappingError 等
- DIRTY_MESSAGE: 脏数据信息,即脏数据产生的原因、异常信息等
归档到 Log 的格式化为:
- [${dirty.side-output.log-tag}] ${value},其中 ${value} 为 ${dirty.side-output.labels} 和 ${脏数据} 合并,并按照 'csv' 或者 'json' 进行格式化后的值
归档到 S3 的格式化为:
- S3 文件名生成格式为: ${dirty.side-output.s3.key}/${dirty.identifier}-${随机序列}.${文件后缀名}
- S3 文件内数据格式为:会将 ${dirty.side-output.labels} 和 ${脏数据} 合并,并按照 'csv' 或者 'json' 进行格式化
注:${dirty.side-output.log-tag}、${dirty.side-output.labels}、${dirty.identifier}、${dirty.side-output.s3.key} 等见下面配置详解。
配置
公共配置
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
dirty.ignore | 可选 | false | Boolean | 是否忽略脏数据,只有为 'true' 才能进行脏数据归档,默认为 'false' |
dirty.side-output.enable | 可选 | false | Boolean | 是否支持脏数据归档,默认为 'false' |
dirty.side-output.connector | 必选 | (none) | String | 归档目标端名称,当 'dirty.side-output.enable' 为 'true' 时必须设置该值,目前仅支持 'log' 和 's3'。 |
dirty.side-output.format | 可选 | csv | String | 脏数据格式化,目前仅支持 'csv' 和 'json',默认为 'csv'。 |
dirty.side-output.log.enable | 可选 | true | Boolean | 脏数据归档时是否支持日志打印, 默认为 'true'。 |
dirty.side-output.ignore-errors | 可选 | true | Boolean | 是否忽略脏数据归档中的错误,默认为 'true'。 |
dirty.identifier | 必选 | (none) | String | 脏数据标识,用作 File 类型脏数据归档的文件名称生成或者 MQ 类型脏数据归档的 Topic 生成或者 数据库类型脏数据归档的 Tablename生成等。 它支持形如 ${variable} 变量替换,这里支持 SYSTEM_TIME、DIRTY_TYPE、DIRTY_MESSAGE 几种系统变量,其他的变量支持取决于具体的节点实现。 |
dirty.side-output.log-tag | 可选 | (none) | String | 脏数据 Tag,用作日志打印时标识等,比如 [${logTag}] ${logLabels},${dirtydata}。 它支持形如 ${variable} 变量替换,这里支持 SYSTEM_TIME、DIRTY_TYPE、DIRTY_MESSAGE 几种系统变量,其他的变量支持取决于具体的节点实现。 |
dirty.side-output.labels | 可选 | (none) | String | 脏数据 labels,用作日志打印标识,并将和脏数据一起归档等,比如 [${logTag}] ${logLabels},${dirtydata}。 它支持形如 ${variable} 变量替换,这里支持 SYSTEM_TIME、DIRTY_TYPE、DIRTY_MESSAGE 几种系统变量,其他的变量支持取决于具体的节点实现。 |
dirty.side-output.field-delimiter | 可选 | , | String | 脏数据归档列分割符,用作 'csv' 等格式化场景,默认为 ','。 |
dirty.side-output.line-delimiter | 可选 | \n | String | 脏数据归档行分割符,用作 'csv'、'json' 等格式化场景,默认为 '\n'。 |
dirty.side-output.batch.size | 可选 | 100 | Integer | 脏数据归档缓存 Batch 条数,默认为 '100'。 |
dirty.side-output.batch.bytes | 可选 | 10240 | Integer | 脏数据归档缓存 Batch 大小,单位为 Byte, 默认为 '10240' 即 10KB。 |
dirty.side-output.retries | 可选 | 3 | Integer | 脏数据归档发生错误时重试次数,默认为 '3'。 |
dirty.side-output.batch.interval | 可选 | 60000 | Integer | 脏数据归档间隔时间,单位为毫秒,默认为 '60000' 即 60s。 |
S3 归档配置
参数 | 是否必选 | 默认值 | 数据类型 | 描述 |
---|---|---|---|---|
dirty.side-output.s3.endpoint | 必选 | (none) | String | S3 归档的 Endpoint。 |
dirty.side-output.s3.region | 必选 | (none) | String | S3 归档的 Region。 |
dirty.side-output.s3.bucket | 必选 | (none) | String | S3 归档的 Bucket。 |
dirty.side-output.s3.key | 必选 | (none) | String | S3 归档的 Key。 |
dirty.side-output.s3.access-key-id | 可选 | (none) | String | S3 归档的 Access Key Id,若不配置该项,则需要在环境中配置好。 |
dirty.side-output.s3.secret-key-id | 可选 | (none) | String | S3 归档的 Secret Key Id,若不配置该项,则需要在环境中配置好。 |
用法
这里将介绍一个同步 Kafka 数据到 Kafka 的例子,同时介绍脏数据归档的使用,其他节点类似。
- 归档到 Log 的使用
create table `table_user_input`(
`id` INT,
`name` INT,
`age` STRING)
WITH (
'dirty.side-output.connector' = 'log',
'dirty.ignore' = 'true',
'dirty.side-output.enable' = 'true',
'dirty.side-output.format' = 'csv',
'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user',
'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
'topic' = 'user_input',
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka-inlong',
'scan.startup.mode' = 'earliest-offset',
'json.timestamp-format.standard' = 'SQL',
'json.encode.decimal-as-plain-number' = 'true',
'json.map-null-key.literal' = 'null',
'json.ignore-parse-errors' = 'false',
'json.map-null-key.mode' = 'DROP',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'properties.group.id' = 'test_group');
CREATE TABLE `table_user_output`(
`id` INT,
`name` STRING,
`age` INT)
WITH (
'topic' = 'user_output',
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka-inlong',
'sink.ignore.changelog' = 'true',
'json.timestamp-format.standard' = 'SQL',
'json.encode.decimal-as-plain-number' = 'true',
'json.map-null-key.literal' = 'null',
'json.ignore-parse-errors' = 'true',
'json.map-null-key.mode' = 'DROP',
'format' = 'json',
'json.fail-on-missing-field' = 'true',
'dirty.ignore' = 'true',
'dirty.side-output.connector' = 'log',
'dirty.side-output.enable' = 'true',
'dirty.side-output.format' = 'csv',
'dirty.side-output.log.enable' = 'true',
'dirty.side-output.log-tag' = 'DirtyData',
'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user');
INSERT INTO `table_user_output`
SELECT
`id`,
`name`,
`age`
FROM `table_user_input`;
-- 在这个例子中, 我们故意输入一条非json格式的数据,比如: 1,zhangsan,18,那么依据配置将在日志中打印如下脏数据:
[DirtyData] 2023-01-30 13:01:01 ValueDeserializeError,inlong,user,1,zhangsan,18
- 归档到 S3 的使用
create table `table_user_input`(
`id` INT,
`name` INT,
`age` STRING)
WITH (
'dirty.side-output.connector' = 's3',
'dirty.ignore' = 'true',
'dirty.side-output.enable' = 'true',
'dirty.side-output.format' = 'csv',
'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user',
'dirty.side-output.s3.bucket' = 's3-test-bucket',
'dirty.side-output.s3.endpoint' = 's3.test.endpoint',
'dirty.side-output.s3.key' = 'dirty/test',
'dirty.side-output.s3.region' = 'region',
'dirty.side-output.s3.access-key-id' = 'access_key_id',
'dirty.side-output.s3.secret-key-id' = 'secret_key_id',
'dirty.identifier' = 'inlong-user-${SYSTEM_TIME}',
'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
'topic' = 'user_input',
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka-inlong',
'scan.startup.mode' = 'earliest-offset',
'json.timestamp-format.standard' = 'SQL',
'json.encode.decimal-as-plain-number' = 'true',
'json.map-null-key.literal' = 'null',
'json.ignore-parse-errors' = 'false',
'json.map-null-key.mode' = 'DROP',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'properties.group.id' = 'test_group');
CREATE TABLE `table_user_output`(
`id` INT,
`name` STRING,
`age` INT)
WITH (
'topic' = 'user_output',
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka-inlong',
'sink.ignore.changelog' = 'true',
'json.timestamp-format.standard' = 'SQL',
'json.encode.decimal-as-plain-number' = 'true',
'json.map-null-key.literal' = 'null',
'json.ignore-parse-errors' = 'true',
'json.map-null-key.mode' = 'DROP',
'format' = 'json',
'json.fail-on-missing-field' = 'true',
'dirty.side-output.connector' = 's3',
'dirty.ignore' = 'true',
'dirty.side-output.enable' = 'true',
'dirty.side-output.format' = 'csv',
'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user',
'dirty.side-output.s3.bucket' = 's3-test-bucket',
'dirty.side-output.s3.endpoint' = 's3.test.endpoint',
'dirty.side-output.s3.key' = 'dirty/test',
'dirty.side-output.s3.region' = 'region',
'dirty.side-output.s3.access-key-id' = 'access_key_id',
'dirty.side-output.s3.secret-key-id' = 'secret_key_id',
'dirty.identifier' = 'inlong-user-${SYSTEM_TIME}');
INSERT INTO `table_user_output`
SELECT
`id`,
`name`,
`age`
FROM `table_user_input`;
-- 在这个例子中, 我们故意输入一条非json格式的数据,比如: 1,zhangsan,18,那么依据配置将向s3中写入如下脏数据(文件路径为: dirty/test/inlong-user-2023-01-01130101xxxx.txt, xxxx为4位随机序列):
[DirtyData] 2023-01-30 13:01:01 ValueDeserializeError,inlong,user,1,zhangsan,18