Skip to main content
版本:1.5.0

脏数据归档

概览

脏数据为在数据抽取、转换、加载过程中,因数据本身的质量问题导致数据无法正确抽取、转换、加载的数据, 常见的脏数据可能有字段类型、长度、个数不匹配,数据序列化、反序列化异常,目标端库、表不存在等。 脏数据归档可以将 InLong 数据流运行中的脏数据转存到第三方存储,便于业务查找问题。 目前 Sort 支持了 S3、Log 两种目标端的脏数据归档,同时支持 Kafka、Doris、Iceberg、Hbase、Hive、Elasticsearch、JDBC 等常见数据源的脏数据归档。

支持的节点

类型名称归档目标端
Extract 节点KafkaLog, S3
Load 节点HiveLog, S3
KafkaLog, S3
HBaseLog, S3
ClickHouseLog, S3
IcebergLog, S3
ElasticsearchLog, S3
PostgreSQLLog, S3
HDFSLog, S3
TDSQL PostgresLog, S3
DorisLog, S3
SQL ServerLog, S3
GreenplumLog, S3

脏数据格式化

在脏数据归档处理过程中,我们定义了下面的系统变量,用作脏数据的格式化:

  • SYSTEM_TIME: 系统时间,格式为 'yyyy-MM-dd HH:mm:ss'
  • DIRTY_TYPE:脏数据类型,常见的有 SerializeError、DeserializeError、DataTypeMappingError 等
  • DIRTY_MESSAGE: 脏数据信息,即脏数据产生的原因、异常信息等

归档到 Log 的格式化为:

  • [${dirty.side-output.log-tag}] ${value},其中 ${value} 为 ${dirty.side-output.labels} 和 ${脏数据} 合并,并按照 'csv' 或者 'json' 进行格式化后的值

归档到 S3 的格式化为:

  • S3 文件名生成格式为: ${dirty.side-output.s3.key}/${dirty.identifier}-${随机序列}.${文件后缀名}
  • S3 文件内数据格式为:会将 ${dirty.side-output.labels} 和 ${脏数据} 合并,并按照 'csv' 或者 'json' 进行格式化

:${dirty.side-output.log-tag}、${dirty.side-output.labels}、${dirty.identifier}、${dirty.side-output.s3.key} 等见下面配置详解。

配置

公共配置

参数是否必选默认值数据类型描述
dirty.ignore可选falseBoolean是否忽略脏数据,只有为 'true' 才能进行脏数据归档,默认为 'false'
dirty.side-output.enable可选falseBoolean是否支持脏数据归档,默认为 'false'
dirty.side-output.connector必选(none)String归档目标端名称,当 'dirty.side-output.enable' 为 'true' 时必须设置该值,目前仅支持 'log' 和 's3'。
dirty.side-output.format可选csvString脏数据格式化,目前仅支持 'csv' 和 'json',默认为 'csv'。
dirty.side-output.log.enable可选trueBoolean脏数据归档时是否支持日志打印, 默认为 'true'。
dirty.side-output.ignore-errors可选trueBoolean是否忽略脏数据归档中的错误,默认为 'true'。
dirty.identifier必选(none)String脏数据标识,用作 File 类型脏数据归档的文件名称生成或者 MQ 类型脏数据归档的 Topic 生成或者 数据库类型脏数据归档的 Tablename生成等。 它支持形如 ${variable} 变量替换,这里支持 SYSTEM_TIME、DIRTY_TYPE、DIRTY_MESSAGE 几种系统变量,其他的变量支持取决于具体的节点实现。
dirty.side-output.log-tag可选(none)String脏数据 Tag,用作日志打印时标识等,比如 [${logTag}] ${logLabels},${dirtydata}。 它支持形如 ${variable} 变量替换,这里支持 SYSTEM_TIME、DIRTY_TYPE、DIRTY_MESSAGE 几种系统变量,其他的变量支持取决于具体的节点实现。
dirty.side-output.labels可选(none)String脏数据 labels,用作日志打印标识,并将和脏数据一起归档等,比如 [${logTag}] ${logLabels},${dirtydata}。 它支持形如 ${variable} 变量替换,这里支持 SYSTEM_TIME、DIRTY_TYPE、DIRTY_MESSAGE 几种系统变量,其他的变量支持取决于具体的节点实现。
dirty.side-output.field-delimiter可选,String脏数据归档列分割符,用作 'csv' 等格式化场景,默认为 ','。
dirty.side-output.line-delimiter可选\nString脏数据归档行分割符,用作 'csv'、'json' 等格式化场景,默认为 '\n'。
dirty.side-output.batch.size可选100Integer脏数据归档缓存 Batch 条数,默认为 '100'。
dirty.side-output.batch.bytes可选10240Integer脏数据归档缓存 Batch 大小,单位为 Byte, 默认为 '10240' 即 10KB。
dirty.side-output.retries可选3Integer脏数据归档发生错误时重试次数,默认为 '3'。
dirty.side-output.batch.interval可选60000Integer脏数据归档间隔时间,单位为毫秒,默认为 '60000' 即 60s。

S3 归档配置

参数是否必选默认值数据类型描述
dirty.side-output.s3.endpoint必选(none)StringS3 归档的 Endpoint。
dirty.side-output.s3.region必选(none)StringS3 归档的 Region。
dirty.side-output.s3.bucket必选(none)StringS3 归档的 Bucket。
dirty.side-output.s3.key必选(none)StringS3 归档的 Key。
dirty.side-output.s3.access-key-id可选(none)StringS3 归档的 Access Key Id,若不配置该项,则需要在环境中配置好。
dirty.side-output.s3.secret-key-id可选(none)StringS3 归档的 Secret Key Id,若不配置该项,则需要在环境中配置好。

用法

这里将介绍一个同步 Kafka 数据到 Kafka 的例子,同时介绍脏数据归档的使用,其他节点类似。

  • 归档到 Log 的使用
 create table `table_user_input`(
`id` INT,
`name` INT,
`age` STRING)
WITH (
'dirty.side-output.connector' = 'log',
'dirty.ignore' = 'true',
'dirty.side-output.enable' = 'true',
'dirty.side-output.format' = 'csv',
'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user',
'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
'topic' = 'user_input',
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka-inlong',
'scan.startup.mode' = 'earliest-offset',
'json.timestamp-format.standard' = 'SQL',
'json.encode.decimal-as-plain-number' = 'true',
'json.map-null-key.literal' = 'null',
'json.ignore-parse-errors' = 'false',
'json.map-null-key.mode' = 'DROP',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'properties.group.id' = 'test_group');

CREATE TABLE `table_user_output`(
`id` INT,
`name` STRING,
`age` INT)
WITH (
'topic' = 'user_output',
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka-inlong',
'sink.ignore.changelog' = 'true',
'json.timestamp-format.standard' = 'SQL',
'json.encode.decimal-as-plain-number' = 'true',
'json.map-null-key.literal' = 'null',
'json.ignore-parse-errors' = 'true',
'json.map-null-key.mode' = 'DROP',
'format' = 'json',
'json.fail-on-missing-field' = 'true',
'dirty.ignore' = 'true',
'dirty.side-output.connector' = 'log',
'dirty.side-output.enable' = 'true',
'dirty.side-output.format' = 'csv',
'dirty.side-output.log.enable' = 'true',
'dirty.side-output.log-tag' = 'DirtyData',
'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user');

INSERT INTO `table_user_output`
SELECT
`id`,
`name`,
`age`
FROM `table_user_input`;
-- 在这个例子中, 我们故意输入一条非json格式的数据,比如: 1,zhangsan,18,那么依据配置将在日志中打印如下脏数据:
[DirtyData] 2023-01-30 13:01:01 ValueDeserializeError,inlong,user,1,zhangsan,18
  • 归档到 S3 的使用
 create table `table_user_input`(
`id` INT,
`name` INT,
`age` STRING)
WITH (
'dirty.side-output.connector' = 's3',
'dirty.ignore' = 'true',
'dirty.side-output.enable' = 'true',
'dirty.side-output.format' = 'csv',
'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user',
'dirty.side-output.s3.bucket' = 's3-test-bucket',
'dirty.side-output.s3.endpoint' = 's3.test.endpoint',
'dirty.side-output.s3.key' = 'dirty/test',
'dirty.side-output.s3.region' = 'region',
'dirty.side-output.s3.access-key-id' = 'access_key_id',
'dirty.side-output.s3.secret-key-id' = 'secret_key_id',
'dirty.identifier' = 'inlong-user-${SYSTEM_TIME}',
'inlong.metric.labels' = 'groupId=1&streamId=1&nodeId=1',
'topic' = 'user_input',
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka-inlong',
'scan.startup.mode' = 'earliest-offset',
'json.timestamp-format.standard' = 'SQL',
'json.encode.decimal-as-plain-number' = 'true',
'json.map-null-key.literal' = 'null',
'json.ignore-parse-errors' = 'false',
'json.map-null-key.mode' = 'DROP',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'properties.group.id' = 'test_group');

CREATE TABLE `table_user_output`(
`id` INT,
`name` STRING,
`age` INT)
WITH (
'topic' = 'user_output',
'properties.bootstrap.servers' = 'localhost:9092',
'connector' = 'kafka-inlong',
'sink.ignore.changelog' = 'true',
'json.timestamp-format.standard' = 'SQL',
'json.encode.decimal-as-plain-number' = 'true',
'json.map-null-key.literal' = 'null',
'json.ignore-parse-errors' = 'true',
'json.map-null-key.mode' = 'DROP',
'format' = 'json',
'json.fail-on-missing-field' = 'true',
'dirty.side-output.connector' = 's3',
'dirty.ignore' = 'true',
'dirty.side-output.enable' = 'true',
'dirty.side-output.format' = 'csv',
'dirty.side-output.labels' = 'SYSTEM_TIME=${SYSTEM_TIME}&DIRTY_TYPE=${DIRTY_TYPE}&database=inlong&table=user',
'dirty.side-output.s3.bucket' = 's3-test-bucket',
'dirty.side-output.s3.endpoint' = 's3.test.endpoint',
'dirty.side-output.s3.key' = 'dirty/test',
'dirty.side-output.s3.region' = 'region',
'dirty.side-output.s3.access-key-id' = 'access_key_id',
'dirty.side-output.s3.secret-key-id' = 'secret_key_id',
'dirty.identifier' = 'inlong-user-${SYSTEM_TIME}');

INSERT INTO `table_user_output`
SELECT
`id`,
`name`,
`age`
FROM `table_user_input`;
-- 在这个例子中, 我们故意输入一条非json格式的数据,比如: 1,zhangsan,18,那么依据配置将向s3中写入如下脏数据(文件路径为: dirty/test/inlong-user-2023-01-01130101xxxx.txt, xxxx为4位随机序列):
[DirtyData] 2023-01-30 13:01:01 ValueDeserializeError,inlong,user,1,zhangsan,18