diff --git a/docs/en/connector-v2/sink/StarRocks.md b/docs/en/connector-v2/sink/StarRocks.md index 02243f722f21..5bdddffbc6e3 100644 --- a/docs/en/connector-v2/sink/StarRocks.md +++ b/docs/en/connector-v2/sink/StarRocks.md @@ -8,24 +8,26 @@ The internal implementation of StarRocks sink connector is cached and imported b ## Key features - [ ] [exactly-once](../../concept/connector-v2-features.md) +- [x] [cdc](../../concept/connector-v2-features.md) ## Options -| name | type | required | default value | -|-----------------------------|--------|----------|-----------------| -| node_urls | list | yes | - | -| username | string | yes | - | -| password | string | yes | - | -| database | string | yes | - | -| table | string | yes | - | -| labelPrefix | string | no | - | -| batch_max_rows | long | no | 1024 | -| batch_max_bytes | int | no | 5 * 1024 * 1024 | -| batch_interval_ms | int | no | - | -| max_retries | int | no | - | -| retry_backoff_multiplier_ms | int | no | - | -| max_retry_backoff_ms | int | no | - | -| starrocks.config | map | no | - | +| name | type | required | default value | +|-----------------------------|---------|----------|-----------------| +| node_urls | list | yes | - | +| username | string | yes | - | +| password | string | yes | - | +| database | string | yes | - | +| table | string | yes | - | +| labelPrefix | string | no | - | +| batch_max_rows | long | no | 1024 | +| batch_max_bytes | int | no | 5 * 1024 * 1024 | +| batch_interval_ms | int | no | - | +| max_retries | int | no | - | +| retry_backoff_multiplier_ms | int | no | - | +| max_retry_backoff_ms | int | no | - | +| enable_upsert_delete | boolean | no | false | +| starrocks.config | map | no | - | ### node_urls [list] @@ -75,6 +77,10 @@ Using as a multiplier for generating the next delay for backoff The amount of time to wait before attempting to retry a request to `StarRocks` +### enable_upsert_delete [boolean] + +Whether to enable upsert/delete, only supports PrimaryKey model. 
+ ### starrocks.config [map] The parameter of the stream load `data_desc` @@ -125,9 +131,28 @@ sink { } ``` +Support write cdc changelog event(INSERT/UPDATE/DELETE) + +```hocon +sink { + StarRocks { + nodeUrls = ["e2e_starRocksdb:8030"] + username = root + password = "" + database = "test" + table = "e2e_table_sink" + ... + + // Support upsert/delete event synchronization (enable_upsert_delete=true), only supports PrimaryKey model. + enable_upsert_delete = true + } +} +``` + ## Changelog ### next version - Add StarRocks Sink Connector -- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719) \ No newline at end of file +- [Improve] Change Connector Custom Config Prefix To Map [3719](https://github.com/apache/incubator-seatunnel/pull/3719) +- [Feature] Support write cdc changelog event(INSERT/UPDATE/DELETE) [3865](https://github.com/apache/incubator-seatunnel/pull/3865) \ No newline at end of file diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md index 9e6337f104ea..d3d93bd2bb78 100644 --- a/docs/en/connector-v2/source/FakeSource.md +++ b/docs/en/connector-v2/source/FakeSource.md @@ -21,6 +21,7 @@ just for some test cases such as type conversion or connector new feature testin | name | type | required | default value | |---------------------|--------|----------|---------------| | schema | config | yes | - | +| rows | config | no | - | | row.num | int | no | 5 | | split.num | int | no | 1 | | split.read-interval | long | no | 1 | @@ -77,6 +78,33 @@ The schema of fake data that you want to generate } ``` +### rows + +The row list of fake data output per degree of parallelism + +example + +```hocon + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + fields = [1, "A_1", 100] + } + ] +``` + ### row.num The total 
number of data generated per degree of parallelism @@ -111,6 +139,8 @@ Source plugin common parameters, please refer to [Source Common Options](common- ## Example +Auto generate data rows + ```hocon FakeSource { row.num = 10 @@ -157,6 +187,46 @@ FakeSource { } ``` +Using fake data rows + +```hocon +FakeSource { + schema = { + fields { + pk_id = bigint + name = string + score = int + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + fields = [2, "B", 100] + } + ] +} +``` + ## Changelog ### 2.2.0-beta 2022-09-26 @@ -173,3 +243,7 @@ FakeSource { - Support user-defined bytes length - [Improve] Support multiple splits for fake source connector ([2974](https://github.com/apache/incubator-seatunnel/pull/2974)) - [Improve] Supports setting the number of splits per parallelism and the reading interval between two splits ([3098](https://github.com/apache/incubator-seatunnel/pull/3098)) + +### next version + +- [Feature] Support config fake data rows [3865](https://github.com/apache/incubator-seatunnel/pull/3865) \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-fake/pom.xml b/seatunnel-connectors-v2/connector-fake/pom.xml index 7cb8ed3e4e21..e6b23272de65 100644 --- a/seatunnel-connectors-v2/connector-fake/pom.xml +++ b/seatunnel-connectors-v2/connector-fake/pom.xml @@ -35,6 +35,11 @@ connector-common ${project.version} + + org.apache.seatunnel + seatunnel-format-json + ${project.version} + \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java index 
c6555f7ab49c..035b755c7b61 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeConfig.java @@ -20,17 +20,22 @@ import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ARRAY_SIZE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BYTES_LENGTH; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.MAP_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROWS; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROW_NUM; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_NUM; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.STRING_LENGTH; import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions; +import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Getter; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; @Builder @Getter @@ -49,6 +54,7 @@ public class FakeConfig implements Serializable { private int bytesLength = BYTES_LENGTH.defaultValue(); @Builder.Default private int stringLength = STRING_LENGTH.defaultValue(); + private List fakeRows; public static FakeConfig buildWithConfig(Config config) { FakeConfigBuilder builder = FakeConfig.builder(); @@ -73,6 +79,27 @@ public static FakeConfig buildWithConfig(Config config) { if (config.hasPath(STRING_LENGTH.key())) { builder.stringLength(config.getInt(STRING_LENGTH.key())); } + if (config.hasPath(ROWS.key())) { + List configs = config.getConfigList(ROWS.key()); + List rows = new 
ArrayList<>(configs.size()); + ConfigRenderOptions options = ConfigRenderOptions.concise(); + for (Config configItem : configs) { + String fieldsJson = configItem.getValue(RowData.KEY_FIELDS).render(options); + RowData rowData = new RowData(configItem.getString(RowData.KEY_KIND), fieldsJson); + rows.add(rowData); + } + builder.fakeRows(rows); + } return builder.build(); } + + @Getter + @AllArgsConstructor + public static class RowData implements Serializable { + static final String KEY_KIND = "kind"; + static final String KEY_FIELDS = "fields"; + + private String kind; + private String fieldsJson; + } } diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java index 02cb5fe8b860..c50ed00782ed 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/config/FakeOption.java @@ -19,10 +19,15 @@ import org.apache.seatunnel.api.configuration.Option; import org.apache.seatunnel.api.configuration.Options; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; + +import java.util.List; @SuppressWarnings("checkstyle:MagicNumber") public class FakeOption { + public static final Option> ROWS = Options.key("rows").listType(SeaTunnelRow.class).noDefaultValue() + .withDescription("The row list of fake data output per degree of parallelism"); public static final Option ROW_NUM = Options.key("row.num").intType().defaultValue(5) .withDescription("The total number of data generated per degree of parallelism"); public static final Option SPLIT_NUM = Options.key("split.num").intType().defaultValue(1) diff --git 
a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java index df158d0b8c0b..5848bf3d67e8 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGenerator.java @@ -21,6 +21,7 @@ import org.apache.seatunnel.api.table.type.BasicType; import org.apache.seatunnel.api.table.type.DecimalType; import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; @@ -28,10 +29,12 @@ import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; import org.apache.seatunnel.connectors.seatunnel.fake.config.FakeConfig; import org.apache.seatunnel.connectors.seatunnel.fake.exception.FakeConnectorException; +import org.apache.seatunnel.format.json.JsonDeserializationSchema; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; +import java.io.IOException; import java.lang.reflect.Array; import java.math.BigDecimal; import java.time.LocalDateTime; @@ -43,10 +46,27 @@ public class FakeDataGenerator { public static final String SCHEMA = "schema"; private final SeaTunnelSchema schema; private final FakeConfig fakeConfig; + private final JsonDeserializationSchema jsonDeserializationSchema; public FakeDataGenerator(SeaTunnelSchema schema, FakeConfig fakeConfig) { this.schema = schema; this.fakeConfig = fakeConfig; + this.jsonDeserializationSchema = fakeConfig.getFakeRows() == null ? 
+ null : + new JsonDeserializationSchema( + false, false, schema.getSeaTunnelRowType()); + } + + private SeaTunnelRow convertRow(FakeConfig.RowData rowData) { + try { + SeaTunnelRow seaTunnelRow = jsonDeserializationSchema.deserialize(rowData.getFieldsJson()); + if (rowData.getKind() != null) { + seaTunnelRow.setRowKind(RowKind.valueOf(rowData.getKind())); + } + return seaTunnelRow; + } catch (IOException e) { + throw new FakeConnectorException(CommonErrorCode.JSON_OPERATION_FAILED, e); + } } private SeaTunnelRow randomRow() { @@ -61,7 +81,14 @@ private SeaTunnelRow randomRow() { } public List generateFakedRows(int rowNum) { - ArrayList seaTunnelRows = new ArrayList<>(); + List seaTunnelRows = new ArrayList<>(); + if (fakeConfig.getFakeRows() != null) { + for (FakeConfig.RowData rowData : fakeConfig.getFakeRows()) { + seaTunnelRows.add(convertRow(rowData)); + } + return seaTunnelRows; + } + for (int i = 0; i < rowNum; i++) { seaTunnelRows.add(randomRow()); } diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java index 11674b60bda1..f4822ea56ed8 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceFactory.java @@ -20,6 +20,7 @@ import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ARRAY_SIZE; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.BYTES_LENGTH; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.MAP_SIZE; +import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROWS; import static 
org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.ROW_NUM; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_NUM; import static org.apache.seatunnel.connectors.seatunnel.fake.config.FakeOption.SPLIT_READ_INTERVAL; @@ -42,8 +43,18 @@ public String factoryIdentifier() { @Override public OptionRule optionRule() { - return OptionRule.builder().required(SeaTunnelSchema.SCHEMA).optional(ROW_NUM, SPLIT_NUM, SPLIT_READ_INTERVAL, MAP_SIZE, - ARRAY_SIZE, BYTES_LENGTH, STRING_LENGTH).build(); + return OptionRule.builder() + .required(SeaTunnelSchema.SCHEMA) + .optional( + ROWS, + ROW_NUM, + SPLIT_NUM, + SPLIT_READ_INTERVAL, + MAP_SIZE, + ARRAY_SIZE, + BYTES_LENGTH, + STRING_LENGTH) + .build(); } @Override diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java similarity index 71% rename from seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java rename to seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java index 6e58e30bafa2..5bf9a1062d98 100644 --- a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeDataGeneratorTest.java +++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeDataGeneratorTest.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.fake.source; +import org.apache.seatunnel.api.table.type.RowKind; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema; 
@@ -34,6 +35,7 @@ import java.net.URISyntaxException; import java.net.URL; import java.nio.file.Paths; +import java.util.Arrays; import java.util.List; import java.util.Map; @@ -73,6 +75,31 @@ public void testComplexSchemaParse(String conf) throws FileNotFoundException, UR } } + @ParameterizedTest + @ValueSource(strings = {"fake-data.schema.conf"}) + public void testRowDataParse(String conf) throws FileNotFoundException, URISyntaxException { + SeaTunnelRow row1 = new SeaTunnelRow(new Object[]{ 1L, "A", 100 }); + row1.setRowKind(RowKind.INSERT); + SeaTunnelRow row2 = new SeaTunnelRow(new Object[]{ 2L, "B", 100 }); + row2.setRowKind(RowKind.INSERT); + SeaTunnelRow row3 = new SeaTunnelRow(new Object[]{ 3L, "C", 100 }); + row3.setRowKind(RowKind.INSERT); + SeaTunnelRow row1UpdateBefore = new SeaTunnelRow(new Object[]{ 1L, "A", 100 }); + row1UpdateBefore.setRowKind(RowKind.UPDATE_BEFORE); + SeaTunnelRow row1UpdateAfter = new SeaTunnelRow(new Object[]{ 1L, "A_1", 100 }); + row1UpdateAfter.setRowKind(RowKind.UPDATE_AFTER); + SeaTunnelRow row2Delete = new SeaTunnelRow(new Object[]{ 2L, "B", 100 }); + row2Delete.setRowKind(RowKind.DELETE); + List expected = Arrays.asList(row1, row2, row3, row1UpdateBefore, row1UpdateAfter, row2Delete); + + Config testConfig = getTestConfigFile(conf); + SeaTunnelSchema seaTunnelSchema = SeaTunnelSchema.buildWithConfig(testConfig.getConfig(SeaTunnelSchema.SCHEMA.key())); + FakeConfig fakeConfig = FakeConfig.buildWithConfig(testConfig); + FakeDataGenerator fakeDataGenerator = new FakeDataGenerator(seaTunnelSchema, fakeConfig); + List seaTunnelRows = fakeDataGenerator.generateFakedRows(fakeConfig.getRowNum()); + Assertions.assertIterableEquals(expected, seaTunnelRows); + } + private Config getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException { if (!configFile.startsWith("/")) { configFile = "/" + configFile; diff --git a/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.schema.conf 
b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.schema.conf new file mode 100644 index 000000000000..3ffbca224e28 --- /dev/null +++ b/seatunnel-connectors-v2/connector-fake/src/test/resources/fake-data.schema.conf @@ -0,0 +1,52 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +FakeSource { + schema = { + fields { + pk_id = bigint + name = string + score = int + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + fields = [2, "B", 100] + } + ] +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java index 16da9f983a90..3320eab69697 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/config/SinkConfig.java @@ -109,6 +109,11 @@ public class SinkConfig { .noDefaultValue() .withDescription("The amount of time to wait before attempting to retry a request to StarRocks"); + public static final Option ENABLE_UPSERT_DELETE = Options.key("enable_upsert_delete") + .booleanType() + .defaultValue(false) + .withDescription("Whether to enable upsert/delete, only supports PrimaryKey model."); + public enum StreamLoadFormat { CSV, JSON; public static StreamLoadFormat parse(String format) { @@ -135,6 +140,7 @@ public static StreamLoadFormat parse(String format) { private int maxRetries; private int retryBackoffMultiplierMs; private int maxRetryBackoffMs; + private boolean enableUpsertDelete; private final Map streamLoadProps = new HashMap<>(); @@ -171,6 +177,9 @@ public static SinkConfig loadConfig(Config pluginConfig) { if (pluginConfig.hasPath(MAX_RETRY_BACKOFF_MS.key())) { 
sinkConfig.setMaxRetryBackoffMs(pluginConfig.getInt(MAX_RETRY_BACKOFF_MS.key())); } + if (pluginConfig.hasPath(ENABLE_UPSERT_DELETE.key())) { + sinkConfig.setEnableUpsertDelete(pluginConfig.getBoolean(ENABLE_UPSERT_DELETE.key())); + } parseSinkStreamLoadProperties(pluginConfig, sinkConfig); if (sinkConfig.streamLoadProps.containsKey(COLUMN_SEPARATOR)) { sinkConfig.setColumnSeparator((String) sinkConfig.streamLoadProps.get(COLUMN_SEPARATOR)); diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java index 7f9cc43ee4df..c00cdfb2caff 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksBaseSerializer.java @@ -39,33 +39,27 @@ public class StarRocksBaseSerializer { @Builder.Default private TimeUtils.Formatter timeFormatter = TimeUtils.Formatter.HH_MM_SS; - protected String convert(SeaTunnelDataType dataType, Object val) { + protected Object convert(SeaTunnelDataType dataType, Object val) { if (val == null) { return null; } switch (dataType.getSqlType()) { case TINYINT: case SMALLINT: - return String.valueOf(((Number) val).shortValue()); case INT: - return String.valueOf(((Number) val).intValue()); case BIGINT: - return String.valueOf(((Number) val).longValue()); case FLOAT: - return String.valueOf(((Number) val).floatValue()); case DOUBLE: - return String.valueOf(((Number) val).doubleValue()); case DECIMAL: case BOOLEAN: - return val.toString(); + case STRING: + return val; case DATE: return DateUtils.toString((LocalDate) val, dateFormatter); case TIME: return 
TimeUtils.toString((LocalTime) val, timeFormatter); case TIMESTAMP: return DateTimeUtils.toString((LocalDateTime) val, dateTimeFormatter); - case STRING: - return (String) val; case ARRAY: case MAP: return JsonUtils.toJsonString(val); diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java index 191b615baedc..352d30369fc8 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksCsvSerializer.java @@ -25,22 +25,30 @@ public class StarRocksCsvSerializer extends StarRocksBaseSerializer implements S private final String columnSeparator; private final SeaTunnelRowType seaTunnelRowType; + private final boolean enableUpsertDelete; - public StarRocksCsvSerializer(String sp, SeaTunnelRowType seaTunnelRowType) { - this.seaTunnelRowType = seaTunnelRowType; + public StarRocksCsvSerializer(String sp, + SeaTunnelRowType seaTunnelRowType, + boolean enableUpsertDelete) { this.columnSeparator = StarRocksDelimiterParser.parse(sp, "\t"); + this.seaTunnelRowType = seaTunnelRowType; + this.enableUpsertDelete = enableUpsertDelete; } @Override public String serialize(SeaTunnelRow row) { StringBuilder sb = new StringBuilder(); for (int i = 0; i < row.getFields().length; i++) { - String value = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); + Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); sb.append(null == value ? 
"\\N" : value); if (i < row.getFields().length - 1) { sb.append(columnSeparator); } } + if (enableUpsertDelete) { + sb.append(columnSeparator) + .append(StarRocksSinkOP.parse(row.getRowKind()).ordinal()); + } return sb.toString(); } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java index 4997dbd86d78..8d60dd8b1d9a 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksJsonSerializer.java @@ -28,9 +28,12 @@ public class StarRocksJsonSerializer extends StarRocksBaseSerializer implements private static final long serialVersionUID = 1L; private final SeaTunnelRowType seaTunnelRowType; + private final boolean enableUpsertDelete; - public StarRocksJsonSerializer(SeaTunnelRowType seaTunnelRowType) { + public StarRocksJsonSerializer(SeaTunnelRowType seaTunnelRowType, + boolean enableUpsertDelete) { this.seaTunnelRowType = seaTunnelRowType; + this.enableUpsertDelete = enableUpsertDelete; } @Override @@ -38,9 +41,12 @@ public String serialize(SeaTunnelRow row) { Map rowMap = new HashMap<>(row.getFields().length); for (int i = 0; i < row.getFields().length; i++) { - String value = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); + Object value = convert(seaTunnelRowType.getFieldType(i), row.getField(i)); rowMap.put(seaTunnelRowType.getFieldName(i), value); } + if (enableUpsertDelete) { + rowMap.put(StarRocksSinkOP.COLUMN_KEY, StarRocksSinkOP.parse(row.getRowKind()).ordinal()); + } return JsonUtils.toJsonString(rowMap); } } diff --git 
a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksSinkOP.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksSinkOP.java new file mode 100644 index 000000000000..0bfbd1f5b2f9 --- /dev/null +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/serialize/StarRocksSinkOP.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.seatunnel.connectors.seatunnel.starrocks.serialize; + +import org.apache.seatunnel.api.table.type.RowKind; + +/** + * Reference https://github.com/StarRocks/starrocks/blob/main/docs/loading/Load_to_Primary_Key_tables.md#upsert-and-delete + */ +public enum StarRocksSinkOP { + UPSERT, DELETE; + + public static final String COLUMN_KEY = "__op"; + + static StarRocksSinkOP parse(RowKind kind) { + switch (kind) { + case INSERT: + case UPDATE_AFTER: + return UPSERT; + case DELETE: + case UPDATE_BEFORE: + return DELETE; + default: + throw new RuntimeException("Unsupported row kind."); + } + } +} diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java index 579fa6ad5e1f..b04750a94612 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkFactory.java @@ -37,7 +37,7 @@ public OptionRule optionRule() { .required(SinkConfig.NODE_URLS, SinkConfig.USERNAME, SinkConfig.PASSWORD, SinkConfig.DATABASE, SinkConfig.TABLE) .optional(SinkConfig.LABEL_PREFIX, SinkConfig.BATCH_MAX_SIZE, SinkConfig.BATCH_MAX_BYTES, SinkConfig.BATCH_INTERVAL_MS, SinkConfig.MAX_RETRIES, SinkConfig.MAX_RETRY_BACKOFF_MS, - SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.STARROCKS_CONFIG) + SinkConfig.RETRY_BACKOFF_MULTIPLIER_MS, SinkConfig.STARROCKS_CONFIG, SinkConfig.ENABLE_UPSERT_DELETE) .build(); } } diff --git a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java 
b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java index 6d0d44dee6d0..abe6e41cffa9 100644 --- a/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java +++ b/seatunnel-connectors-v2/connector-starrocks/src/main/java/org/apache/seatunnel/connectors/seatunnel/starrocks/sink/StarRocksSinkWriter.java @@ -27,6 +27,7 @@ import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksCsvSerializer; import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksISerializer; import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksJsonSerializer; +import org.apache.seatunnel.connectors.seatunnel.starrocks.serialize.StarRocksSinkOP; import org.apache.seatunnel.shade.com.typesafe.config.Config; @@ -49,6 +50,9 @@ public StarRocksSinkWriter(Config pluginConfig, SeaTunnelRowType seaTunnelRowType) { SinkConfig sinkConfig = SinkConfig.loadConfig(pluginConfig); List fieldNames = Arrays.stream(seaTunnelRowType.getFieldNames()).collect(Collectors.toList()); + if (sinkConfig.isEnableUpsertDelete()) { + fieldNames.add(StarRocksSinkOP.COLUMN_KEY); + } this.serializer = createSerializer(sinkConfig, seaTunnelRowType); this.manager = new StarRocksSinkManager(sinkConfig, fieldNames); } @@ -81,10 +85,11 @@ public void close() throws IOException { public static StarRocksISerializer createSerializer(SinkConfig sinkConfig, SeaTunnelRowType seaTunnelRowType) { if (SinkConfig.StreamLoadFormat.CSV.equals(sinkConfig.getLoadFormat())) { - return new StarRocksCsvSerializer(sinkConfig.getColumnSeparator(), seaTunnelRowType); + return new StarRocksCsvSerializer( + sinkConfig.getColumnSeparator(), seaTunnelRowType, sinkConfig.isEnableUpsertDelete()); } if (SinkConfig.StreamLoadFormat.JSON.equals(sinkConfig.getLoadFormat())) { - return new StarRocksJsonSerializer(seaTunnelRowType); 
+ return new StarRocksJsonSerializer(seaTunnelRowType, sinkConfig.isEnableUpsertDelete()); } throw new StarRocksConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Failed to create row serializer, unsupported `format` from stream load properties."); } diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml index 684b7e33669d..f27aa3888701 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/pom.xml @@ -40,7 +40,7 @@ org.apache.seatunnel - connector-jdbc-e2e + connector-fake ${project.version} test diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java new file mode 100644 index 000000000000..6ee3aef094db --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/java/org/apache/seatunnel/e2e/connector/starrocks/StarRocksCDCSinkIT.java @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.e2e.connector.starrocks; + +import static org.awaitility.Awaitility.given; + +import org.apache.seatunnel.e2e.common.TestResource; +import org.apache.seatunnel.e2e.common.TestSuiteBase; +import org.apache.seatunnel.e2e.common.container.TestContainer; + +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.TestTemplate; +import org.testcontainers.containers.Container; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.lifecycle.Startables; +import org.testcontainers.utility.DockerLoggerFactory; + +import java.net.URL; +import java.net.URLClassLoader; +import java.sql.Connection; +import java.sql.Driver; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Properties; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@Slf4j +public class StarRocksCDCSinkIT extends TestSuiteBase implements TestResource { + private static final String DOCKER_IMAGE = "d87904488/starrocks-starter:2.2.1"; + private static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver"; + private static final String HOST = "starrocks_cdc_e2e"; + private static final int SR_DOCKER_PORT = 9030; + private static final String USERNAME = "root"; + private static final String PASSWORD = ""; + private static final String DATABASE = "test"; + private static final String SINK_TABLE = "e2e_table_sink"; + private static final String SR_DRIVER_JAR = 
"https://repo1.maven.org/maven2/mysql/mysql-connector-java/8.0.16/mysql-connector-java-8.0.16.jar"; + + private static final String DDL_SINK = "create table " + DATABASE + "." + SINK_TABLE + " (\n" + + " pk_id BIGINT,\n" + + " name VARCHAR(128),\n" + + " score INT\n" + + ")ENGINE=OLAP\n" + + "PRIMARY KEY(`PK_ID`)\n" + + "DISTRIBUTED BY HASH(`PK_ID`) BUCKETS 1\n" + + "PROPERTIES (\n" + + "\"replication_num\" = \"1\",\n" + + "\"in_memory\" = \"false\"," + + "\"storage_format\" = \"DEFAULT\"" + + ")"; + + private Connection jdbcConnection; + private GenericContainer<?> starRocksServer; + + @BeforeAll + @Override + public void startUp() { + starRocksServer = new GenericContainer<>(DOCKER_IMAGE) + .withNetwork(NETWORK) + .withNetworkAliases(HOST) + .withExposedPorts(SR_DOCKER_PORT) + .withLogConsumer(new Slf4jLogConsumer(DockerLoggerFactory.getLogger(DOCKER_IMAGE))); + Startables.deepStart(Stream.of(starRocksServer)).join(); + log.info("StarRocks container started"); + // wait for starrocks fully start + given().ignoreExceptions() + .await() + .atMost(360, TimeUnit.SECONDS) + .untilAsserted(this::initializeJdbcConnection); + initializeJdbcTable(); + } + + @AfterAll + @Override + public void tearDown() throws Exception { + if (jdbcConnection != null) { + jdbcConnection.close(); + } + if (starRocksServer != null) { + starRocksServer.close(); + } + } + + @TestTemplate + public void testStarRocksSink(TestContainer container) throws Exception { + Container.ExecResult execResult = container.executeJob("/write-cdc-changelog-to-starrocks.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + + String sinkSql = String.format("select * from %s.%s", DATABASE, SINK_TABLE); + Set<List<Object>> actual = new HashSet<>(); + try (Statement sinkStatement = jdbcConnection.createStatement()) { + ResultSet sinkResultSet = sinkStatement.executeQuery(sinkSql); + while (sinkResultSet.next()) { + List<Object> row = Arrays.asList( + sinkResultSet.getLong("pk_id"), + sinkResultSet.getString("name"), +
sinkResultSet.getInt("score")); + actual.add(row); + } + } + Set<List<Object>> expected = Stream.<List<Object>>of( + Arrays.asList(1L, "A_1", 100), + Arrays.asList(3L, "C", 100)) + .collect(Collectors.toSet()); + Assertions.assertIterableEquals(expected, actual); + } + + private void initializeJdbcConnection() throws Exception { + URLClassLoader urlClassLoader = new URLClassLoader(new URL[]{new URL(SR_DRIVER_JAR)}, StarRocksCDCSinkIT.class.getClassLoader()); + Thread.currentThread().setContextClassLoader(urlClassLoader); + Driver driver = (Driver) urlClassLoader.loadClass(DRIVER_CLASS).newInstance(); + Properties props = new Properties(); + props.put("user", USERNAME); + props.put("password", PASSWORD); + jdbcConnection = driver.connect(String.format("jdbc:mysql://%s:%s", starRocksServer.getHost(), starRocksServer.getFirstMappedPort()), props); + } + + private void initializeJdbcTable() { + try (Statement statement = jdbcConnection.createStatement()) { + // create databases + statement.execute("create database test"); + // create sink table + statement.execute(DDL_SINK); + } catch (SQLException e) { + throw new RuntimeException("Initializing table failed!", e); + } + } +} diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf index 4cd02a03a9e8..0edc9481ef95 100644 --- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/starrocks-jdbc-to-starrocks.conf @@ -41,8 +41,11 @@ sink { database = "test" table = "e2e_table_sink" batch_max_rows = 100 - sink.properties.format = "JSON" - sink.properties.strip_outer_array = true max_retries = 3 + + starrocks.config = { + format = "JSON" + strip_outer_array = true + } } } \ No newline at end of file diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/write-cdc-changelog-to-starrocks.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/write-cdc-changelog-to-starrocks.conf new file mode 100644 index 000000000000..1a4066d70207 --- /dev/null +++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-starrocks-e2e/src/test/resources/write-cdc-changelog-to-starrocks.conf @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + execution.parallelism = 1 + job.mode = "BATCH" +} + +source { + FakeSource { + schema = { + fields { + pk_id = bigint + name = string + score = int + } + } + rows = [ + { + kind = INSERT + fields = [1, "A", 100] + }, + { + kind = INSERT + fields = [2, "B", 100] + }, + { + kind = INSERT + fields = [3, "C", 100] + }, + { + kind = UPDATE_BEFORE + fields = [1, "A", 100] + }, + { + kind = UPDATE_AFTER + fields = [1, "A_1", 100] + }, + { + kind = DELETE + fields = [2, "B", 100] + } + ] + } +} + +sink { + StarRocks { + nodeUrls = ["starrocks_cdc_e2e:8030"] + username = root + password = "" + database = "test" + table = "e2e_table_sink" + + batch_max_rows = 100 + max_retries = 3 + + starrocks.config = { + format = "CSV" + } + + enable_upsert_delete = true + } +} \ No newline at end of file diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java index e33a749a35cb..73417825158f 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonDeserializationSchema.java @@ -110,6 +110,13 @@ public SeaTunnelRow deserialize(byte[] message) throws IOException { return convertJsonNode(convertBytes(message)); } + public SeaTunnelRow deserialize(String message) throws IOException { + if (message == null) { + return null; + } + return convertJsonNode(convert(message)); + } + public void collect(byte[] message, Collector<SeaTunnelRow> out) throws IOException { JsonNode jsonNode = convertBytes(message); if (jsonNode.isArray()) { @@ -151,6 +158,18 @@ private JsonNode convertBytes(byte[] message) throws IOException { } } + private JsonNode convert(String message) throws IOException { + try { + return objectMapper.readTree(message); + }
catch (Throwable t) { + if (ignoreParseErrors) { + return null; + } + throw new SeaTunnelJsonFormatException(CommonErrorCode.JSON_OPERATION_FAILED, + String.format("Failed to deserialize JSON '%s'.", message), t); + } + } + @Override public SeaTunnelRowType getProducedType() { return this.rowType; diff --git a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java index 7aad5c6d2026..1dbd363c9929 100644 --- a/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java +++ b/seatunnel-formats/seatunnel-format-json/src/main/java/org/apache/seatunnel/format/json/JsonToRowConverters.java @@ -30,7 +30,6 @@ import org.apache.seatunnel.format.json.exception.SeaTunnelJsonFormatException; import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; import java.io.IOException; import java.io.Serializable; @@ -304,12 +303,16 @@ public JsonToRowConverter[] apply(int value) { return new JsonToRowConverter() { @Override public Object convert(JsonNode jsonNode) { - ObjectNode node = (ObjectNode) jsonNode; int arity = fieldNames.length; SeaTunnelRow row = new SeaTunnelRow(arity); for (int i = 0; i < arity; i++) { String fieldName = fieldNames[i]; - JsonNode field = node.get(fieldName); + JsonNode field; + if (jsonNode.isArray()) { + field = jsonNode.get(i); + } else { + field = jsonNode.get(fieldName); + } try { Object convertedField = convertField(fieldConverters[i], fieldName, field); row.setField(i, convertedField);