Skip to main content
版本:Next

监控指标

概览

我们为节点增加了指标计算。 用户添加 with 选项 inlong.metric.labels 后 Sort 会计算指标,inlong.metric.labels 选项的值由三部分构成:groupId={groupId}&streamId={streamId}&nodeId={nodeId}。 用户可以使用 metric reporter 去上报数据。

指标

支持的 extract 节点

指标名Extract 节点描述
groupId_streamId_nodeId_numRecordsInkafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc输入记录数
groupId_streamId_nodeId_numBytesInkafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc输入字节数
groupId_streamId_nodeId_numRecordsInPerSecondkafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc每秒输入记录数
groupId_streamId_nodeId_numBytesInPerSecondkafka,mongodb-cdc,mysql-cdc,oracle-cdc,postgresql-cdc,pulsar,sqlserver-cdc每秒输入字节数

支持表级别指标

它是用于整库同步场景

指标名Extract 节点描述
groupId_streamId_nodeId_database_table_numRecordsInmysql-cdc输入记录数
groupId_streamId_nodeId_database_schema_table_numRecordsInoracle-cdc,postgresql-cdc输入记录数
groupId_streamId_nodeId_database_collection_numRecordsInmongodb-cdc输入记录数
groupId_streamId_nodeId_database_table_numBytesInmysql-cdc输入字节数
groupId_streamId_nodeId_database_schema_table_numBytesInoracle-cdc,postgresql-cdc输入字节数
groupId_streamId_nodeId_database_collection_numBytesInmongodb-cdc输入字节数
groupId_streamId_nodeId_database_table_numRecordsInPerSecondmysql-cdc每秒输入记录数
groupId_streamId_nodeId_database_schema_table_numRecordsInPerSecondoracle-cdc,postgresql-cdc每秒输入记录数
groupId_streamId_nodeId_database_collection_numRecordsInPerSecondmongodb-cdc每秒输入记录数
groupId_streamId_nodeId_database_table_numBytesInPerSecondmysql-cdc每秒输入字节数
groupId_streamId_nodeId_database_schema_table_numBytesInPerSecondoracle-cdc,postgresql-cdc每秒输入字节数
groupId_streamId_nodeId_database_collection_numBytesInPerSecondmongodb-cdc每秒输入字节数
groupId_streamId_nodeId_database_collection_numSnapshotCreatepostgresql-cdc,pulsar,mongodb-cdc,sqlserver-cdc尝试创建Checkpoint数
groupId_streamId_nodeId_database_collection_numSnapshotErrorpostgresql-cdc,pulsar,mongodb-cdc,sqlserver-cdc创建Checkpoint异常数
groupId_streamId_nodeId_database_collection_numSnapshotCompletepostgresql-cdc,pulsar,mongodb-cdc,sqlserver-cdc创建Checkpoint成功数
groupId_streamId_nodeId_database_collection_snapshotToCheckpointTimeLagpostgresql-cdc,pulsar,mongodb-cdc,sqlserver-cdc从开始创建Checkpoint到完成创建延迟(毫秒)
groupId_streamId_nodeId_database_collection_numDeserializeSuccesspostgresql-cdc,pulsar,mongodb-cdc,sqlserver-cdc反序列化成功数
groupId_streamId_nodeId_database_collection_numDeserializeSuccesspostgresql-cdc,pulsar,mongodb-cdc,sqlserver-cdc反序列化异常数
groupId_streamId_nodeId_database_collection_deserializeTimeLagpostgresql-cdc,pulsar,mongodb-cdc,sqlserver-cdc反序列化延迟(毫秒)

支持的 load 节点

指标名Load 节点描述
groupId_streamId_nodeId_numRecordsOutclickhouse,elasticsearch,greenplum,hbase,
hdfs,hive,iceberg,kafka,
mysql,oracle,postgresql,sqlserver,tdsql-postgresql
输出记录数
groupId_streamId_nodeId_numBytesOutclickhouse,elasticsearch,greenplum,hbase,
hdfs,hive,iceberg,kafka,
mysql,oracle,postgresql,sqlserver,tdsql-postgresql
输出字节数
groupId_streamId_nodeId_numRecordsOutPerSecondclickhouse,elasticsearch,greenplum,
hbase,hdfs,hive,iceberg,
kafka,mysql,oracle,postgresql,sqlserver,tdsql-postgresql
每秒输出记录数
groupId_streamId_nodeId_numBytesOutPerSecondclickhouse,elasticsearch,greenplum,
hbase,hdfs,hive,iceberg,kafka,
mysql,oracle,postgresql,sqlserver,tdsql-postgresql
每秒输出字节数
groupId_streamId_nodeId_dirtyRecordsOutclickhouse,elasticsearch,greenplum,hbase,
hdfs,hive,iceberg,kafka,mysql,
oracle,postgresql,sqlserver,tdsql-postgresql
输出脏数据记录数
groupId_streamId_nodeId_dirtyBytesOutclickhouse,elasticsearch,greenplum,hbase,
hdfs,hive,iceberg,kafka,mysql,
oracle,postgresql,sqlserver,tdsql-postgresql
输出脏数据字节数

