Skip to main content
版本:Next

参数配置说明

TransformConfig 配置说明

public class TransformConfig {

@JsonProperty("sourceInfo")
private SourceInfo sourceInfo; //数据源的解码定义
@JsonProperty("sinkInfo")
private SinkInfo sinkInfo; //数据结果的编码定义
@JsonProperty("transformSql")
private String transformSql; //数据转换的 SQL
}

SourceInfo 配置说明

CSV

    public CsvSourceInfo(
@JsonProperty("charset") String charset, //字符集
@JsonProperty("delimiter") String delimiter, //分隔符
@JsonProperty("escapeChar") String escapeChar, //转义符,如果为空,则解码时不进行反转义操作
@JsonProperty("fields") List<FieldInfo> fields) //字段列表,如果为空,则解码时默认按分隔符解析,字段名按 $1、$2、$3...来分配,注意从1开始;
//如果字段定义数少于解码出来的字段数,则多出的字段数将会被丢弃
);

KV

    public KvSourceInfo(
@JsonProperty("charset") String charset, //字符集
@JsonProperty("fields") List<FieldInfo> fields) //字段列表,如果为空,则解码时默认按 KV 里的 Key 作为字段名
//如果字段名不匹配解码出来的字段名,则字段值为空,多出来的字段名会被丢弃
);

ProtoBuffer

    public PbSourceInfo(
@JsonProperty("charset") String charset, //字符集
@JsonProperty("protoDescription") String protoDescription, // ProtoBuf 协议描述的 Base64 编码
@JsonProperty("rootMessageType") String rootMessageType, //解码源数据的 MessageType ,MessageType 需要在 ProtoBuf 协议中已定义
@JsonProperty("rowsNodePath") String rowsNodePath) // ProtoBuf 协议包含多条待转换数据的数组节点路径
);

生成ProtoBuf 协议描述

  • 安装 Protocol Buffers compiler
PB_REL="https://github.com/protocolbuffers/protobuf/releases"
curl -LO $PB_REL/download/v3.15.8/protoc-3.15.8-linux-x86_64.zip
unzip protoc-3.15.8-linux-x86_64.zip -d $HOME/.local
export PATH="$HOME/.local/bin:$PATH"
protoc --version
#显示libprotoc 3.15.8
  • 解析协议生成描述的 Base64 编码
# transform.proto 是 proto 协议文件,transform.description 是协议解析后的二进制描述文件
protoc --descriptor_set_out=transform.description ./transform.proto --proto_path=.
# 将协议解析后的二进制描述文件 transform.description 进行 Base64 编码,写入文件 transform.base64,这个文件内容就是配置接口中的参数:protoDescription
base64 transform.description |tr -d '\n' > transform.base64
  • transform.proto 样例
syntax = "proto3";
package test;
message SdkMessage {
bytes msg = 1;
int64 msgTime = 2;
map<string, string> extinfo = 3;
}
message SdkDataRequest {
string sid = 1;
repeated SdkMessage msgs = 2;
uint64 packageID = 3;
}
  • transform.base64 样例
CrcCCg90cmFuc2Zvcm0ucHJvdG8SBHRlc3QirQEKClNka01lc3NhZ2USEAoDbXNnGAEgASgMUgNtc2cSGAoHbXNnVGltZRgCIAEoA1IHbXNnVGltZRI3CgdleHRpbmZvGAMgAygLMh0udGVzdC5TZGtNZXNzYWdlLkV4dGluZm9FbnRyeVIHZXh0aW5mbxo6CgxFeHRpbmZvRW50cnkSEAoDa2V5GAEgASgJUgNrZXkSFAoFdmFsdWUYAiABKAlSBXZhbHVlOgI4ASJmCg5TZGtEYXRhUmVxdWVzdBIQCgNzaWQYASABKAlSA3NpZBIkCgRtc2dzGAIgAygLMhAudGVzdC5TZGtNZXNzYWdlUgRtc2dzEhwKCXBhY2thZ2VJRBgDIAEoBFIJcGFja2FnZUlEYgZwcm90bzM=
  • transform.description 样例

