Skip to main content
版本:Next

总览

  • InLong 审计是独立于 InLong 的一个子系统,对 InLong 系统的 Agent、DataProxy、Sort 模块的入流量、出流量进行实时审计对账。
  • 对账的粒度有分钟、10分钟、30分钟、小时、天等等。

审计对账以日志上报时间为统一的口径,参与审计的各个服务将按照相同的日志时间进行实时对账。通过审计对账,我们可以清晰的了解 InLong 各个模块的传输情况,以及数据流是否有丢失或者重复

架构

  • 审计SDK嵌套在需要审计的服务,对服务进行审计,将审计结果发送到审计接入层。
  • 审计接入层将审计数据写到 MQ (Pulsar、Kafka 或者 TubeMQ)。
  • 分发服务消费 MQ 的审计数据,将审计数据写到 MySQL、StarRocks。
  • 接口层将 MySQL、StarRocks 的数据进行实时聚合并且 cache,对外提供 OpenAPI。
  • 应用场景主要包括报表展示、审计对账等等。
  • 支持数据补录场景的审计对账。
  • 支持 Flink CheckPoint 场景的审计对账。

模块

模块描述
audit-sdk审计埋点上报,各个模块使用该 SDK 上报审计数据
audit-proxy审计代理层,接收 SDK 上报数据,转发到 MQ (pulsar / kafka / tubeMQ)
audit-store审计存储层,支持通用的 JDBC 协议
audit-service审计服务层,提供聚合、cache、OpenAPI 等能力

审计维度

机器 ip容器 ID线程 ID日志时间(分钟)审计 IDinlong_group_idinlong_stream_id条数大小传输时延(ms)

审计项 ID

每个模块的接收与发送分别为一个独立的审计项 ID

InLong 服务模块审计 ID
InLong API 接收成功1
InLong API 发送成功2
InLong Agent 接收成功3
InLong Agent 发送成功4
InLong DataProxy 接收成功5
InLong DataProxy 发送成功6

审计数据存储

Audit Store 能够支持所有兼容 JDBC 协议的存储组件的写入操作。因此,在选择兼容 JDBC 协议的存储组件时,只需确保其满足以下 Schema 即可:

CREATE TABLE IF NOT EXISTS `audit_data`
(
`id` int(32) NOT NULL PRIMARY KEY AUTO_INCREMENT COMMENT 'Incremental primary key',
`ip` varchar(32) NOT NULL DEFAULT '' COMMENT 'Client IP',
`docker_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Client docker id',
`thread_id` varchar(50) NOT NULL DEFAULT '' COMMENT 'Client thread id',
`sdk_ts` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'SDK timestamp',
`packet_id` BIGINT NOT NULL DEFAULT '0' COMMENT 'Packet id',
`log_ts` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Log timestamp',
`inlong_group_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target inlong group id',
`inlong_stream_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'The target inlong stream id',
`audit_id` varchar(100) NOT NULL DEFAULT '' COMMENT 'Audit id',
`audit_tag` varchar(100) DEFAULT '' COMMENT 'Audit tag',
`audit_version` BIGINT DEFAULT -1 COMMENT 'Audit version',
`count` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message count',
`size` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message size',
`delay` BIGINT NOT NULL DEFAULT '0' COMMENT 'Message delay count',
`update_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT 'Update time',
INDEX group_stream_audit_id (`inlong_group_id`, `inlong_stream_id`, `audit_id`, `log_ts`)
) ENGINE = InnoDB
DEFAULT CHARSET = UTF8 COMMENT ='InLong audit data table';
  • ip:表示客户端的 IP 地址;
  • docker_id:长度为 100 的字符串,表示客户端的 Docker ID;
  • thread_id:长度为 50 的字符串,表示客户端的线程 ID;
  • sdk_ts:TIMESTAMP 类型,表示 SDK 的时间戳,默认值为当前时间戳;
  • packet_id:64 位整数,表示数据包的ID;
  • log_ts:TIMESTAMP 类型,表示日志的时间戳,默认值为当前时间戳;
  • inlong_group_id:长度为 100 的字符串,表示目标 Inlong 组的 ID;
  • inlong_stream_id:长度为 100 的字符串,表示目标 Inlong 流的 ID;
  • audit_id:长度为 100 的字符串,表示审计 ID;
  • audit_tag:长度为 100 的字符串,表示审计标签,默认为空字符串;
  • audit_version:64 位整数,表示审计版本,默认值为-1;
  • count:64 位整数,表示消息数量,默认值为 0;
  • size:64 位整数,表示消息大小,默认值为 0;
  • delay:64 位整数,表示消息延迟数量,默认值为 0;
  • update_time:TIMESTAMP 类型,表示更新时间,默认值为当前时间戳,当记录被更新时自动更新。