Skip to main content
Version: Next

StarRocks

Overview

  • StarRocks Load node supports writing data to the StarRocks database.
  • Two modes are supported for sink to StarRocks: Single-sink for specify fixed database name and table name to sink. Multi-sink for custom database name and table name according to src format, which suitable for scenarios such as multi-table writing or whole database synchronization.
  • This document describes how to set up a StarRocks Load node to sink to StarRocks.

Supported Version

Load NodeStarRocks version
StarRocks2.0+

Dependencies

In order to set up the StarRocks Load node, the dependency information needed to use a build automation tool such as Maven or SBT is provided below.

Maven dependency

<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>sort-connector-starrocks</artifactId>
<version>1.12.0-SNAPSHOT</version>
</dependency>

## Prepare
### Create MySql Extract table
- For Single-sink: Create a table `cdc.cdc_mysql_source` in the MySQL database. The command is as follows:
```sql
[root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
mysql> use cdc;
Database changed
mysql> CREATE TABLE `cdc_mysql_source` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(64) DEFAULT NULL,
`dr` tinyint(3) DEFAULT 0,
PRIMARY KEY (`id`)
);
Query OK, 0 rows affected (0.02 sec)

mysql> insert into cdc_mysql_source values(1, 'zhangsan', 0),(2, 'lisi', 0),(3, 'wangwu', 0);
Query OK, 3 rows affected (0.01 sec)
Records: 3 Duplicates: 0 Warnings: 0

mysql> select * from cdc_mysql_source;
+----+----------+----+
| id | name | dr |
+----+----------+----+
| 1 | zhangsan | 0 |
| 2 | lisi | 0 |
| 3 | wangwu | 0 |
+----+----------+----+
3 rows in set (0.07 sec)
  • For Multi-sink: Create tables user_db.user_id_nameuser_db.user_id_score in the MySQL database. The command is as follows:
[root@fe001 ~]# mysql -u root -h localhost -P 3306 -p123456
mysql> use user_db;
Database changed
mysql> CREATE TABLE `user_id_name` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(64) DEFAULT NULL
PRIMARY KEY (`id`)
);
Query OK, 0 rows affected (0.02 sec)

mysql> CREATE TABLE `user_id_score` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`score` double default 0,
PRIMARY KEY (`id`)
);
Query OK, 0 rows affected (0.02 sec)

mysql> insert into user_id_name values(1001, 'lily'),(1002, 'tom'),(1003, 'alan');
Query OK, 3 rows affected (0.01 sec)
Records: 3 Duplicates: 0 Warnings: 0

mysql> insert into user_id_score values(1001, 99),(1002, 96),(1003, 98);
Query OK, 3 rows affected (0.01 sec)
Records: 3 Duplicates: 0 Warnings: 0

mysql> select * from user_id_name;
+------+--------+
| id | name |
+------+--------+
| 1001 | lily |
| 1002 | tom |
| 1003 | alan |
+----+----------+
3 rows in set (0.07 sec)

mysql> select * from user_id_score;
+------+------+
| id | name |
+------+------+
| 1001 | 99 |
| 1002 | 96 |
| 1003 | 98 |
+----+--------+
3 rows in set (0.07 sec)

Create StarRocks Load table

  • For Single-sink: Create a table cdc.cdc_starrocks_sink in the StarRocks database. The command is as follows:
[root@fe001 ~]# mysql -u username -h localhost -P 9030 -p password
mysql> use cdc;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed

mysql> CREATE TABLE `cdc_starrocks_sink` (
`id` int(11) NOT NULL COMMENT "user id",
`name` varchar(50) NOT NULL COMMENT "user name",
`dr` tinyint(4) NULL COMMENT "delete tag"
) ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
Query OK, 0 rows affected (0.06 sec)
  • For Multi-sink: Create tables user_db.starrocks_user_id_nameuser_db.starrocks_user_id_score in the StarRocks database. The command is as follows:
[root@fe001 ~]# mysql -u username -h localhost -P 9030 -p password
mysql> use user_db;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A
Database changed

mysql> CREATE TABLE `starrocks_user_id_name` (
`id` int(11) NOT NULL COMMENT "用户id",
`name` varchar(50) NOT NULL COMMENT "昵称"
) ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
Query OK, 0 rows affected (0.06 sec)

mysql> CREATE TABLE `starrocks_user_id_score` (
`id` int(11) NOT NULL COMMENT "用户id",
`score` double default 0
) ENGINE=OLAP
PRIMARY KEY(`id`)
COMMENT "OLAP"
DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
Query OK, 0 rows affected (0.06 sec)

How to create a StarRocks Load Node

Usage for SQL API

  • For Single-sink: StarRocks load
[root@tasknode001 flink-1.13.5]# ./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/starrocks/
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
[INFO] Session property has been set.

Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
[INFO] Session property has been set.

Flink SQL> CREATE TABLE cdc_mysql_source (
> id int
> ,name VARCHAR
> ,dr TINYINT
> ,PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc-inlong',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'cdc',
> 'table-name' = 'cdc_mysql_source'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE cdc_starrocks_sink (
> id INT,
> name STRING,
> dr TINYINT
> ) WITH (
> 'connector' = 'starrocks-inlong',
> 'fenodes' = 'localhost:8030',
> 'table.identifier' = 'cdc.cdc_starrocks_sink',
> 'username' = 'username',
> 'password' = 'password',
> 'sink.properties.format' = 'json',
> 'sink.properties.strip_outer_array' = 'true'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into cdc_starrocks_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 5f89691571d7b3f3ca446589e3d0c3d3
  • For Single-sink: StarRocks load
./bin/sql-client.sh -l ./opt/connectors/mysql-cdc-inlong/ -l ./opt/connectors/starrocks/
Flink SQL> SET 'execution.checkpointing.interval' = '3s';
[INFO] Session property has been set.

Flink SQL> SET 'table.dynamic-table-options.enabled' = 'true';
[INFO] Session property has been set.

Flink SQL> CREATE TABLE cdc_mysql_source (
> id int
> ,name VARCHAR
> ,dr TINYINT
> ,PRIMARY KEY (id) NOT ENFORCED
> ) WITH (
> 'connector' = 'mysql-cdc-inlong',
> 'hostname' = 'localhost',
> 'port' = '3306',
> 'username' = 'root',
> 'password' = '123456',
> 'database-name' = 'test',
> 'table-name' = 'cdc_mysql_source'
> );
[INFO] Execute statement succeed.

Flink SQL> CREATE TABLE cdc_starrocks_sink (
> id INT,
> name STRING,
> dr TINYINT
> ) WITH (
> 'connector' = 'starrocks-inlong',
> 'fenodes' = 'localhost:8030',
> 'username' = 'username',
> 'password' = 'password',
> 'sink.multiple.enable' = 'true',
> 'sink.multiple.format' = 'canal-json',
> 'sink.multiple.database-pattern' = '${database}',
> 'sink.multiple.table-pattern' = 'starrocks_${table}'
> );
[INFO] Execute statement succeed.

Flink SQL> insert into cdc_starrocks_sink select * from cdc_mysql_source /*+ OPTIONS('server-id'='5402') */;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 30feaa0ede92h6b6e25ea0cfda26df5e

Usage for InLong Dashboard

TODO: It will be supported in the future.

Usage for InLong Manager Client

TODO: It will be supported in the future.

StarRocks Load Node Options

OptionRequiredDefaultTypeDescription
connectorrequired(none)stringSpecify which connector to use, valid values are: starrocks-inlong
jdbc-urlrequired(none)stringthis will be used to execute queries in starrocks.
load-urlrequired(none)stringfe_ip:http_port;fe_ip:http_port separated with ';', which would be used to do the batch sinking.
database-namerequired(none)stringstarrocks database name
table-namerequired(none)stringstarrocks table name
usernamerequired(none)stringstarrocks connecting username
passwordrequired(none)stringstarrocks connecting password
sink.semanticoptionalat-least-oncestringat-least-once or exactly-once(flush at checkpoint only and options like sink.buffer-flush.* won't work either).
sink.versionoptionalAUTOstringThe version of implementaion for sink exactly-once. Only availible for connector 1.2.4+. If V2, use StarRocks' stream load transaction interface which requires StarRocks 2.4+. If V1, use stream load non-transaction interface. If AUTO, connector will choose the stream load transaction interface automatically if the StarRocks supports the feature, otherwise choose non-transaction interface.
sink.buffer-flush.max-bytesoptional94371840(90M)stringthe max batching size of the serialized data, range: [64MB, 10GB].
sink.buffer-flush.max-rowsoptional500000stringthe max batching rows, range: [64,000, 5000,000].
sink.buffer-flush.interval-msoptional300000stringthe flushing time interval, range: [1000ms, 3600000ms].
sink.max-retriesoptional3stringmax retry times of the stream load request, range: [0, 10].
sink.connect.timeout-msoptional1000stringTimeout in millisecond for connecting to the load-url, range: [100, 60000].
sink.properties.formatoptionalCSVstringThe file format of data loaded into starrocks. Valid values: CSV and JSON. Default value: CSV.
sink.properties.*optionalNONEstringthe stream load properties like 'sink.properties.columns' = 'k1, k2, k3',details in STREAM LOAD. Since 2.4, the flink-connector-starrocks supports partial updates for Primary Key model.
sink.properties.ignore_json_sizeoptionalfalsestringignore the batching size (100MB) of json data
sink.multiple.enableoptionalfalsebooleanDetermine whether to support multiple sink writing, default is false. when sink.multiple.enable is true, need sink.multiple.formatsink.multiple.database-patternsink.multiple.table-pattern be correctly set.
sink.multiple.formatoptional(none)stringThe format of multiple sink, it represents the real format of the raw binary data. can be canal-json or debezium-json at present. See kafka -- Dynamic Topic Extraction for more details.
sink.multiple.database-patternoptional(none)stringExtract database name from the raw binary data, this is only used in the multiple sink writing scenario.
sink.multiple.table-patternoptional(none)stringExtract table name from the raw binary data, this is only used in the multiple sink writing scenario.
inlong.metric.labelsoptional(none)StringInlong metric label, format of value is groupId={groupId}&streamId={streamId}&nodeId={nodeId}.
sink.multiple.schema-update.policyoptional(none)stringIf sink data fields do not match starrocks table, such as table does not exsit or string data is over length, starrocks server will throw an exception.

When this option is THROW_WITH_STOP, the exception will be thrown up to flink framework, flink will restart task automatically, trying to resume the task.

When this option is STOP_PARTIAL, starrocks connector will stop writing into this table, other tables are written normally. The exception will be logging but not thrown up.

When this option is LOG_WITH_IGNORE, starrocks connector only log the error, not throw up. StarRocks connector will try to write to starrocks server again when receiving new source data.
dirty.ignoreoptinal(none)booleanWhen writing data into starrocks table, errors may be thrown by starrocks server as table does not exist or data is over length.

When this option is true, and dirty.side-output.* properties are configed correctly, dirty data can be written to Amazon S3 or Tencent Colud COS storage. Dirty data metrics will also be collected automatically.

When this option is false, only dirty data metrics will be collected, but dirty data will not be archived.
dirty.side-output.enableoptinal(none)booleanWhen this option is ture and other options about S3 or COS is configed correctly, dirty data archiving will works. When false, dirty data archiving will not work.
dirty.side-output.connectoroptinal(none)strings3 or log are supported now.

When log, starrocks connector only log the dirty data, not archive data.

When s3, starrocks connector can write dirty data to S3 or COS.
dirty.side-output.s3.bucketoptinal(none)stringThe bucket name of S3 or COS
dirty.side-output.s3.endpointoptinal(none)stringThe endpoint of S3 or COS
dirty.side-output.s3.keyoptinal(none)stringThe key of S3 or COS
dirty.side-output.s3.regionoptinal(none)stringThe region of S3 or COS
dirty.side-output.line-delimiteroptinal(none)stringThe line delimiter of dirty data
dirty.side-output.field-delimiteroptinal(none)stringThe field delimiter of dirty data
dirty.side-output.s3.secret-key-idoptinal(none)stringThe secret key of S3 or COS
dirty.side-output.s3.access-key-idoptinal(none)stringThe access key of S3 or COS
dirty.side-output.formatoptinal(none)stringThe format of dirty data archiving, supports json or csv
dirty.side-output.log-tagoptinal(none)stringThe log tag of dirty data. StarRocks connector uses lags to distinguish which starrocks database and table the dirty data will be written to.
dirty.identifieroptinal(none)stringThe file name of drity data which written to S3 or COS.
dirty.side-output.labelsoptinal(none)stringEvery dirty data line contains label and business data fields. Label is in front, and business data is at end.

Data Type Mapping

Flink typeStarRocks type
BOOLEANBOOLEAN
TINYINTTINYINT
SMALLINTSMALLINT
INTEGERINTEGER
BIGINTBIGINT
FLOATFLOAT
DOUBLEDOUBLE
DECIMALDECIMAL
BINARYINT
CHARJSON / STRING
VARCHARJSON / STRING
STRINGJSON / STRING
DATEDATE
TIMESTAMP_WITHOUT_TIME_ZONE(N)DATETIME
TIMESTAMP_WITH_LOCAL_TIME_ZONE(N)DATETIME
ARRAY<T>ARRAY<T>
MAP<KT,VT>JSON / JSON STRING
ROW<arg T...>JSON / JSON STRING

See flink-connector-starrocks for more details.