Skip to main content
版本:Next

Sort 扩展 Connector

总览

InLong Sort 是一个基于 Apache Flink SQL 的 ETL 服务。Flink SQL 强大的表达能力带来的高可扩展性、灵活性,基本上 Flink SQL 支持的语意,InLong Sort 都支持。 当 Flink SQL 内置的函数不满足需求时,还可通过 UDF 来扩展。这对于曾经使用过 SQL 尤其是 Flink SQL 的开发者非常友好。

本文介绍如何在 InLong Sort 中扩展一个新的 source(在 InLong 中抽象为 Extract Node)或一个新的 sink(在InLong中抽象为 Load Node )。 InLong Sort 架构的 UML 对象关系图如下:

sort_uml

其中各个组件的概念为:

名称描述
Group数据流组,包含多个数据流,一个 Group 代表一个数据接入
Stream数据流,一个数据流有具体的流向
GroupInfoSort 中对数据流向的封装,一个 GroupInfo 可包含多个 DataFlowInfo
StreamInfoSort 中数据流向的抽象,包含该数据流的各种来源、转换、去向等
Node数据同步中数据源、数据转换、数据去向的抽象
ExtractNode数据同步的来源端抽象
TransformNode数据同步的转换过程抽象
LoadNode数据同步的去向端抽象
NodeRelationShip数据同步中各个节点关系抽象
FieldRelationShip数据同步中上下游节点字段间关系的抽象
FieldInfo节点字段
MetaFieldInfo节点 Meta 字段
Function转换函数的抽象
FunctionParam函数的入参抽象
ConstantParam常量参数

扩展 Extract & Load Node

Extract 节点是基于 Apache Flink® 的 Source Connectors 用于从不同的源系统抽取数据。 Load 节点是基于 Apache Flink® 的 Sink Connectors 用于将数据加载到不同的存储系统。

Apache InLong Sort 启动时通过将一组 Extract 和 Load Node 配置翻译成对应的 Flink SQL 并提交到 Flink 集群,拉起用户指定的数据抽取和入库任务。

增加 Extract & Load Node 定义

自定义 Extract Node 需要继承 org.apache.inlong.sort.protocol.node.ExtractNode 类,自定义 Load Node 需要继承 org.apache.inlong.sort.protocol.node.LoadNode 类, 两者都至少需要选择性实现 org.apache.inlong.sort.protocol.node.Node 接口中的方法

方法名含义默认值
getId获取节点IDInlong StreamSource Id
getName获取节点名Inlong StreamSource Name
getFields获取字段信息Inlong Stream 定义的字段
getProperties获取节点额外属性空 Map
tableOptions获取 Flink SQL 表属性节点额外属性
genTableName生成 Flink SQL 表名无默认值
getPrimaryKey获取主键null
getPartitionFields获取分区字段null

扩展 ExtractNode

扩展一个 ExtractNode 分为三个步骤:

第一步:继承 ExtractNode 类,类的位置在:

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/ExtractNode.java

在实现的 ExtractNode 中指定 connector;

// 继承 ExtractNode 类,实现具体的类,例如 MongoExtractNode
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("MongoExtract")
@Data
public class MongoExtractNode extends ExtractNode implements Serializable {
@JsonInclude(Include.NON_NULL)
@JsonProperty("primaryKey")
private String primaryKey;
...

@JsonCreator
public MongoExtractNode(@JsonProperty("id") String id, ...) { ... }

@Override
public Map<String, String> tableOptions() {
Map<String, String> options = super.tableOptions();
// 配置指定的 connector,这里指定的是 mongodb-cdc
options.put("connector", "mongodb-cdc");
...
return options;
}
}

第二步:在 ExtractNode 和 Node 中的 JsonSubTypes 添加该 Extract

// 在 ExtractNode 和 Node 的 JsonSubTypes 中添加字段
...
@JsonSubTypes({
@JsonSubTypes.Type(value = MongoExtractNode.class, name = "mongoExtract")
})
...
public abstract class ExtractNode implements Node{...}

...
@JsonSubTypes({
@JsonSubTypes.Type(value = MongoExtractNode.class, name = "mongoExtract")
})
public interface Node {...}

