Skip to content

Commit

Permalink
[Feature][Connector-v2][StarRocks] Support write cdc changelog event(…
Browse files Browse the repository at this point in the history
…INSERT/UPDATE/DELETE)
  • Loading branch information
hailin0 committed Jan 4, 2023
1 parent f69af2a commit 7b02c9a
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 33 deletions.
57 changes: 41 additions & 16 deletions docs/en/connector-v2/sink/StarRocks.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,24 +8,26 @@ The internal implementation of StarRocks sink connector is cached and imported b
## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-----------------------------|--------|----------|-----------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | - |
| table | string | yes | - |
| labelPrefix | string | no | - |
| batch_max_rows | long | no | 1024 |
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| starrocks.config | map | no | - |
| name | type | required | default value |
|-----------------------------|---------|----------|-----------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | - |
| table | string | yes | - |
| labelPrefix | string | no | - |
| batch_max_rows | long | no | 1024 |
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| enable_upsert_delete | boolean | no | false |
| starrocks.config | map | no | - |

### node_urls [list]

Expand Down Expand Up @@ -75,6 +77,10 @@ Using as a multiplier for generating the next delay for backoff

The amount of time to wait before attempting to retry a request to `StarRocks`

### enable_upsert_delete [boolean]

Whether to enable upsert/delete event synchronization. Note that this only works with StarRocks tables that use the Primary Key model.

### starrocks.config [map]

