Version: Next

Monitor Metrics

Overview

Sort computes metrics for each node. To enable them, add the option inlong.metric.labels to the node, with a value of the form groupId={groupId}&streamId={streamId}&nodeId={nodeId}. Sort exports the metrics through the Flink metric group, so you can collect them with any Flink metric reporter.
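
The label value is a plain key=value list joined with &. As a minimal sketch of that format (the helper names build_metric_labels and parse_metric_labels are hypothetical and not part of InLong), the value can be built and checked like this:

```python
# Hypothetical helpers for the inlong.metric.labels value; not an InLong API.
REQUIRED_KEYS = ("groupId", "streamId", "nodeId")


def build_metric_labels(group_id: str, stream_id: str, node_id: str) -> str:
    """Return a string of the form groupId=..&streamId=..&nodeId=.."""
    values = (group_id, stream_id, node_id)
    for key, value in zip(REQUIRED_KEYS, values):
        # '&' and '=' are the separators, so they cannot appear inside a value.
        if not value or "&" in value or "=" in value:
            raise ValueError(f"invalid {key}: {value!r}")
    return "&".join(f"{k}={v}" for k, v in zip(REQUIRED_KEYS, values))


def parse_metric_labels(labels: str) -> dict:
    """Split a labels string into a dict and verify the three required keys."""
    parsed = {}
    for pair in labels.split("&"):
        key, sep, value = pair.partition("=")
        if not sep or not key:
            raise ValueError(f"malformed label pair: {pair!r}")
        parsed[key] = value
    missing = [k for k in REQUIRED_KEYS if k not in parsed]
    if missing:
        raise ValueError(f"missing keys: {missing}")
    return parsed


print(build_metric_labels("xxgroup", "xxstream", "xxnode"))
# groupId=xxgroup&streamId=xxstream&nodeId=xxnode
```

Validating the string before it goes into a table definition catches a missing key early, before the job is submitted.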

Metric

Supported extract nodes

Node-level metrics

| Metric name | Extract node | Description |
| --- | --- | --- |
| groupId_streamId_nodeId_numRecordsIn | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | number of input records |
| groupId_streamId_nodeId_numBytesIn | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | number of input bytes |
| groupId_streamId_nodeId_numRecordsInPerSecond | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | input records per second |
| groupId_streamId_nodeId_numBytesInPerSecond | kafka, mongodb-cdc, mysql-cdc, oracle-cdc, postgresql-cdc, pulsar, sqlserver-cdc | input bytes per second |
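
To make the naming pattern concrete, here is a small sketch that lists the node-level metric names for one extract node. It assumes that groupId, streamId and nodeId in the metric names stand for the values given in inlong.metric.labels; the function node_metric_names is hypothetical.

```python
# Sketch only: assumes the groupId/streamId/nodeId parts of a metric name
# are the values passed in inlong.metric.labels.
NODE_METRICS_IN = (
    "numRecordsIn",
    "numBytesIn",
    "numRecordsInPerSecond",
    "numBytesInPerSecond",
)


def node_metric_names(group_id, stream_id, node_id, metrics=NODE_METRICS_IN):
    """Return the node-level metric names for one node."""
    base = f"{group_id}_{stream_id}_{node_id}"
    return [f"{base}_{metric}" for metric in metrics]


for name in node_metric_names("xxgroup", "xxstream", "xxnode"):
    print(name)
# xxgroup_xxstream_xxnode_numRecordsIn
# xxgroup_xxstream_xxnode_numBytesIn
# xxgroup_xxstream_xxnode_numRecordsInPerSecond
# xxgroup_xxstream_xxnode_numBytesInPerSecond
```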

Table-level metrics

Table-level metrics are used for whole-database synchronization.

| Metric name | Extract node | Description |
| --- | --- | --- |
| groupId_streamId_nodeId_database_table_numRecordsIn | mysql-cdc | number of input records |
| groupId_streamId_nodeId_database_schema_table_numRecordsIn | oracle-cdc, postgresql-cdc | number of input records |
| groupId_streamId_nodeId_database_collection_numRecordsIn | mongodb-cdc | number of input records |
| groupId_streamId_nodeId_database_table_numBytesIn | mysql-cdc | number of input bytes |
| groupId_streamId_nodeId_database_schema_table_numBytesIn | oracle-cdc, postgresql-cdc | number of input bytes |
| groupId_streamId_nodeId_database_collection_numBytesIn | mongodb-cdc | number of input bytes |
| groupId_streamId_nodeId_database_table_numRecordsInPerSecond | mysql-cdc | input records per second |
| groupId_streamId_nodeId_database_schema_table_numRecordsInPerSecond | oracle-cdc, postgresql-cdc | input records per second |
| groupId_streamId_nodeId_database_collection_numRecordsInPerSecond | mongodb-cdc | input records per second |
| groupId_streamId_nodeId_database_table_numBytesInPerSecond | mysql-cdc | input bytes per second |
| groupId_streamId_nodeId_database_schema_table_numBytesInPerSecond | oracle-cdc, postgresql-cdc | input bytes per second |
| groupId_streamId_nodeId_database_collection_numBytesInPerSecond | mongodb-cdc | input bytes per second |
| groupId_streamId_nodeId_database_collection_numSnapshotCreate | postgresql-cdc, pulsar, mongodb-cdc, sqlserver-cdc | number of checkpoint creation attempts |
| groupId_streamId_nodeId_database_collection_numSnapshotError | postgresql-cdc, pulsar, mongodb-cdc, sqlserver-cdc | number of checkpoint creation exceptions |
| groupId_streamId_nodeId_database_collection_numSnapshotComplete | postgresql-cdc, pulsar, mongodb-cdc, sqlserver-cdc | number of successful checkpoint creations |
| groupId_streamId_nodeId_database_collection_snapshotToCheckpointTimeLag | postgresql-cdc, pulsar, mongodb-cdc, sqlserver-cdc | time lag from start to completion of checkpoint creation (ms) |
| groupId_streamId_nodeId_database_collection_numDeserializeSuccess | postgresql-cdc, pulsar, mongodb-cdc, sqlserver-cdc | number of successful deserializations |
| groupId_streamId_nodeId_database_collection_numDeserializeError | postgresql-cdc, pulsar, mongodb-cdc, sqlserver-cdc | number of deserialization errors |
| groupId_streamId_nodeId_database_collection_deserializeTimeLag | postgresql-cdc, pulsar, mongodb-cdc, sqlserver-cdc | deserialization time lag (ms) |
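
The table-level names differ per connector only in the scope segments between nodeId and the metric suffix. The sketch below encodes that mapping for the record and byte counters listed above. The mapping TABLE_SCOPE and the function table_metric_name are hypothetical, and the sketch again assumes that each placeholder is replaced by its actual value.

```python
# Sketch only: scope segments per extract connector, taken from the table above.
TABLE_SCOPE = {
    "mysql-cdc": ("database", "table"),
    "oracle-cdc": ("database", "schema", "table"),
    "postgresql-cdc": ("database", "schema", "table"),
    "mongodb-cdc": ("database", "collection"),
}


def table_metric_name(connector, group_id, stream_id, node_id, metric, **scope):
    """Compose a table-level metric name for the given extract connector."""
    fields = TABLE_SCOPE[connector]  # KeyError for connectors not in the table
    missing = [f for f in fields if f not in scope]
    if missing:
        raise ValueError(f"{connector} needs scope values for: {missing}")
    parts = [group_id, stream_id, node_id, *(scope[f] for f in fields), metric]
    return "_".join(parts)


print(table_metric_name("mysql-cdc", "xxgroup", "xxstream", "xxnode",
                        "numRecordsIn", database="test", table="user"))
# xxgroup_xxstream_xxnode_test_user_numRecordsIn
```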

Supported load nodes

Node-level metrics

| Metric name | Load node | Description |
| --- | --- | --- |
| groupId_streamId_nodeId_numRecordsOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | number of output records |
| groupId_streamId_nodeId_numBytesOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | number of output bytes |
| groupId_streamId_nodeId_numRecordsOutPerSecond | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | output records per second |
| groupId_streamId_nodeId_numBytesOutPerSecond | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | output bytes per second |
| groupId_streamId_nodeId_dirtyRecordsOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | number of dirty output records |
| groupId_streamId_nodeId_dirtyBytesOut | clickhouse, elasticsearch, greenplum, hbase, hdfs, hive, iceberg, kafka, mysql, oracle, postgresql, sqlserver, tdsql-postgresql | number of dirty output bytes |

Table-level metrics

| Metric name | Load node | Description |
| --- | --- | --- |
| groupId_streamId_nodeId_database_table_numRecordsOut | doris, iceberg, starRocks | number of output records |
| groupId_streamId_nodeId_database_schema_table_numRecordsOut | postgresql | number of output records |
| groupId_streamId_nodeId_topic_numRecordsOut | kafka | number of output records |
| groupId_streamId_nodeId_database_table_numBytesOut | doris, iceberg, starRocks | number of output bytes |
| groupId_streamId_nodeId_database_schema_table_numBytesOut | postgresql | number of output bytes |
| groupId_streamId_nodeId_topic_numBytesOut | kafka | number of output bytes |
| groupId_streamId_nodeId_database_table_numRecordsOutPerSecond | doris, iceberg, starRocks | output records per second |
| groupId_streamId_nodeId_database_schema_table_numRecordsOutPerSecond | postgresql | output records per second |
| groupId_streamId_nodeId_topic_numRecordsOutPerSecond | kafka | output records per second |
| groupId_streamId_nodeId_database_table_numBytesOutPerSecond | doris, iceberg, starRocks | output bytes per second |
| groupId_streamId_nodeId_database_schema_table_numBytesOutPerSecond | postgresql | output bytes per second |
| groupId_streamId_nodeId_topic_numBytesOutPerSecond | kafka | output bytes per second |
| groupId_streamId_nodeId_database_table_dirtyRecordsOut | doris, iceberg, starRocks | number of dirty output records |
| groupId_streamId_nodeId_database_schema_table_dirtyRecordsOut | postgresql | number of dirty output records |
| groupId_streamId_nodeId_topic_dirtyRecordsOut | kafka | number of dirty output records |
| groupId_streamId_nodeId_database_table_dirtyBytesOut | doris, iceberg, starRocks | number of dirty output bytes |
| groupId_streamId_nodeId_database_schema_table_dirtyBytesOut | postgresql | number of dirty output bytes |
| groupId_streamId_nodeId_topic_dirtyBytesOut | kafka | number of dirty output bytes |
| groupId_streamId_nodeId_numSerializeSuccess | starRocks | number of successful serializations |
| groupId_streamId_nodeId_numSerializeError | starRocks | number of serialization exceptions |
| groupId_streamId_nodeId_serializeTimeLag | starRocks | serialization time lag (ms) |
| groupId_streamId_nodeId_numSnapshotCreate | starRocks | number of checkpoint creation attempts |
| groupId_streamId_nodeId_numSnapshotError | starRocks | number of checkpoint creation exceptions |
| groupId_streamId_nodeId_numSnapshotComplete | starRocks | number of successful checkpoint creations |
| groupId_streamId_nodeId_snapshotToCheckpointTimeLag | starRocks | time lag from start to completion of checkpoint creation (ms) |
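
The checkpoint counters can be combined into a simple health indicator. The sketch below divides the numSnapshotComplete value by the numSnapshotCreate value. This ratio is an illustration and not a metric that Sort reports, and the function name checkpoint_success_ratio is hypothetical. It assumes both counters were read at the same moment.

```python
from typing import Optional


def checkpoint_success_ratio(num_snapshot_create: int,
                             num_snapshot_complete: int) -> Optional[float]:
    """Share of checkpoint creation attempts that completed successfully.

    Returns None when no attempt has been counted yet, so callers can
    tell "no data" apart from a ratio of 0.0.
    """
    if num_snapshot_create <= 0:
        return None
    return num_snapshot_complete / num_snapshot_create


print(checkpoint_success_ratio(4, 3))  # 0.75
print(checkpoint_success_ratio(0, 0))  # None
```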

Usage

The following example synchronizes MySQL data to PostgreSQL and shows how to use the metrics.

  • Define the job in Flink SQL and set inlong.metric.labels on both tables:

-- Source: MySQL CDC table. The labels identify this extract node in the metrics.
CREATE TABLE `table_groupId_streamId_nodeId1`(
    `id` INT,
    `name` STRING,
    `age` INT,
    PRIMARY KEY (`id`) NOT ENFORCED)
WITH (
    'connector' = 'mysql-cdc-inlong',
    'hostname' = 'xxxx',
    'username' = 'xxx',
    'password' = 'xxx',
    'database-name' = 'test',
    'scan.incremental.snapshot.enabled' = 'true',
    'server-time-zone' = 'GMT+8',
    'table-name' = 'user',
    'inlong.metric.labels' = 'groupId=xxgroup&streamId=xxstream&nodeId=xxnode'
);

-- Sink: PostgreSQL table written through JDBC. The column types match the source.
CREATE TABLE `table_groupId_streamId_nodeId2`(
    `id` INT,
    `name` STRING,
    `age` INT,
    PRIMARY KEY (`id`) NOT ENFORCED)
WITH (
    'connector' = 'jdbc-inlong',
    'url' = 'jdbc:postgresql://ip:5432/postgres',
    'username' = 'postgres',
    'password' = 'inlong',
    'table-name' = 'public.user',
    'inlong.metric.labels' = 'groupId=pggroup&streamId=pgStream&nodeId=pgNode'
);

-- Copy every row from the MySQL source into the PostgreSQL sink.
INSERT INTO `table_groupId_streamId_nodeId2`
SELECT
    `id`,
    `name`,
    `age`
FROM `table_groupId_streamId_nodeId1`;

  • To report the metrics to an external system, add a metric reporter to flink-conf.yaml. The following uses the Prometheus PushGateway reporter as an example:

metrics.reporters: promgateway
metrics.reporter.promgateway.class: org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter
metrics.reporter.promgateway.host: ip
metrics.reporter.promgateway.port: 9091
metrics.reporter.promgateway.interval: 60 SECONDS

Replace ip and port with the host and port of your Pushgateway.

  • After the Flink SQL job has started, open http://ip:port of the Pushgateway to view the metrics. When the metric reporter is org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporter, every metric name is prefixed with flink_taskmanager_job_task_operator.
    The full metric names are:
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsIn,
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesIn,
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsInPerSecond,
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesInPerSecond,
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOut,
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOut,
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numRecordsOutPerSecond,
    flink_taskmanager_job_task_operator_groupId_streamId_nodeId_numBytesOutPerSecond.
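
To pick the metrics of a single node out of the Pushgateway output, the text can be filtered by the prefix described above. The following is a minimal sketch with two assumptions: groupId, streamId and nodeId in the metric names are the values from inlong.metric.labels, and each sample line has the form name{labels} value without a trailing timestamp. The function inlong_samples is hypothetical. It sums series that share a name, for example one series per subtask.

```python
PREFIX = "flink_taskmanager_job_task_operator_"


def inlong_samples(exposition_text, group_id, stream_id, node_id):
    """Return {metric suffix: summed value} for one node.

    exposition_text is Prometheus text-format output, e.g. the body
    served by the Pushgateway.
    """
    wanted = f"{PREFIX}{group_id}_{stream_id}_{node_id}_"
    samples = {}
    for line in exposition_text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue  # skip blank lines and HELP/TYPE comments
        # Assumed layout: 'name{labels} value' or 'name value'.
        name_and_labels, _, value = line.rpartition(" ")
        name = name_and_labels.split("{", 1)[0]
        if name.startswith(wanted):
            key = name[len(wanted):]
            samples[key] = samples.get(key, 0.0) + float(value)
    return samples


sample_text = """\
# TYPE flink_taskmanager_job_task_operator_xxgroup_xxstream_xxnode_numRecordsIn gauge
flink_taskmanager_job_task_operator_xxgroup_xxstream_xxnode_numRecordsIn{job="j",subtask_index="0"} 10
flink_taskmanager_job_task_operator_xxgroup_xxstream_xxnode_numRecordsIn{job="j",subtask_index="1"} 5
flink_taskmanager_job_task_operator_xxgroup_xxstream_xxnode_numBytesIn{job="j",subtask_index="0"} 2048
flink_taskmanager_job_task_operator_other_metric 1
"""
print(inlong_samples(sample_text, "xxgroup", "xxstream", "xxnode"))
# {'numRecordsIn': 15.0, 'numBytesIn': 2048.0}
```

Summing across subtasks suits the counters. For other kinds of values a different aggregation may be more appropriate.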