Json

    public JsonSourceInfo(
@JsonProperty("charset") String charset, //字符集
@JsonProperty("rowsNodePath") String rowsNodePath) // Json 协议包含多条待转换数据的数组节点路径
);

SinkInfo 配置说明

CSV

    public CsvSinkInfo(
@JsonProperty("charset") String charset, //字符集
@JsonProperty("delimiter") String delimiter, //分隔符
@JsonProperty("escapeChar") String escapeChar, //转义符,如果为空,则编码时不进行转义操作
@JsonProperty("fields") List<FieldInfo> fields) //字段列表,如果为空,则编码时默认按 TransformSQL 的 Select 字段顺序拼接
);

KV

    public KvSinkInfo(
@JsonProperty("charset") String charset, //字符集
@JsonProperty("fields") List<FieldInfo> fields) //字段列表,如果为空,则编码时默认按 TransformSQL 的 Select 字段 Alias 作为 Key 拼接
);

TransformSQL 配置说明

CSV / KV 字段引用

  • SourceInfo 没有配置字段列表。
    • CSV 格式,字段名用 $1、$2、$3... 来引用。
    • KV 格式,字段名直接引用源数据中的 Key。
  • SourceInfo 的字段名和 SinkInfo 字段名不一致,可以通过 Select 字段的别名映射转换。

ProtoBuf / Json 树形字段引用

  • 所有字段只能以 $root.、$child. 作为前缀,$root 表示根节点,$child 表示多行的数组节点。
  • 多级节点,用小数点分隔,如 $root.extParams.name。
  • 对于数组节点,用小括号标识数组下标,如 $root.msgs(1).msgTime。

运算符支持

  • 目前已支持运算符
    • 算术运算符,+、-、*、/、(、)
    • 比较运算符,=、!=、>、>=、<、<=
    • 逻辑运算符,and、or、!、not、(、)

函数说明

  • CONCAT(string1, string2, ...),返回连接 string1,string2, … 的字符串。如果有任一参数为 NULL,则返回 NULL。 例如 CONCAT('AA', 'BB', 'CC') 返回 "AABBCC"。
  • NOW(),返回本地时区的当前 SQL 时间戳。
  • 详见函数说明章节。

SQL 样例

select ftime,extinfo from source where extinfo='ok'

select $1 ftime,$2 extinfo from source where $2!='ok'

select $root.sid,$root.packageID,$child.msgTime,$child.msg from source

select $root.sid,$root.packageID,$root.msgs(1).msgTime,$root.msgs(0).msg from source

select $root.sid,
($root.msgs(1).msgTime-$root.msgs(0).msgTime)/$root.packageID field2,
$root.packageID*($root.msgs(0).msgTime*$root.packageID+$root.msgs(1).msgTime/$root.packageID)*$root.packageID field3,
$root.msgs(0).msg field4
from source
where $root.packageID<($root.msgs(0).msgTime+$root.msgs(1).msgTime+$root.msgs(0).msgTime+$root.msgs(1).msgTime)

select $root.sid,
$root.packageID,
$child.msgTime,
concat($root.sid,$root.packageID,$child.msgTime,$child.msg) msg,$root.msgs.msgTime.msg
from source

select now() from source

常见问题

  • SDK 调用是线程安全的。
  • 配置变更,直接修改配置对象的参数是不起效果的,需要重建 SDK 对象。
  • 如果 CSV、KV 格式的转换源数据里包含换行符、分隔符(竖线、逗号等)、反斜杠(转义符),需要配置正确的转义符和行分隔符。
    • 如果不配置,那么转换后数据的字段顺序会发生错乱,换行符会导致一条数据变两条,竖线分隔符会导致字段错位。
  • 避免在处理每条数据时都创建一个 SDK 对象,SDK 对象初始化需要编译转换 SQL 并建立 AST 语义解析树,频繁调用会引发性能问题,推荐的使用方式为在程序中复用一个初始化好的 SDK 对象处理数据。