The parameter of the stream load `data_desc`
Expand Down Expand Up @@ -125,9 +131,28 @@ sink {
}
```

Support for writing CDC changelog events (INSERT/UPDATE/DELETE):

```hocon
sink {
StarRocks {
nodeUrls = ["e2e_starRocksdb:8030"]
username = root
password = ""
database = "test"
table = "e2e_table_sink"
...
// Enable upsert/delete event synchronization (enable_upsert_delete = true); only the Primary Key table model is supported.
enable_upsert_delete = true
}
}
```

## Changelog

### next version

- Add StarRocks Sink Connector
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
- [Feature] Support write cdc changelog event(INSERT/UPDATE/DELETE) [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
6 changes: 6 additions & 0 deletions seatunnel-connectors-v2/connector-starrocks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
<properties>
<httpclient.version>4.5.13</httpclient.version>
<httpcore.version>4.4.4</httpcore.version>
<mysql.driver.version>5.1.49</mysql.driver.version>
</properties>

<dependencies>
Expand All @@ -54,5 +55,10 @@
<artifactId>httpcore</artifactId>
<version>${httpcore.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.driver.version}</version>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ public class SinkConfig {
.noDefaultValue()
.withDescription("The amount of time to wait before attempting to retry a request to StarRocks");

public static final Option<Boolean> ENABLE_UPSERT_DELETE = Options.key("enable_upsert_delete")
.booleanType()
.defaultValue(false)
.withDescription("Whether to enable upsert/delete, only supports PrimaryKey model.");

public enum StreamLoadFormat {
CSV, JSON;
public static StreamLoadFormat parse(String format) {
Expand All @@ -135,6 +140,7 @@ public static StreamLoadFormat parse(String format) {
private int maxRetries;
private int retryBackoffMultiplierMs;
private int maxRetryBackoffMs;
private boolean enableUpsertDelete;

private final Map<String, Object> streamLoadProps = new HashMap<>();

Expand Down Expand Up @@ -171,6 +177,9 @@ public static SinkConfig loadConfig(Config pluginConfig) {
if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS.key())) {
sinkConfig.setMaxRetryBackoffMs(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS.key()));
}
if (pluginConfig.hasPath(ENABLE_UPSERT_DELETE.key())) {
sinkConfig.setEnableUpsertDelete(pluginConfig.getBoolean(ENABLE_UPSERT_DELETE.key()));
}
parseSinkStreamLoadProperties(pluginConfig, sinkConfig);
if (sinkConfig.streamLoadProps.containsKey(COLUMN_SEPARATOR)) {
sinkConfig.setColumnSeparator((String) sinkConfig.streamLoadProps.get(COLUMN_SEPARATOR));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,33 +39,27 @@ public class StarRocksBaseSerializer {
@Builder.Default
private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS;

protected String convert(SeaTunnelDataType dataType, Object val) {
protected Object convert(SeaTunnelDataType dataType, Object val) {
if (val == null) {
return null;
}
switch (dataType.getSqlType()) {
case TINYINT:
case SMALLINT:
return String.valueOf(((Number) val).shortValue());
case INT:
return String.valueOf(((Number) val).intValue());
case BIGINT:
return String.valueOf(((Number) val).longValue());
case FLOAT:
return String.valueOf(((Number) val).floatValue());
case DOUBLE:
return String.valueOf(((Number) val).doubleValue());
case DECIMAL:
case BOOLEAN:
return val.toString();
case STRING:
return val;
case DATE:
return DateUtils.toString((LocalDate) val, dateFormatter);
case TIME:
return TimeUtils.toString((LocalTime) val, timeFormatter);
case TIMESTAMP:
return DateTimeUtils.toString((LocalDateTime) val, dateTimeFormatter);
case STRING:
return (String) val;
case ARRAY:
case MAP:
return JsonUtils.toJsonString(val);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,30 @@ public class StarRocksCsvSerializer extends StarRocksBaseSerializer implements S

private final String columnSeparator;
private final SeaTunnelRowType seaTunnelRowType;
private final boolean enableUpsertDelete;

/**
 * Creates a CSV serializer for StarRocks stream load.
 *
 * @param sp                 raw column-separator spec; parsed via
 *                           {@link StarRocksDelimiterParser#parse}, defaulting to {@code "\t"}
 * @param seaTunnelRowType   row schema used to look up per-field types
 * @param enableUpsertDelete when true, an extra trailing __op column is emitted per row
 *                           (see {@code serialize})
 */
public StarRocksCsvSerializer(String sp,
                              SeaTunnelRowType seaTunnelRowType,
                              boolean enableUpsertDelete) {
    this.columnSeparator = StarRocksDelimiterParser.parse(sp, "\t");
    this.seaTunnelRowType = seaTunnelRowType;
    this.enableUpsertDelete = enableUpsertDelete;
}

@Override
public String serialize(SeaTunnelRow row) {
    // Build one CSV record: converted field values joined by the configured separator.
    StringBuilder sb = new StringBuilder();
    int fieldCount = row.getFields().length;
    for (int i = 0; i < fieldCount; i++) {
        Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
        // StarRocks stream load interprets "\N" as SQL NULL in CSV input.
        sb.append(null == value ? "\\N" : value);
        if (i < fieldCount - 1) {
            sb.append(columnSeparator);
        }
    }
    if (enableUpsertDelete) {
        // Append the hidden __op column as a trailing field: ordinal() yields
        // 0 for UPSERT and 1 for DELETE, matching StarRocks' upsert/delete flags.
        sb.append(columnSeparator)
                .append(StarRocksSinkOP.parse(row.getRowKind()).ordinal());
    }
    return sb.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,25 @@ public class StarRocksJsonSerializer extends StarRocksBaseSerializer implements

private static final long serialVersionUID = 1L;
private final SeaTunnelRowType seaTunnelRowType;
private final boolean enableUpsertDelete;

/**
 * Creates a JSON serializer for StarRocks stream load.
 *
 * @param seaTunnelRowType   row schema used to look up field names and types
 * @param enableUpsertDelete when true, an extra "__op" entry is added to each
 *                           serialized row (see {@code serialize})
 */
public StarRocksJsonSerializer(SeaTunnelRowType seaTunnelRowType,
                               boolean enableUpsertDelete) {
    this.seaTunnelRowType = seaTunnelRowType;
    this.enableUpsertDelete = enableUpsertDelete;
}

@Override
public String serialize(SeaTunnelRow row) {
    // Map field name -> converted value, then emit as a single JSON object.
    Map<String, Object> rowMap = new HashMap<>(row.getFields().length);
    for (int i = 0; i < row.getFields().length; i++) {
        Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i));
        rowMap.put(seaTunnelRowType.getFieldName(i), value);
    }
    if (enableUpsertDelete) {
        // Hidden "__op" column tells StarRocks whether this row is an upsert or a delete.
        rowMap.put(StarRocksSinkOP.COLUMN_KEY, StarRocksSinkOP.parse(row.getRowKind()));
    }
    return JsonUtils.toJsonString(rowMap);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize;

import org.apache.seatunnel.api.table.type.RowKind;

/**
* Reference https://github.com/StarRocks/starrocks/blob/main/docs/loading/Load_to_Primary_Key_tables.md#upsert-and-delete
*/
public enum StarRocksSinkOP {
    // Do NOT reorder these constants: the CSV serializer writes
    // parse(kind).ordinal(), so UPSERT must stay 0 and DELETE must stay 1
    // to match StarRocks' __op flag values.
    UPSERT, DELETE;

    /** Name of the hidden StarRocks column that carries the operation flag. */
    public static final String COLUMN_KEY = "__op";

    /**
     * Maps a SeaTunnel changelog {@link RowKind} to the StarRocks operation.
     * INSERT/UPDATE_AFTER become UPSERT; DELETE/UPDATE_BEFORE become DELETE.
     *
     * @throws IllegalArgumentException if the row kind has no StarRocks mapping
     */
    static StarRocksSinkOP parse(RowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return UPSERT;
            case DELETE:
            case UPDATE_BEFORE:
                return DELETE;
            default:
                // Include the offending kind so failures are diagnosable;
                // IllegalArgumentException is a RuntimeException, so existing
                // catch blocks keep working.
                throw new IllegalArgumentException("Unsupported row kind: " + kind);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public OptionRule optionRule() {
.required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.TABLE)
.optional(SinkConfig.LABEL_PREFIX, SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES,
SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, SinkConfig.MAX_RETRY_BACKOFF_MS,
SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.STARROCKS_CONFIG)
SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.STARROCKS_CONFIG, SinkConfig.ENABLE_UPSERT_DELETE)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksCsvSerializer;
import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksISerializer;
import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksJsonSerializer;
import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksSinkOP;

import org.apache.seatunnel.shade.com.typesafe.config.Config;

Expand All @@ -49,6 +50,9 @@ public StarRocksSinkWriter(Config pluginConfig,
SeaTunnelRowType seaTunnelRowType) {
SinkConfig sinkConfig = SinkConfig.loadConfig(pluginConfig);
List<String> fieldNames = Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList());
if (sinkConfig.isEnableUpsertDelete()) {
fieldNames.add(StarRocksSinkOP.COLUMN_KEY);
}
this.serializer = createSerializer(sinkConfig, seaTunnelRowType);
this.manager = new StarRocksSinkManager(sinkConfig, fieldNames);
}
Expand Down Expand Up @@ -81,10 +85,11 @@ public void close() throws IOException {

/**
 * Builds the row serializer matching the configured stream-load format.
 * CSV and JSON are the only supported formats; both honor the
 * enable_upsert_delete flag by emitting the hidden __op column.
 *
 * @throws StarRocksConnectorException if the configured format is neither CSV nor JSON
 */
public static StarRocksISerializer createSerializer(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) {
    if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) {
        return new StarRocksCsvSerializer(
            sinkConfig.getColumnSeparator(), seaTunnelRowType, sinkConfig.isEnableUpsertDelete());
    }
    if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) {
        return new StarRocksJsonSerializer(seaTunnelRowType, sinkConfig.isEnableUpsertDelete());
    }
    throw new StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to create row serializer, unsupported `format` from stream load properties.");
}
Expand Down

0 comments on commit 7b02c9a

Please sign in to comment.