From 4ad6b0b1dafd0b3d0308b5b61f6fd18cbdde35ce Mon Sep 17 00:00:00 2001 From: keirchen Date: Fri, 21 Jul 2023 10:57:22 +0800 Subject: [PATCH 1/3] [Docs][Connector-V2][StarRocks]Reconstruct the StarRocks connector document --- docs/en/connector-v2/sink/StarRocks.md | 219 ++++++++++++++----------- 1 file changed, 120 insertions(+), 99 deletions(-) diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md index 7c6491fb591..a19041d1fa4 100644 --- a/docs/en/connector-v2/sink/StarRocks.md +++ b/docs/en/connector-v2/sink/StarRocks.md @@ -2,94 +2,44 @@ > StarRocks sink connector -## Description +## Support Those Engines -Used to send data to StarRocks. Both support streaming and batch mode. -The internal implementation of StarRocks sink connector is cached and imported by stream load in batches. +> Spark
+> Flink
+> SeaTunnel Zeta
-## Key features +## Key Features - [ ] [exactly-once](../../concept/connector-v2-features.md) - [x] [cdc](../../concept/connector-v2-features.md) -## Options - -| name | type | required | default value | -|-----------------------------|---------|----------|-----------------| -| nodeUrls | list | yes | - | -| base-url | string | yes | - | -| username | string | yes | - | -| password | string | yes | - | -| database | string | yes | - | -| table | string | no | - | -| labelPrefix | string | no | - | -| batch_max_rows | long | no | 1024 | -| batch_max_bytes | int | no | 5 * 1024 * 1024 | -| batch_interval_ms | int | no | - | -| max_retries | int | no | - | -| retry_backoff_multiplier_ms | int | no | - | -| max_retry_backoff_ms | int | no | - | -| enable_upsert_delete | boolean | no | false | -| save_mode_create_template | string | no | see below | -| starrocks.config | map | no | - | - -### nodeUrls [list] - -`StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` - -### base-url [string] - -The JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db` - -### username [string] - -`StarRocks` user username - -### password [string] - -`StarRocks` user password - -### database [string] - -The name of StarRocks database - -### table [string] - -The name of StarRocks table, If not set, the table name will be the name of the upstream table - -### labelPrefix [string] - -The prefix of StarRocks stream load label - -### batch_max_rows [long] - -For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks - -### batch_max_bytes [int] - -For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks - -### 
batch_interval_ms [int]

-For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks

-### max_retries [int]

-The number of retries to flush failed

-### retry_backoff_multiplier_ms [int]

-Using as a multiplier for generating the next delay for backoff

-### max_retry_backoff_ms [int]

-The amount of time to wait before attempting to retry a request to `StarRocks`

-### enable_upsert_delete [boolean]
+## Description

-Whether to enable upsert/delete, only supports PrimaryKey model.
+Used to send data to StarRocks. Both streaming and batch mode are supported.
+Internally, the StarRocks sink connector buffers the data and imports it into StarRocks in batches via stream load.

-### save_mode_create_template [string]
+## Sink Options
+
+| Name                        | Type    | Required | Default         | Description |
+|-----------------------------|---------|----------|-----------------|-------------|
+| nodeUrls                    | list    | yes      | -               | `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` |
+| base-url                    | string  | yes      | -               | The JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db` |
+| username                    | string  | yes      | -               | `StarRocks` username |
+| password                    | string  | yes      | -               | `StarRocks` user password |
+| database                    | string  | yes      | -               | The name of the StarRocks database |
+| table                       | string  | no       | -               | The name of the StarRocks table. If not set, the table name will be the name of the upstream table |
+| labelPrefix                 | string  | no       | -               | The prefix of the StarRocks stream load label |
+| batch_max_rows              | long    | no       | 1024            | For batch writing, when the number of buffers reaches the number of 
`batch_max_rows`, the byte size reaches `batch_max_bytes`, or the time reaches `batch_interval_ms`, the data will be flushed into StarRocks |
+| batch_max_bytes             | int     | no       | 5 * 1024 * 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows`, the byte size reaches `batch_max_bytes`, or the time reaches `batch_interval_ms`, the data will be flushed into StarRocks |
+| batch_interval_ms           | int     | no       | -               | For batch writing, when the number of buffers reaches the number of `batch_max_rows`, the byte size reaches `batch_max_bytes`, or the time reaches `batch_interval_ms`, the data will be flushed into StarRocks |
+| max_retries                 | int     | no       | -               | The number of retries for a failed flush |
+| retry_backoff_multiplier_ms | int     | no       | -               | Used as a multiplier for generating the next retry backoff delay |
+| max_retry_backoff_ms        | int     | no       | -               | The maximum amount of time to wait before retrying a request to `StarRocks` |
+| enable_upsert_delete        | boolean | no       | false           | Whether to enable upsert/delete; only the PrimaryKey model is supported. 
|
+| save_mode_create_template   | string  | no       | see below       | see below |
+| starrocks.config            | map     | no       | -               | The parameter of the stream load `data_desc` |
+
+### save_mode_create_template

We use templates to automatically create StarRocks tables, which will create corresponding table creation statements based on the type of upstream data and schema type,

@@ -131,19 +81,72 @@ You can use the following placeholders
  description of StarRocks

- rowtype_primary_key: Used to get the primary key in the upstream schema (maybe a list)

-### starrocks.config [map]
-
-The parameter of the stream load `data_desc`
+## Data Type Mapping
+
+| StarRocks Data type | SeaTunnel Data type |
+|---------------------|---------------------|
+| BOOLEAN             | BOOLEAN             |
+| TINYINT             | TINYINT             |
+| SMALLINT            | SMALLINT            |
+| INT                 | INT                 |
+| BIGINT              | BIGINT              |
+| FLOAT               | FLOAT               |
+| DOUBLE              | DOUBLE              |
+| DECIMAL             | DECIMAL             |
+| DATE                | STRING              |
+| TIME                | STRING              |
+| DATETIME            | STRING              |
+| STRING              | STRING              |
+| ARRAY               | STRING              |
+| MAP                 | STRING              |
+| BYTES               | STRING              |

#### Supported import data formats

-The supported formats include CSV and JSON. 
Default value: JSON
+The supported formats include CSV and JSON.

-## Example
+## Task Example

-Use JSON format to import data
+### Simple:
+
+> The following example describes writing multiple data types to StarRocks; users need to create the corresponding tables downstream first.

```hocon
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 10000
+}
+
+source {
+  FakeSource {
+    row.num = 10
+    map.size = 10
+    array.size = 10
+    bytes.length = 10
+    string.length = 10
+    schema = {
+      fields {
+        c_map = "map<string, map<string, string>>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(16, 1)"
+        c_null = "null"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+  }
+}
+
 sink {
   StarRocks {
     nodeUrls = ["e2e_starRocksdb:8030"]
@@ -158,12 +161,29 @@ sink {
     }
   }
 }
-
 ```

-Use CSV format to import data
+### Support write cdc changelog event (INSERT/UPDATE/DELETE)

 ```hocon
+sink {
+  StarRocks {
+    nodeUrls = ["e2e_starRocksdb:8030"]
+    username = root
+    password = ""
+    database = "test"
+    table = "e2e_table_sink"
+    ...
+
+    // Support upsert/delete event synchronization (enable_upsert_delete=true); only the PrimaryKey model is supported.
+    enable_upsert_delete = true
+  }
+}
+```
+
+### Use JSON format to import data
+
+```hocon
 sink {
   StarRocks {
     nodeUrls = ["e2e_starRocksdb:8030"]
@@ -173,17 +193,17 @@ sink {
     table = "e2e_table_sink"
     batch_max_rows = 10
     starrocks.config = {
-      format = "CSV"
-      column_separator = "\\x01"
-      row_delimiter = "\\x02"
+      format = "JSON"
+      strip_outer_array = true
     }
   }
 }
+
 ```

-Support write cdc changelog event(INSERT/UPDATE/DELETE)
+### Use CSV format to import data

 ```hocon
 sink {
   StarRocks {
     nodeUrls = ["e2e_starRocksdb:8030"]
@@ -191,10 +211,12 @@ sink {
     password = ""
     database = "test"
     table = "e2e_table_sink"
-    ...
- - // Support upsert/delete event synchronization (enable_upsert_delete=true), only supports PrimaryKey model. - enable_upsert_delete = true + batch_max_rows = 10 + starrocks.config = { + format = "CSV" + column_separator = "\\x01" + row_delimiter = "\\x02" + } } } ``` @@ -205,5 +227,4 @@ sink { - Add StarRocks Sink Connector - [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719) -- [Feature] Support write cdc changelog event(INSERT/UPDATE/DELETE) [3865](https://github.com/apache/seatunnel/pull/3865) - +- [Feature] Support write cdc changelog event(INSERT/UPDATE/DELETE) [3865](https://github.com/apache/seatunnel/pull/3865) \ No newline at end of file From ddf2f7e1ceed499a9f1bfb8753fdda46ca12d460 Mon Sep 17 00:00:00 2001 From: David Zollo Date: Sun, 13 Aug 2023 22:28:01 +0800 Subject: [PATCH 2/3] Update StarRocks.md --- docs/en/connector-v2/sink/StarRocks.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md index a19041d1fa4..a3b81c7666b 100644 --- a/docs/en/connector-v2/sink/StarRocks.md +++ b/docs/en/connector-v2/sink/StarRocks.md @@ -2,7 +2,7 @@ > StarRocks sink connector -## Support Those Engines +## Support These Engines > Spark
> Flink
@@ -227,4 +227,4 @@ sink { - Add StarRocks Sink Connector - [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719) -- [Feature] Support write cdc changelog event(INSERT/UPDATE/DELETE) [3865](https://github.com/apache/seatunnel/pull/3865) \ No newline at end of file +- [Feature] Support write cdc changelog event(INSERT/UPDATE/DELETE) [3865](https://github.com/apache/seatunnel/pull/3865) From 487670c1efb40c17426f4dbcb0969ad1774d169e Mon Sep 17 00:00:00 2001 From: liuli Date: Mon, 14 Aug 2023 11:46:19 +0800 Subject: [PATCH 3/3] fix checkstyle --- docs/en/connector-v2/sink/StarRocks.md | 37 +++++++++++++------------- 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md index a3b81c7666b..763743ce967 100644 --- a/docs/en/connector-v2/sink/StarRocks.md +++ b/docs/en/connector-v2/sink/StarRocks.md @@ -20,24 +20,24 @@ The internal implementation of StarRocks sink connector is cached and imported b ## Sink Options -| Name | Type | Required | Default | Description | -|------------------------------|---------|----------|-----------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| nodeUrls | list | yes | - | `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` | -| base-url | string | yes | - | The JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db` | -| username | string | yes | - | `StarRocks` user username | -| password | string | yes | - | `StarRocks` user password | -| database | string | yes | - | The name of StarRocks database | -| table | string | no | - | The name of StarRocks table, If not set, the table name will be the name of the upstream table | -| labelPrefix 
| string | no | - | The prefix of StarRocks stream load label | -| batch_max_rows | long | no | 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks | -| batch_max_bytes | int | no | 5 * 1024 * 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks | -| batch_interval_ms | int | no | - | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks | -| max_retries | int | no | - | The number of retries to flush failed | -| retry_backoff_multiplier_ms | int | no | - | Using as a multiplier for generating the next delay for backoff | -| max_retry_backoff_ms | int | no | - | The amount of time to wait before attempting to retry a request to `StarRocks` | -| enable_upsert_delete | boolean | no | false | Whether to enable upsert/delete, only supports PrimaryKey model. 
|
-| save_mode_create_template   | string  | no       | see below       | see below |
-| starrocks.config            | map     | no       | -               | The parameter of the stream load `data_desc` |
+| Name                        | Type    | Required | Default         | Description |
+|-----------------------------|---------|----------|-----------------|-------------|
+| nodeUrls                    | list    | yes      | -               | `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` |
+| base-url                    | string  | yes      | -               | The JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db` |
+| username                    | string  | yes      | -               | `StarRocks` username |
+| password                    | string  | yes      | -               | `StarRocks` user password |
+| database                    | string  | yes      | -               | The name of the StarRocks database |
+| table                       | string  | no       | -               | The name of the StarRocks table. If not set, the table name will be the name of the upstream table |
+| labelPrefix                 | string  | no       | -               | The prefix of the StarRocks stream load label |
+| batch_max_rows              | long    | no       | 1024            | For batch writing, when the number of buffers reaches the number of `batch_max_rows`, the byte size reaches `batch_max_bytes`, or the time reaches `batch_interval_ms`, the data will be flushed into StarRocks |
+| batch_max_bytes             | int     | no       | 5 * 1024 * 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows`, the byte size reaches `batch_max_bytes`, or the time reaches `batch_interval_ms`, the data will be flushed into StarRocks |
+| batch_interval_ms           | int     | no       | -               | For batch writing, when the number of buffers reaches the number of `batch_max_rows`, the byte size reaches `batch_max_bytes`, or the time reaches `batch_interval_ms`, the data will be flushed into StarRocks |
+| max_retries                 | int     | no       | -               | The number of retries for a failed flush |
+| 
retry_backoff_multiplier_ms | int     | no       | -               | Used as a multiplier for generating the next retry backoff delay |
+| max_retry_backoff_ms        | int     | no       | -               | The maximum amount of time to wait before retrying a request to `StarRocks` |
+| enable_upsert_delete        | boolean | no       | false           | Whether to enable upsert/delete; only the PrimaryKey model is supported. |
+| save_mode_create_template   | string  | no       | see below       | see below |
+| starrocks.config            | map     | no       | -               | The parameter of the stream load `data_desc` |

 ### save_mode_create_template

@@ -228,3 +228,4 @@ sink {
 - Add StarRocks Sink Connector
 - [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/seatunnel/pull/3719)
 - [Feature] Support write cdc changelog event(INSERT/UPDATE/DELETE) [3865](https://github.com/apache/seatunnel/pull/3865)
+
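The `save_mode_create_template` placeholders described in the reworked doc can be combined into a custom template passed through the sink config. The snippet below is an illustrative sketch only, not the connector's exact default template: the `${database}`, `${table_name}` and `${rowtype_fields}` placeholder names, the FE address, and the `PROPERTIES` values are assumptions to be checked against the connector, while `${rowtype_primary_key}` is documented above.

```hocon
sink {
  StarRocks {
    nodeUrls = ["fe_host:8030"]
    username = root
    password = ""
    database = "test"
    // Placeholders are substituted from the upstream catalog table at table-creation time:
    // ${rowtype_primary_key} -> upstream primary key column(s), ${rowtype_fields} -> full column list.
    save_mode_create_template = """
      CREATE TABLE IF NOT EXISTS `${database}`.`${table_name}` (
        ${rowtype_primary_key},
        ${rowtype_fields}
      ) ENGINE = OLAP
      PRIMARY KEY (${rowtype_primary_key})
      DISTRIBUTED BY HASH (${rowtype_primary_key})
      PROPERTIES ("replication_num" = "1")
    """
  }
}
```

HOCON triple-quoted strings keep the multi-line DDL readable without escaping the embedded quotes.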