第三步:扩展 Sort Connector,查看此(inlong-sort/sort-connectors/mongodb-cdc)目录下是否已经存在对应的 connector。如果没有,则需要参考 Flink 官方文档 DataStream Connectors 来扩展, 调用已有的 Flink-connector(例如inlong-sort/sort-connectors/mongodb-cdc)或自行实现相关的 connector 均可。

扩展 Load Node

扩展一个 LoadNode 分为三个步骤:

第一步:继承 LoadNode 类,类的位置在:

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/node/LoadNode.java

在实现的LoadNode 中指定 connector;

// 继承 LoadNode 类,实现具体的类,例如 KafkaLoadNode
@EqualsAndHashCode(callSuper = true)
@JsonTypeName("kafkaLoad")
@Data
@NoArgsConstructor
public class KafkaLoadNode extends LoadNode implements Serializable {
@Nonnull
@JsonProperty("topic")
private String topic;
...

@JsonCreator
public KafkaLoadNode(@Nonnull @JsonProperty("topic") String topic, ...) {...}

// 根据不同的条件配置使用不同的 connector
@Override
public Map<String, String> tableOptions() {
...
if (format instanceof JsonFormat || format instanceof AvroFormat || format instanceof CsvFormat) {
if (StringUtils.isEmpty(this.primaryKey)) {
// kafka connector
options.put("connector", "kafka");
options.putAll(format.generateOptions(false));
} else {
// upsert-kafka connector
options.put("connector", "upsert-kafka");
options.putAll(format.generateOptions(true));
}
} else if (format instanceof CanalJsonFormat || format instanceof DebeziumJsonFormat) {
// kafka-inlong connector
options.put("connector", "kafka-inlong");
options.putAll(format.generateOptions(false));
} else {
throw new IllegalArgumentException("kafka load Node format is IllegalArgument");
}
return options;
}
}

第二步:在 LoadNode 和 Node 中的 JsonSubTypes 添加该 Load

// 在 LoadNode 和 Node 的 JsonSubTypes 中添加字段
...
@JsonSubTypes({
@JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad")
})
...
public abstract class LoadNode implements Node{...}

...
@JsonSubTypes({
@JsonSubTypes.Type(value = KafkaLoadNode.class, name = "kafkaLoad")
})
public interface Node {...}

第三步:扩展 Sort Connector,Kafka 的 sort connector 在 inlong-sort/sort-connectors/kafka 目录下。

集成到 Entrance

将 Extract 和 Load 集成到 InLong Sort 主流程中。 InLong Sort 的入口类在:

inlong-sort/sort-core/src/main/java/org/apache/inlong/sort/Entrance.java

Extract 和 Load 如何集成至 InLong Sort,可参考下面的 UT,首先构建对应的 ExtractNode、LoadNode,再构建 NodeRelation、StreamInfo、GroupInfo,最后通过 FlinkSqlParser 执行。

public class MongoExtractToKafkaLoad extends AbstractTestBase {

// 构建 MongoExtractNode
private MongoExtractNode buildMongoNode() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("name", new StringFormatInfo()), ...);
return new MongoExtractNode(..., fields, ...);
}

// 构建 KafkaLoadNode
private KafkaLoadNode buildAllMigrateKafkaNode() {
List<FieldInfo> fields = Arrays.asList(new FieldInfo("name", new StringFormatInfo()), ...);
List<FieldRelation> relations = Arrays.asList(new FieldRelation(new FieldInfo("name", new StringFormatInfo()), ...), ...);
CsvFormat csvFormat = new CsvFormat();
return new KafkaLoadNode(..., fields, relations, csvFormat, ...);
}

// 构建 NodeRelation
private NodeRelation buildNodeRelation(List<Node> inputs, List<Node> outputs) {
List<String> inputIds = inputs.stream().map(Node::getId).collect(Collectors.toList());
List<String> outputIds = outputs.stream().map(Node::getId).collect(Collectors.toList());
return new NodeRelation(inputIds, outputIds);
}

