[Feature][Connector-v2][StarRocks] Support write cdc changelog event(INSERT/UPDATE/DELETE)

* Support StarRocks sink execute upsert/delete action
* Support FakeSource config fake data rows
hailin0 committed Jan 7, 2023
1 parent 41751d8 commit 26428a5
Showing 22 changed files with 616 additions and 42 deletions.
57 changes: 41 additions & 16 deletions docs/en/connector-v2/sink/StarRocks.md
@@ -8,24 +8,26 @@ The internal implementation of StarRocks sink connector is cached and imported b
## Key features

- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [cdc](../../concept/connector-v2-features.md)

## Options

| name | type | required | default value |
|-----------------------------|--------|----------|-----------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | - |
| table | string | yes | - |
| labelPrefix | string | no | - |
| batch_max_rows | long | no | 1024 |
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| starrocks.config | map | no | - |
| name | type | required | default value |
|-----------------------------|---------|----------|-----------------|
| node_urls | list | yes | - |
| username | string | yes | - |
| password | string | yes | - |
| database | string | yes | - |
| table | string | yes | - |
| labelPrefix | string | no | - |
| batch_max_rows | long | no | 1024 |
| batch_max_bytes | int | no | 5 * 1024 * 1024 |
| batch_interval_ms | int | no | - |
| max_retries | int | no | - |
| retry_backoff_multiplier_ms | int | no | - |
| max_retry_backoff_ms | int | no | - |
| enable_upsert_delete | boolean | no | false |
| starrocks.config | map | no | - |

### node_urls [list]

@@ -75,6 +77,10 @@ Using as a multiplier for generating the next delay for backoff

The amount of time to wait before attempting to retry a request to `StarRocks`

### enable_upsert_delete [boolean]

Whether to enable upsert/delete. Only the StarRocks PrimaryKey model is supported.

### starrocks.config [map]

The parameter of the stream load `data_desc`
@@ -125,9 +131,28 @@ sink {
}
```

Support writing CDC changelog events (INSERT/UPDATE/DELETE)

```hocon
sink {
  StarRocks {
    nodeUrls = ["e2e_starRocksdb:8030"]
    username = root
    password = ""
    database = "test"
    table = "e2e_table_sink"
    ...
    // Enable upsert/delete event synchronization; only the PrimaryKey model is supported.
    enable_upsert_delete = true
  }
}
```
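
For context on how changelog events can reach a StarRocks Primary Key table: Stream Load accepts a hidden `__op` column, where `0` means upsert and `1` means delete. The sketch below shows one plausible mapping from row kinds to that column. `SketchRowKind` is a local stand-in for SeaTunnel's `RowKind`, `StarRocksOpSketch` is a hypothetical name, and treating `UPDATE_BEFORE` as a delete is an assumption, since the connector's own mapping is not shown here.

```java
// Local stand-in for SeaTunnel's RowKind so the sketch compiles on its own.
enum SketchRowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class StarRocksOpSketch {
    // Hidden column read by Stream Load on Primary Key tables.
    static final String OP_COLUMN = "__op";
    static final int OP_UPSERT = 0;
    static final int OP_DELETE = 1;

    // Assumed mapping: new row images are upserts, old row images are deletes.
    static int toOp(SketchRowKind kind) {
        switch (kind) {
            case INSERT:
            case UPDATE_AFTER:
                return OP_UPSERT;
            case UPDATE_BEFORE:
            case DELETE:
                return OP_DELETE;
            default:
                throw new IllegalArgumentException("Unsupported row kind: " + kind);
        }
    }

    public static void main(String[] args) {
        for (SketchRowKind kind : SketchRowKind.values()) {
            System.out.println(kind + " -> " + OP_COLUMN + "=" + toOp(kind));
        }
    }
}
```

Because an `UPDATE_BEFORE` is always followed by an `UPDATE_AFTER` carrying the same key, a sink could also simply skip the old image; the version above keeps the mapping total so every row kind has a defined outcome.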

## Changelog

### next version

- Add StarRocks Sink Connector
- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719)
- [Feature] Support write cdc changelog event(INSERT/UPDATE/DELETE) [3865](https://github.com/apache/incubator-seatunnel/pull/3865)
74 changes: 74 additions & 0 deletions docs/en/connector-v2/source/FakeSource.md
@@ -21,6 +21,7 @@ just for some test cases such as type conversion or connector new feature testin
| name | type | required | default value |
|---------------------|--------|----------|---------------|
| schema | config | yes | - |
| rows | config | no | - |
| row.num | int | no | 5 |
| split.num | int | no | 1 |
| split.read-interval | long | no | 1 |
@@ -77,6 +78,33 @@ The schema of fake data that you want to generate
}
```

### rows

The row list of fake data output per degree of parallelism

example

```hocon
rows = [
  {
    kind = INSERT
    fields = [1, "A", 100]
  },
  {
    kind = UPDATE_BEFORE
    fields = [1, "A", 100]
  },
  {
    kind = UPDATE_AFTER
    fields = [1, "A_1", 100]
  },
  {
    kind = DELETE
    fields = [1, "A_1", 100]
  }
]
```
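
The `kind` value is resolved with `RowKind.valueOf(...)` in `FakeDataGenerator`, so it has to match an enum constant name exactly. A minimal sketch of that behavior follows; `DemoRowKind` is a local stand-in for SeaTunnel's `RowKind`, and `isValidKind` is a hypothetical helper, not part of the connector.

```java
// Local stand-in for SeaTunnel's RowKind so the sketch compiles on its own.
enum DemoRowKind { INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE }

final class RowKindParseSketch {
    // Hypothetical helper: true only when `kind` names a constant exactly.
    static boolean isValidKind(String kind) {
        try {
            DemoRowKind.valueOf(kind);
            return true;
        } catch (IllegalArgumentException e) {
            // Enum.valueOf is case-sensitive and rejects unknown names.
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(isValidKind("UPDATE_AFTER")); // true
        System.out.println(isValidKind("update_after")); // false
        System.out.println(isValidKind("UPDATE"));       // false
    }
}
```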

### row.num

The total number of data generated per degree of parallelism
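
When `rows` is configured, the generator change in this commit returns those rows as-is and never reaches the random-generation loop, so `row.num` has no effect in that case. A stripped-down sketch of that control flow, with strings standing in for `SeaTunnelRow` and hypothetical names throughout:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class FakeRowsPrecedenceSketch {
    // configuredRows plays the role of the parsed `rows` option; null means "not set".
    static List<String> generate(List<String> configuredRows, int rowNum) {
        List<String> out = new ArrayList<>();
        if (configuredRows != null) {
            // Explicit rows win: rowNum is ignored entirely.
            out.addAll(configuredRows);
            return out;
        }
        // Fallback: produce rowNum placeholder rows (random rows in the real generator).
        for (int i = 0; i < rowNum; i++) {
            out.add("random-row-" + i);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(generate(Arrays.asList("row-a", "row-b"), 10).size()); // 2
        System.out.println(generate(null, 3).size());                             // 3
    }
}
```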
@@ -111,6 +139,8 @@ Source plugin common parameters, please refer to [Source Common Options](common-

## Example

Automatically generate data rows

```hocon
FakeSource {
row.num = 10
@@ -157,6 +187,46 @@ FakeSource {
}
```

Using fake data rows

```hocon
FakeSource {
  schema = {
    fields {
      pk_id = bigint
      name = string
      score = int
    }
  }
  rows = [
    {
      kind = INSERT
      fields = [1, "A", 100]
    },
    {
      kind = INSERT
      fields = [2, "B", 100]
    },
    {
      kind = INSERT
      fields = [3, "C", 100]
    },
    {
      kind = UPDATE_BEFORE
      fields = [1, "A", 100]
    },
    {
      kind = UPDATE_AFTER
      fields = [1, "A_1", 100]
    },
    {
      kind = DELETE
      fields = [2, "B", 100]
    }
  ]
}
```
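
To make the effect of this changelog concrete, the sketch below replays the six rows against an empty in-memory table keyed by `pk_id`. It is an illustration only: `FakeChangeRow` and `ChangelogReplaySketch` are hypothetical names, and treating `UPDATE_BEFORE` as a removal is an assumption about how a primary-key sink might apply it.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for one changelog row; not a SeaTunnel class.
final class FakeChangeRow {
    final String kind; // INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE
    final long pkId;
    final String name;
    final int score;

    FakeChangeRow(String kind, long pkId, String name, int score) {
        this.kind = kind;
        this.pkId = pkId;
        this.name = name;
        this.score = score;
    }
}

final class ChangelogReplaySketch {
    // The six rows from the FakeSource example, in the same order.
    static List<FakeChangeRow> exampleRows() {
        return Arrays.asList(
                new FakeChangeRow("INSERT", 1, "A", 100),
                new FakeChangeRow("INSERT", 2, "B", 100),
                new FakeChangeRow("INSERT", 3, "C", 100),
                new FakeChangeRow("UPDATE_BEFORE", 1, "A", 100),
                new FakeChangeRow("UPDATE_AFTER", 1, "A_1", 100),
                new FakeChangeRow("DELETE", 2, "B", 100));
    }

    // Applies each event in order and returns pk_id -> name for the surviving rows.
    static Map<Long, String> replay(List<FakeChangeRow> rows) {
        Map<Long, String> table = new TreeMap<>();
        for (FakeChangeRow row : rows) {
            switch (row.kind) {
                case "INSERT":
                case "UPDATE_AFTER":
                    table.put(row.pkId, row.name);
                    break;
                case "UPDATE_BEFORE":
                case "DELETE":
                    table.remove(row.pkId);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown row kind: " + row.kind);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        System.out.println(replay(exampleRows()));
    }
}
```

Running `main` prints `{1=A_1, 3=C}`: row 1 is renamed, row 2 is deleted, and row 3 is untouched.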

## Changelog

### 2.2.0-beta 2022-09-26
@@ -173,3 +243,7 @@ FakeSource {
- Support user-defined bytes length
- [Improve] Support multiple splits for fake source connector ([2974](https://github.com/apache/incubator-seatunnel/pull/2974))
- [Improve] Supports setting the number of splits per parallelism and the reading interval between two splits ([3098](https://github.com/apache/incubator-seatunnel/pull/3098))

### next version

- [Feature] Support config fake data rows [3865](https://github.com/apache/incubator-seatunnel/pull/3865)
5 changes: 5 additions & 0 deletions seatunnel-connectors-v2/connector-fake/pom.xml
@@ -35,6 +35,11 @@
<artifactId>connector-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-format-json</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -20,17 +20,22 @@
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ARRAY_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BYTES_LENGTH;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.MAP_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROWS;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROW_NUM;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_NUM;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_LENGTH;

import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

@Builder
@Getter
@@ -49,6 +54,7 @@ public class FakeConfig implements Serializable {
private int bytesLength = BYTES_LENGTH.defaultValue();
@Builder.Default
private int stringLength = STRING_LENGTH.defaultValue();
private List<RowData> fakeRows;

public static FakeConfig buildWithConfig(Config config) {
FakeConfigBuilder builder = FakeConfig.builder();
@@ -73,6 +79,27 @@ public static FakeConfig buildWithConfig(Config config) {
if (config.hasPath(STRING_LENGTH.key())) {
builder.stringLength(config.getInt(STRING_LENGTH.key()));
}
if (config.hasPath(ROWS.key())) {
List<? extends Config> configs = config.getConfigList(ROWS.key());
List<RowData> rows = new ArrayList<>(configs.size());
ConfigRenderOptions options = ConfigRenderOptions.concise();
for (Config configItem : configs) {
String fieldsJson = configItem.getValue(RowData.KEY_FIELDS).render(options);
RowData rowData = new RowData(configItem.getString(RowData.KEY_KIND), fieldsJson);
rows.add(rowData);
}
builder.fakeRows(rows);
}
return builder.build();
}

@Getter
@AllArgsConstructor
public static class RowData implements Serializable {
static final String KEY_KIND = "kind";
static final String KEY_FIELDS = "fields";

private String kind;
private String fieldsJson;
}
}
Original file line number Diff line number Diff line change
@@ -19,10 +19,15 @@

import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.Options;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;

import java.util.List;

@SuppressWarnings("checkstyle:MagicNumber")
public class FakeOption {

public static final Option<List<SeaTunnelRow>> ROWS = Options.key("rows").listType(SeaTunnelRow.class).noDefaultValue()
.withDescription("The row list of fake data output per degree of parallelism");
public static final Option<Integer> ROW_NUM = Options.key("row.num").intType().defaultValue(5)
.withDescription("The total number of data generated per degree of parallelism");
public static final Option<Integer> SPLIT_NUM = Options.key("split.num").intType().defaultValue(1)
Original file line number Diff line number Diff line change
@@ -21,17 +21,20 @@
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.RowKind;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig;
import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException;
import org.apache.seatunnel.format.json.JsonDeserializationSchema;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;

import java.io.IOException;
import java.lang.reflect.Array;
import java.math.BigDecimal;
import java.time.LocalDateTime;
@@ -43,10 +46,27 @@ public class FakeDataGenerator {
public static final String SCHEMA = "schema";
private final SeaTunnelSchema schema;
private final FakeConfig fakeConfig;
private final JsonDeserializationSchema jsonDeserializationSchema;

public FakeDataGenerator(SeaTunnelSchema schema, FakeConfig fakeConfig) {
this.schema = schema;
this.fakeConfig = fakeConfig;
this.jsonDeserializationSchema = fakeConfig.getFakeRows() == null ?
null :
new JsonDeserializationSchema(
false, false, schema.getSeaTunnelRowType());
}

private SeaTunnelRow convertRow(FakeConfig.RowData rowData) {
try {
SeaTunnelRow seaTunnelRow = jsonDeserializationSchema.deserialize(rowData.getFieldsJson());
if (rowData.getKind() != null) {
seaTunnelRow.setRowKind(RowKind.valueOf(rowData.getKind()));
}
return seaTunnelRow;
} catch (IOException e) {
throw new FakeConnectorException(CommonErrorCode.JSON_OPERATION_FAILED, e);
}
}

private SeaTunnelRow randomRow() {
@@ -61,7 +81,14 @@ private SeaTunnelRow randomRow() {
}

public List<SeaTunnelRow> generateFakedRows(int rowNum) {
ArrayList<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
List<SeaTunnelRow> seaTunnelRows = new ArrayList<>();
if (fakeConfig.getFakeRows() != null) {
for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) {
seaTunnelRows.add(convertRow(rowData));
}
return seaTunnelRows;
}

for (int i = 0; i < rowNum; i++) {
seaTunnelRows.add(randomRow());
}
Original file line number Diff line number Diff line change
@@ -20,6 +20,7 @@
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ARRAY_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BYTES_LENGTH;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.MAP_SIZE;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROWS;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROW_NUM;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_NUM;
import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL;
@@ -42,8 +43,18 @@ public String factoryIdentifier() {

@Override
public OptionRule optionRule() {
return OptionRule.builder().required(SeaTunnelSchema.SCHEMA).optional(ROW_NUM, SPLIT_NUM, SPLIT_READ_INTERVAL, MAP_SIZE,
ARRAY_SIZE, BYTES_LENGTH, STRING_LENGTH).build();
return OptionRule.builder()
.required(SeaTunnelSchema.SCHEMA)
.optional(
ROWS,
ROW_NUM,
SPLIT_NUM,
SPLIT_READ_INTERVAL,
MAP_SIZE,
ARRAY_SIZE,
BYTES_LENGTH,
STRING_LENGTH)
.build();
}

@Override