Skip to content

Commit

Permalink
[Improve] [Connector-V2] Remove scheduler in StarRocks sink (#5269)
Browse files Browse the repository at this point in the history
* [Improve] [Connector-V2] Remove scheduler in StarRocks sink

* [Improve] [Connector-V2] Remove scheduler in InfluxDB sink

* format doc

* format doc

* format doc

---------

Co-authored-by: gdliu3 <gdliu3@iflytek.com>
  • Loading branch information
liugddx and gdliu3 authored Aug 24, 2023
1 parent 17482c8 commit cb7b794
Show file tree
Hide file tree
Showing 5 changed files with 19 additions and 65 deletions.
35 changes: 17 additions & 18 deletions docs/en/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,23 @@ The internal implementation of StarRocks sink connector is cached and imported b

## Sink Options

| Name | Type | Required | Default | Description |
|-----------------------------|---------|----------|-----------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| nodeUrls | list | yes | - | `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` |
| base-url | string | yes | - | The JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db` |
| username | string | yes | - | `StarRocks` user username |
| password | string | yes | - | `StarRocks` user password |
| database | string | yes | - | The name of StarRocks database |
| table | string | no | - | The name of StarRocks table, If not set, the table name will be the name of the upstream table |
| labelPrefix | string | no | - | The prefix of StarRocks stream load label |
| batch_max_rows | long | no | 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks |
| batch_max_bytes | int | no | 5 * 1024 * 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks |
| batch_interval_ms | int | no | - | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `batch_interval_ms`, the data will be flushed into the StarRocks |
| max_retries | int | no | - | The number of retries to flush failed |
| retry_backoff_multiplier_ms | int | no | - | Using as a multiplier for generating the next delay for backoff |
| max_retry_backoff_ms | int | no | - | The amount of time to wait before attempting to retry a request to `StarRocks` |
| enable_upsert_delete | boolean | no | false | Whether to enable upsert/delete, only supports PrimaryKey model. |
| save_mode_create_template | string | no | see below | see below |
| starrocks.config | map | no | - | The parameter of the stream load `data_desc` |
| Name | Type | Required | Default | Description |
|-----------------------------|---------|----------|-----------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| nodeUrls | list | yes | - | `StarRocks` cluster address, the format is `["fe_ip:fe_http_port", ...]` |
| base-url | string | yes | - | The JDBC URL like `jdbc:mysql://localhost:9030/` or `jdbc:mysql://localhost:9030` or `jdbc:mysql://localhost:9030/db` |
| username | string | yes | - | `StarRocks` user username |
| password | string | yes | - | `StarRocks` user password |
| database | string | yes | - | The name of StarRocks database |
| table | string | no | - | The name of StarRocks table, If not set, the table name will be the name of the upstream table |
| labelPrefix | string | no | - | The prefix of StarRocks stream load label |
| batch_max_rows | long | no | 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `checkpoint.interval`, the data will be flushed into the StarRocks |
| batch_max_bytes | int | no | 5 * 1024 * 1024 | For batch writing, when the number of buffers reaches the number of `batch_max_rows` or the byte size of `batch_max_bytes` or the time reaches `checkpoint.interval`, the data will be flushed into the StarRocks |
| max_retries | int | no | - | The number of retries to flush failed |
| retry_backoff_multiplier_ms | int | no | - | Using as a multiplier for generating the next delay for backoff |
| max_retry_backoff_ms | int | no | - | The amount of time to wait before attempting to retry a request to `StarRocks` |
| enable_upsert_delete | boolean | no | false | Whether to enable upsert/delete, only supports PrimaryKey model. |
| save_mode_create_template | string | no | see below | see below |
| starrocks.config | map | no | - | The parameter of the stream load `data_desc` |

### save_mode_create_template

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,13 @@
import org.apache.seatunnel.connectors.seatunnel.starrocks.exception.StarRocksConnectorException;

import com.google.common.base.Strings;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

@Slf4j
public class StarRocksSinkManager {
Expand All @@ -42,18 +37,14 @@ public class StarRocksSinkManager {
private final List<byte[]> batchList;

private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor;
private ScheduledExecutorService scheduler;
private ScheduledFuture<?> scheduledFuture;
private volatile boolean initialize;
private volatile Exception flushException;
private int batchRowCount = 0;
private long batchBytesSize = 0;
private final Integer batchIntervalMs;

public StarRocksSinkManager(SinkConfig sinkConfig, List<String> fileNames) {
this.sinkConfig = sinkConfig;
this.batchList = new ArrayList<>();
this.batchIntervalMs = sinkConfig.getBatchIntervalMs();
starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(sinkConfig, fileNames);
}

Expand All @@ -62,26 +53,6 @@ private void tryInit() throws IOException {
return;
}
initialize = true;

if (batchIntervalMs != null) {
scheduler =
Executors.newSingleThreadScheduledExecutor(
new ThreadFactoryBuilder()
.setNameFormat("StarRocks-sink-output-%s")
.build());
scheduledFuture =
scheduler.scheduleAtFixedRate(
() -> {
try {
flush();
} catch (IOException e) {
flushException = e;
}
},
batchIntervalMs,
batchIntervalMs,
TimeUnit.MILLISECONDS);
}
}

public synchronized void write(String record) throws IOException {
Expand All @@ -98,11 +69,6 @@ public synchronized void write(String record) throws IOException {
}

public synchronized void close() throws IOException {
if (scheduledFuture != null) {
scheduledFuture.cancel(false);
scheduler.shutdown();
}

flush();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ public enum StreamLoadFormat {
private int batchMaxSize;
private long batchMaxBytes;

private Integer batchIntervalMs;
private int maxRetries;
private int retryBackoffMultiplierMs;
private int maxRetryBackoffMs;
Expand All @@ -74,8 +73,6 @@ public static SinkConfig of(ReadonlyConfig config) {
config.getOptional(StarRocksSinkOptions.LABEL_PREFIX).ifPresent(sinkConfig::setLabelPrefix);
sinkConfig.setBatchMaxSize(config.get(StarRocksSinkOptions.BATCH_MAX_SIZE));
sinkConfig.setBatchMaxBytes(config.get(StarRocksSinkOptions.BATCH_MAX_BYTES));
config.getOptional(StarRocksSinkOptions.BATCH_INTERVAL_MS)
.ifPresent(sinkConfig::setBatchIntervalMs);
config.getOptional(StarRocksSinkOptions.MAX_RETRIES).ifPresent(sinkConfig::setMaxRetries);
config.getOptional(StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS)
.ifPresent(sinkConfig::setRetryBackoffMultiplierMs);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,21 +75,14 @@ public interface StarRocksSinkOptions {
.intType()
.defaultValue(1024)
.withDescription(
"For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the StarRocks");
"For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches checkpoint.interval, the data will be flushed into the StarRocks");

Option<Long> BATCH_MAX_BYTES =
Options.key("batch_max_bytes")
.longType()
.defaultValue((long) (5 * 1024 * 1024))
.withDescription(
"For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the StarRocks");

Option<Integer> BATCH_INTERVAL_MS =
Options.key("batch_interval_ms")
.intType()
.noDefaultValue()
.withDescription(
"For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches batch_interval_ms, the data will be flushed into the StarRocks");
"For batch writing, when the number of buffers reaches the number of batch_max_rows or the byte size of batch_max_bytes or the time reaches checkpoint.interval, the data will be flushed into the StarRocks");

Option<Integer> MAX_RETRIES =
Options.key("max_retries")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ public OptionRule optionRule() {
StarRocksSinkOptions.LABEL_PREFIX,
StarRocksSinkOptions.BATCH_MAX_SIZE,
StarRocksSinkOptions.BATCH_MAX_BYTES,
StarRocksSinkOptions.BATCH_INTERVAL_MS,
StarRocksSinkOptions.MAX_RETRIES,
StarRocksSinkOptions.MAX_RETRY_BACKOFF_MS,
StarRocksSinkOptions.RETRY_BACKOFF_MULTIPLIER_MS,
Expand Down

0 comments on commit cb7b794

Please sign in to comment.