// 测试主流程 MongoDB to Kafka
@Test
public void testMongoDbToKafka() throws Exception {
EnvironmentSettings settings = EnvironmentSettings. ... .build();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
...
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
Node inputNode = buildMongoNode();
Node outputNode = buildAllMigrateKafkaNode();
StreamInfo streamInfo = new StreamInfo("1", Arrays.asList(inputNode, outputNode), ...);
GroupInfo groupInfo = new GroupInfo("1", Collections.singletonList(streamInfo));
FlinkSqlParser parser = FlinkSqlParser.getInstance(tableEnv, groupInfo);
ParseResult result = parser.parse();
Assert.assertTrue(result.tryExecute());
}
}

同时,Sort 还增加了 InlongMetricMetadata 两个额外的接口用来支持更丰富的语义。

InlongMetric

如果自定义节点需要上报 Inlong 指标,则需要实现 org.apache.inlong.sort.protocol.InlongMetric 接口。 Inlong Sort 解析配置时会向 table option 中增加 'inlong.metric.labels' = 'groupId={g}&streamId={s}&nodeId={n}' 启动参数,并以此来配置 Inlong Audit。 详情请查看 如何集成 Inlong Audit 到自定义 Connector

Metadata

如果自定义节点需要指定某个字段为 Flink SQL 的 Metadata 字段,则需要实现 org.apache.inlong.sort.protocol.Metadata 接口。 Inlong Sort 解析配置时会自动将对应的字段标记为 Metadata。

Sort 基于 Apache Flink 1.15 版本实现。如何扩展 Apache Flink Connector 可以参考 User-defined Sources & Sinks

如何集成 Inlong Audit 到自定义 Connector

Inlong Sort 将指标上报的流程封装在了 org.apache.inlong.sort.base.metric.SourceExactlyMetricorg.apache.inlong.sort.base.metric.SinkExactlyMetric 类中。开发者只需要根据 Source/Sink 类型初始化对应的 Metric 对象,则可以实现指标上报。

通常的做法是在构造 Source/Sink 时传递例如 InLong Audit 地址,在初始化 Source/Sink 算子调用 open() 方法时进行初始化 SourceExactlyMetric/SinkExactlyMetric 对象。 在处理实际数据后再调用对应的审计上报方法。

public class StarRocksDynamicSinkFunctionV2<T> extends StarRocksDynamicSinkFunctionBase<T> {

private static final long serialVersionUID = 1L;
private static final Logger log = LoggerFactory.getLogger(StarRocksDynamicSinkFunctionV2.class);

private transient SinkExactlyMetric sinkExactlyMetric;

private String inlongMetric;
private String auditHostAndPorts;
private String auditKeys;
private String stateKey;

public StarRocksDynamicSinkFunctionV2(StarRocksSinkOptions sinkOptions,
TableSchema schema,
StarRocksIRowTransformer<T> rowTransformer, String inlongMetric,
String auditHostAndPorts, String auditKeys) {
this.sinkOptions = sinkOptions;

// pass the params of inlong audit
this.auditHostAndPorts = auditHostAndPorts;
this.inlongMetric = inlongMetric;
this.auditKeys = auditKeys;
}

@Override
public void open(Configuration parameters) {

// init SinkExactlyMetric in open()
MetricOption metricOption = MetricOption.builder().withInlongLabels(inlongMetric)
.withAuditAddress(auditHostAndPorts)
.withAuditKeys(auditKeys)
.build();

if (metricOption != null) {
sinkExactlyMetric = new SinkExactlyMetric(metricOption, getRuntimeContext().getMetricGroup());
}
}

@Override
public void invoke(T value, Context context)
throws IOException, ClassNotFoundException, JSQLParserException {
Object[] data = rowTransformer.transform(value, sinkOptions.supportUpsertDelete());

sinkManager.write(
null,
sinkOptions.getDatabaseName(),
sinkOptions.getTableName(),
serializer.serialize(schemaUtils.filterOutTimeField(data)));

// output audit after write data to sink
if (sinkExactlyMetric != null) {
sinkExactlyMetric.invoke(1, getDataSize(value), schemaUtils.getDataTime(data));
}
}