支持表级别指标

它是用于整库同步场景

指标名Load node描述
groupId_streamId_nodeId_database_table_numRecordsOutdoris,iceberg,starRocks输出记录数据
groupId_streamId_nodeId_database_schema_table_numRecordsOutpostgresql输出记录数据
groupId_streamId_nodeId_topic_numRecordsOutkafka输出记录数据
groupId_streamId_nodeId_database_table_numBytesOutdoris,iceberg,starRocks输出字节数据
groupId_streamId_nodeId_database_schema_table_numBytesOutpostgresql输出字节数据
groupId_streamId_nodeId_topic_numBytesOutkafka输出字节数据
groupId_streamId_nodeId_database_table_numRecordsOutPerSeconddoris,iceberg,starRocks每秒记录数据
groupId_streamId_nodeId_database_schema_table_numRecordsOutPerSecondpostgresql每秒记录数据
groupId_streamId_nodeId_topic_numRecordsOutPerSecondkafka每秒记录数据
groupId_streamId_nodeId_database_table_numBytesOutPerSeconddoris,iceberg,starRocks每秒输出字节数量
groupId_streamId_nodeId_database_schema_table_numBytesOutPerSecondpostgresql每秒输出字节数量
groupId_streamId_nodeId_topic_numBytesOutPerSecondkafka每秒输出字节数量
groupId_streamId_nodeId_database_table_dirtyRecordsOutdoris,iceberg,starRocks输出脏数据记录数
groupId_streamId_nodeId_database_schema_table_dirtyRecordsOutpostgresql输出脏数据记录数
groupId_streamId_nodeId_topic_dirtyRecordsOutkafka输出脏数据记录数
groupId_streamId_nodeId_database_table_dirtyBytesOutdoris,iceberg,starRocks输出脏数据字节数据
groupId_streamId_nodeId_database_schema_table_dirtyBytesOutpostgresql输出脏数据字节数据
groupId_streamId_nodeId_topic_dirtyBytesOutkafka输出脏数据字节数据
groupId_streamId_nodeId_numSerializeSuccessstarRocks序列化成功数
groupId_streamId_nodeId_numSerializeErrorstarRocks序列化异常数
groupId_streamId_nodeId_serializeTimeLagstarRocks序列化延迟(毫秒)
groupId_streamId_nodeId_numSnapshotCreatestarRocks尝试创建Checkpoint数
groupId_streamId_nodeId_numSnapshotErrorstarRocks创建Checkpoint异常数
groupId_streamId_nodeId_numSnapshotCompletestarRocks创建Checkpoint成功数
groupId_streamId_nodeId_snapshotToCheckpointTimeLagstarRocks从开始创建Checkpoint到完成创建延迟(毫秒)

用法

这里将介绍一个同步 MYSQL 数据到 PostgreSQL 的例子,同时介绍指标的使用。

  • flink sql 的使用

create table `table_groupId_streamId_nodeId1`(
`id` INT,
`name` INT,
`age` STRING,
PRIMARY KEY(`id`) NOT ENFORCED)
WITH (
'connector' = 'mysql-cdc-inlong',
'hostname' = 'xxxx',
'username' = 'xxx',
'password' = 'xxx',
'database-name' = 'test',
'scan.incremental.snapshot.enabled' = 'true',
'server-time-zone' = 'GMT+8',
'table-name' = 'user',
'inlong.metric' = 'mysqlGroup&mysqlStream&mysqlNode1'
);

CREATE TABLE `table_groupId_streamId_nodeId2`(
PRIMARY KEY (`id`) NOT ENFORCED,
`id` INT,
`name` STRING,
`age` INT)
WITH (
'connector' = 'jdbc-inlong',
'url' = 'jdbc:postgresql://ip:5432/postgres',
'username' = 'postgres',
'password' = 'inlong',
'table-name' = 'public.user',
'inlong.metric.labels' = 'groupId=xxgroup&streamId=xxstream&nodeId=xxnode'
);

INSERT INTO `table_groupId_streamId_nodeId2`
SELECT
`id`,
`name`,
`age`
FROM `table_groupId_streamId_nodeId1`;
  • 要将指标上报到外部系统,我们可以在 flink-conf.yaml 中添加 metric report 配置(以Prometheus为例)
metric.reporters: promgateway
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: ip
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.interval: 60 SECONDS

ipport 是你的 pushgateway 的配置。

  • 执行上面的sql后,我们可以访问 pushgateway 的 url: http://ip:port

当我们使用的 metric report 是 org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter 指标名将添加前缀 flink_taskmanager_job_task_operator
我们可以看到完整的指标名如下:
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsIn,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesIn,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsInPerSecond,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesInPerSecond,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOut,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOut,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOutPerSecond,
flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOutPerSecond.