Skip to main content
版本:Next

Pulsar

概述

Apache Pulsar是一个分布式、开源的 pub-sub 消息传递和流平台,用于实时工作负载,每天管理数千亿个事件。

版本

抽取节点版本
PulsarPulsar:> = 2.8.x

依赖项

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-pulsar</artifactId>
<version>2.1.0-SNAPSHOT</version>
</dependency>

用法

SQL API 用法

Step.1 准备好 sql 客户端

SQL Client用于编写用于在 Pulsar 中操作数据的 SQL 查询,您可以使用添加-addclasspath包的选项sort-connector-pulsar-{{INLONG_VERSION}}.jar

例子

./bin/sql-client.sh embedded --jar sort-connector-pulsar_{{INLONG_VERSION}}.jar

注意如果你把我们连接器的JAR包放在下面$FLINK_HOME/lib,不用--jar再用指定连接器的包了。

Step.2 从Pulsar读取数据

CREATE TABLE pulsar (
`physical_1` STRING,
`physical_2` INT,
`eventTime` TIMESTAMP(3) METADATA,
`properties` MAP<STRING, STRING> METADATA ,
`topic` STRING METADATA VIRTUAL,
`sequenceId` BIGINT METADATA VIRTUAL,
`key` STRING ,
`physical_3` BOOLEAN
) WITH (
'connector' = 'pulsar-inlong',
'topic' = 'persistent://public/default/topic82547611',
'key.format' = 'raw',
'key.fields' = 'key',
'value.format' = 'avro',
'service-url' = 'pulsar://localhost:6650',
'admin-url' = 'http://localhost:8080',
'scan.startup.mode' = 'earliest'
)

INSERT INTO `sink_table`
SELECT
`physical_1` AS `physical_1`,
`physical_2` AS `physical_2`
FROM `pulsar`
INSERT INTO pulsar
VALUES
('data 1', 1, TIMESTAMP '2020-03-08 13:12:11.123', MAP['k11', 'v11', 'k12', 'v12'], 'key1', TRUE),
('data 2', 2, TIMESTAMP '2020-03-09 13:12:11.123', MAP['k21', 'v21', 'k22', 'v22'], 'key2', FALSE),
('data 3', 3, TIMESTAMP '2020-03-10 13:12:11.123', MAP['k31', 'v31', 'k32', 'v32'], 'key3', TRUE)

Inlong Dashboard 用法

TODO

InLong Manager Client 方式

TODO

Pulsar Extract 节点参数

ParameterRequiredDefault valueTypeDescription
connector必需(none)String设置连接器类型。可用的选项是pulsar-inlong
topic可选(none)String设置输入或输出主题,多个和连接主题使用半逗号。选择一个主题模式。Set the input or output topic, use half comma for multiple and concatenate topics. Choose one with the topic-pattern.
topic-pattern可选(none)String使用正则获取匹配的主题。
service-url必需(none)String设置 Pulsar 代理服务地址。
admin-url可选(none)String设置 Pulsar 管理服务地址。不传入该参数时,启动模式只支持earliestlatest,并且无法更新 Pulsar Topic 的 offset。
scan.startup.mode可选latestString配置 Source 的启动模式。可用选项为earliestlatestexternal-subscriptionspecific-offsets
scan.startup.specific-offsets可选(none)String指定参数时需要该specific-offsets参数。
scan.startup.sub-name可选(none)String指定参数时需要该external-subscription参数。
discovery topic interval可选(none)Long设置分区发现的时间间隔,单位为毫秒。
sink.message-router可选key-hashString设置将消息写入 Pulsar 分区的路由方式。可用选项为key-hashround-robincustom MessageRouter
sink.semantic可选at-least-onceStringSink 写入消息的保证级别。可用选项为at-least-onceexactly-oncenone
properties可选emptyMap设置 Pulsar 的可选配置,格式为properties.key='value'. 有关详细信息,请参阅配置参数
key.format可选(none)String为 Pulsar 消息设置基于键的序列化格式。可用选项有No formatoptional rawAvroJSON等。
key.fields可选(none)String序列化Key时要使用的SQL定义字段,多个半逗号,连接。
key.fields-prefix可选(none)String为 key 格式的所有字段定义自定义前缀,以避免与 value 格式的字段名称冲突。默认情况下,前缀为空。如果定义了自定义前缀,key.fields则使用表架构和。
format or value.format必需(none)String使用前缀设置名称。当以键格式构造数据类型时,前缀被移除,并且在键格式中使用非前缀名称。Pulsar 消息值序列化格式,支持 JSON、Avro 等。更多信息请参见 Flink 格式。
value.fields-include可选ALLEnumPulsar 消息值包含字段策略、可选的 ALL 和 EXCEPT_KEY。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}

可用元数据

METADATA 标志用于读取和写入 Pulsar 消息中的元数据。支持列表如下。

注意 R/W 列定义元数据字段是否可读 (R) 和/或可写 (W)。只读列必须声明为 VIRTUAL 以在 INSERT INTO 操作期间排除它们。

关键字数据类型描述读/写
topicSTRING NOT NULLPulsar 消息的主题名称R
messageIdBYTES NOT NULLPulsar 消息的消息 IDR
sequenceIdBIGINT NOT NULLPulsar 消息的序列 IDR
publishTimeTIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULLPulsar 消息的发布时间R
eventTimeTIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULLPulsar 消息的生成时间R/W
propertiesMAP<STRING, STRING> NOT NULLPulsar 消息的扩展信息R/W

数据类型映射

Pulsar 将消息键和值存储为字节,因此 Pulsar 没有 schema 或数据类型。Pulsar 消息按格式进行反序列化和序列化,例如 csv、json、avro。因此,数据类型映射由特定格式确定。有关格式详细信息,请参阅格式页面。