Version: 1.12.0



The HDFS Load Node builds on the general capabilities of Flink's file system connector to support both single files and partitioned files. The file system connector is included in Flink and requires no additional dependency; the corresponding jar is in the /lib directory of the Flink distribution. A format must be specified for reading rows from and writing rows to a file system.

How to create an HDFS Load Node

Usage for SQL API

The example below shows how to create an HDFS Load Node with the Flink SQL CLI:

CREATE TABLE hdfs_load_node (
  name STRING,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'sink.partition-commit.delay'='1 h',

File Formats

  • CSV (uncompressed)
  • JSON (the JSON format for the file system connector is not a typical JSON file but uncompressed, newline-delimited JSON)
  • Avro (supports compression by configuring avro.codec)
  • Parquet (compatible with Hive)
  • Orc (compatible with Hive)
  • Debezium-JSON
  • Canal-JSON
  • Raw

Rolling Policy

Data within the partition directories is split into part files. Each partition contains at least one part file for each subtask of the sink that has received data for that partition. The in-progress part file is closed and an additional part file is created according to the configurable rolling policy. The policy rolls part files based on size and on a timeout that specifies the maximum duration for which a file can stay open.

| Option | Default | Type | Description |
| --- | --- | --- | --- |
| sink.rolling-policy.file-size | 128MB | MemorySize | The maximum part file size before rolling. |
| sink.rolling-policy.rollover-interval | 30 min | String | The maximum time a part file can stay open before rolling (30 min by default, to avoid too many small files). How often this is checked is controlled by the 'sink.rolling-policy.check-interval' option. |
| sink.rolling-policy.check-interval | 1 min | String | The interval for checking time-based rolling policies. This controls how often the sink checks whether a part file should roll over based on 'sink.rolling-policy.rollover-interval'. |
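
As an illustration of the rolling options, the following sketch overrides all three defaults. The table name, the path, and the 'filesystem-inlong' connector identifier are assumptions made for this example (upstream Flink registers the connector as 'filesystem'); only the three 'sink.rolling-policy.*' keys come from the options listed here.

```sql
-- Sketch only: table name, connector identifier, and path are illustrative assumptions.
CREATE TABLE hdfs_rolling_example (
  name STRING,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem-inlong',                      -- assumed identifier; plain Flink uses 'filesystem'
  'path' = 'hdfs://namenode:8020/tmp/rolling_example',    -- hypothetical HDFS path
  'format' = 'csv',
  'sink.rolling-policy.file-size' = '256MB',              -- roll once a part file reaches 256 MB
  'sink.rolling-policy.rollover-interval' = '15 min',     -- or once it has been open for 15 minutes
  'sink.rolling-policy.check-interval' = '1 min'          -- how often the time-based condition is checked
);
```

With these values a part file may stay open slightly longer than 15 minutes, because the timeout is only evaluated once per check interval.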

File Compaction

The file sink supports file compaction, which allows applications to use shorter checkpoint intervals without generating a large number of files.

| Option | Default | Type | Description |
| --- | --- | --- | --- |
| auto-compaction | false | Boolean | Whether to enable automatic compaction in the streaming sink. Data is first written to temporary files. After a checkpoint completes, the temporary files generated by that checkpoint are compacted. The temporary files are invisible before compaction. |
| compaction.file-size | (none) | String | The target file size for compaction. Defaults to the rolling file size. |
| inlong.metric.labels | (none) | String | InLong metric label. The value format is groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]. |
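
A minimal sketch of enabling compaction might look as follows. The table name, path, connector identifier, and the metric label values are hypothetical placeholders; the option keys are the ones described in this section.

```sql
-- Sketch only: names, path, and label values are illustrative assumptions.
CREATE TABLE hdfs_compaction_example (
  name STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem-inlong',                      -- assumed identifier; plain Flink uses 'filesystem'
  'path' = 'hdfs://namenode:8020/tmp/compaction_example', -- hypothetical HDFS path
  'format' = 'csv',
  'auto-compaction' = 'true',                             -- compact the files of each checkpoint after it completes
  'compaction.file-size' = '128MB',                       -- target size; falls back to the rolling file size if omitted
  'inlong.metric.labels' = 'groupId=demo_group&streamId=demo_stream&nodeId=demo_node'  -- placeholder IDs
);
```

Because files only become visible after compaction, readers of the directory see data once per completed checkpoint rather than as each part file is closed.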

Partition Commit

After writing a partition, it is often necessary to notify downstream applications, for example by adding the partition to a Hive metastore or by writing a _SUCCESS file in the directory. The file system sink contains a partition commit feature that allows configuring custom policies. Commit actions are based on a combination of triggers and policies.

| Option | Default | Type | Description |
| --- | --- | --- | --- |
| sink.partition-commit.trigger | process-time | String | Trigger type for partition commit. 'process-time': based on the machine time; it requires neither partition time extraction nor watermark generation. The partition is committed once the current system time passes the partition creation system time plus the delay. 'partition-time': based on the time extracted from partition values; it requires watermark generation. The partition is committed once the watermark passes the time extracted from the partition values plus the delay. |
| sink.partition-commit.delay | 0 s | Duration | The partition is not committed until the delay has elapsed. For a daily partition use '1 d'; for an hourly partition use '1 h'. |
| sink.partition-commit.watermark-time-zone | UTC | String | The time zone used to convert the long watermark value to a TIMESTAMP value. The converted watermark timestamp is compared with the partition time to decide whether the partition should be committed. This option only takes effect when `sink.partition-commit.trigger` is set to 'partition-time'. If it is not configured correctly, e.g. the source rowtime is defined on a TIMESTAMP_LTZ column but this option is not set, users may see the partition committed after a few hours. The default value is 'UTC', which means the watermark is defined on a TIMESTAMP column or is not defined. If the watermark is defined on a TIMESTAMP_LTZ column, the time zone of the watermark is the session time zone. The value is either a full name such as 'America/Los_Angeles' or a custom time zone ID such as 'GMT-08:00'. |
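
The following sketch shows how the 'partition-time' trigger might be configured for hourly partitions. It assumes the upstream job defines a watermark, and it uses the 'partition.time-extractor.timestamp-pattern' option from the upstream Flink file system connector, which is not listed in this section and is assumed here to be honored by this connector as well. The table name, path, connector identifier, and time zone are illustrative.

```sql
-- Sketch only: names, path, and time zone are illustrative assumptions.
CREATE TABLE hdfs_partition_time_example (
  name STRING,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem-inlong',                             -- assumed identifier; plain Flink uses 'filesystem'
  'path' = 'hdfs://namenode:8020/tmp/partition_time_example',    -- hypothetical HDFS path
  'format' = 'csv',
  'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00', -- upstream Flink option: builds the partition time from dt and hour
  'sink.partition-commit.trigger' = 'partition-time',            -- commit when the watermark passes partition time + delay
  'sink.partition-commit.delay' = '1 h',                         -- hourly partitions, so wait one hour
  'sink.partition-commit.watermark-time-zone' = 'Asia/Shanghai', -- only needed when the watermark is on a TIMESTAMP_LTZ column
  'sink.partition-commit.policy.kind' = 'success-file'           -- action to perform on commit
);
```

With the default 'process-time' trigger, the pattern and time zone options can be dropped; the partition is then committed purely on wall-clock time.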

Partition Commit Policy

The partition commit policy defines the specific action performed when a partition is committed.

  • metastore: adds the partition to the metastore. This policy is supported only for Hive tables.
  • success-file: writes a '_SUCCESS' file to the partition directory after the part files have been generated.

| Option | Required | Default | Type | Description |
| --- | --- | --- | --- | --- |
| sink.partition-commit.policy.kind | optional | (none) | String | The policy used to commit a partition, i.e. to notify downstream applications that the partition has finished writing and is ready to be read. metastore: add the partition to the metastore. Only Hive tables support the metastore policy; the file system manages partitions through the directory structure. success-file: add a '_SUCCESS' file to the directory. custom: use a policy class to create a commit policy. Multiple policies can be configured at the same time, e.g. 'metastore,success-file'. |
| sink.partition-commit.policy.class | optional | (none) | String | The partition commit policy class, which must implement the PartitionCommitPolicy interface. Only takes effect with the custom commit policy. |
| sink.partition-commit.success-file.name | optional | _SUCCESS | String | The file name for the success-file partition commit policy. The default is '_SUCCESS'. |
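
As a sketch of how these options combine, the table below writes a renamed success file and additionally invokes a custom policy. The class name com.example.MyCommitPolicy is hypothetical and would have to be an implementation of PartitionCommitPolicy available on the job's classpath; the table name, path, connector identifier, and file name are likewise illustrative.

```sql
-- Sketch only: names, path, and the policy class are illustrative assumptions.
CREATE TABLE hdfs_commit_policy_example (
  name STRING,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem-inlong',                            -- assumed identifier; plain Flink uses 'filesystem'
  'path' = 'hdfs://namenode:8020/tmp/commit_policy_example',    -- hypothetical HDFS path
  'format' = 'csv',
  'sink.partition-commit.delay' = '1 d',                        -- daily partitions
  'sink.partition-commit.policy.kind' = 'success-file,custom',  -- run both policies on commit
  'sink.partition-commit.policy.class' = 'com.example.MyCommitPolicy', -- hypothetical PartitionCommitPolicy implementation
  'sink.partition-commit.success-file.name' = '_DONE'           -- overrides the default '_SUCCESS'
);
```

The metastore policy is left out here because, as noted above, it applies only to Hive tables.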