From ebb90ba0af974dca84ecedd5ba47af51235f9872 Mon Sep 17 00:00:00 2001 From: Laglangyue <35491928+laglangyue@users.noreply.github.com> Date: Tue, 23 Aug 2022 09:45:05 +0800 Subject: [PATCH] [Imporve][Fake-Connector-V2]support user-defined-schmea and random data for fake-table (#2406) * [Connector-V2][JDBC-connector] optimization fake Co-authored-by: tangjiafu --- docs/en/connector-v2/sink/Assert.md | 12 +- docs/en/connector-v2/source/FakeSource.md | 76 ++++++++++ .../common/schema/SeatunnelSchema.java | 3 +- .../seatunnel/fake/source/FakeRandomData.java | 133 ++++++++++++++++++ .../seatunnel/fake/source/FakeSource.java | 12 +- .../fake/source/FakeSourceReader.java | 16 +-- .../FakeRandomDataTest.java | 87 ++++++++++++ .../src/test/resources/complex.schema.conf | 38 +++++ .../src/test/resources/simple.schema.conf | 37 +++++ .../assertion/fakesource_to_assert.conf | 20 +-- .../resources/fake/fakesource_to_console.conf | 18 ++- .../file/fakesource_to_hdfs_json.conf | 11 +- .../file/fakesource_to_hdfs_parquet.conf | 11 +- .../file/fakesource_to_hdfs_text.conf | 11 +- .../file/fakesource_to_local_json.conf | 11 +- .../file/fakesource_to_local_parquet.conf | 11 +- .../file/fakesource_to_local_text.conf | 13 +- .../resources/iotdb/fakesource_to_iotdb.conf | 9 +- .../resources/jdbc/fakesource_to_jdbc.conf | 12 +- .../resources/jdbc/jdbcsource_to_console.conf | 4 +- .../resources/fake/fakesource_to_console.conf | 10 +- .../file/fakesource_to_hdfs_json.conf | 13 +- .../file/fakesource_to_hdfs_parquet.conf | 13 +- .../file/fakesource_to_hdfs_text.conf | 13 +- .../file/fakesource_to_local_json.conf | 11 +- .../file/fakesource_to_local_parquet.conf | 11 +- .../file/fakesource_to_local_text.conf | 11 +- .../resources/iotdb/fakesource_to_iotdb.conf | 7 +- .../resources/examples/fake_to_console.conf | 13 +- .../resources/examples/fake_to_dingtalk.conf | 13 +- .../examples/fakesource_to_file.conf | 13 +- .../examples/spark.batch.clickhouse.conf | 9 +- .../main/resources/examples/spark.batch.conf | 29 +++- .../serialization/RowConverter.java | 4 +- 34 files changed, 604 insertions(+), 111 deletions(-) create mode 100644 docs/en/connector-v2/source/FakeSource.md create mode 100644 seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java create mode 100644 seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java create mode 100644 seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf create mode 100644 seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf diff --git a/docs/en/connector-v2/sink/Assert.md b/docs/en/connector-v2/sink/Assert.md index ca9332d0c58..9e5c49acfa4 100644 --- a/docs/en/connector-v2/sink/Assert.md +++ b/docs/en/connector-v2/sink/Assert.md @@ -37,13 +37,11 @@ A list value rule define the data value validation ### rule_type [string] The following rules are supported for now -` -NOT_NULL, // value can't be null -MIN, // define the minimum value of data -MAX, // define the maximum value of data -MIN_LENGTH, // define the minimum string length of a string data -MAX_LENGTH // define the maximum string length of a string data -` +- NOT_NULL `value can't be null` +- MIN `define the minimum value of data` +- MAX `define the maximum value of data` +- MIN_LENGTH `define the minimum string length of a string data` +- MAX_LENGTH `define the maximum string length of a string data` ### rule_value [double] diff --git a/docs/en/connector-v2/source/FakeSource.md b/docs/en/connector-v2/source/FakeSource.md new file mode 100644 index 00000000000..9c4bf4ffd0a --- /dev/null +++ b/docs/en/connector-v2/source/FakeSource.md @@ -0,0 +1,76 @@ +# FakeSource + +> FakeSource connector + +## Description + +The FakeSource is a virtual data source, which randomly generates the number of rows according to the data structure of the user-defined schema, +just for testing, such as type conversion and feature testing + +## Options + +| name | type | required | default value | +|-------------------|--------|----------|---------------| +| result_table_name | string | yes | - | +| schema | config | yes | - | + +### result_table_name [string] + +The table name. + +### type [string] +Table structure description ,you should assign schema option to tell connector how to parse data to the row you want. +**Tips**: Most of Unstructured-Datasource contain this param, such as LocalFile,HdfsFile. +**Example**: +```hocon +schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_time = time + c_timestamp = timestamp + } + } +``` + +## Example +Simple source for FakeSource which contains enough datatype +```hocon +source { + FakeSource { + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_time = time + c_timestamp = timestamp + } + } + result_table_name = "fake" + } +} +``` \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java index 4c4799b21bd..835d1d0ca58 100644 --- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java +++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java @@ -32,9 +32,10 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config; import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions; +import java.io.Serializable; import java.util.Map; -public class SeatunnelSchema { +public class SeatunnelSchema implements Serializable { public static final String SCHEMA = "schema"; private static final String FIELD_KEY = "fields"; private static final String SIMPLE_SCHEMA_FILED = "content"; diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java new file mode 100644 index 00000000000..efafde5caff --- /dev/null +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.source; + +import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE; +import static org.apache.seatunnel.api.table.type.BasicType.VOID_TYPE; + +import org.apache.seatunnel.api.table.type.ArrayType; +import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.LocalTimeType; +import org.apache.seatunnel.api.table.type.MapType; +import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType; +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema; + +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.RandomUtils; + +import java.lang.reflect.Array; +import java.math.BigDecimal; +import java.time.LocalDateTime; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +public class FakeRandomData { + public static final String SCHEMA = "schema"; + private final SeatunnelSchema schema; + + public FakeRandomData(SeatunnelSchema schema) { + this.schema = schema; + } + + public SeaTunnelRow randomRow() { + SeaTunnelRowType seaTunnelRowType = schema.getSeaTunnelRowType(); + String[] fieldNames = seaTunnelRowType.getFieldNames(); + SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); + List randomRow = new ArrayList<>(fieldNames.length); + for (SeaTunnelDataType fieldType : fieldTypes) { + randomRow.add(randomColumnValue(fieldType)); + } + return new SeaTunnelRow(randomRow.toArray()); + } + + @SuppressWarnings("magicnumber") + private Object randomColumnValue(SeaTunnelDataType fieldType) { + if (BOOLEAN_TYPE.equals(fieldType)) { + return RandomUtils.nextInt(0, 2) == 1; + } else if (BYTE_TYPE.equals(fieldType)) { + return (byte) RandomUtils.nextInt(0, 255); + } else if (SHORT_TYPE.equals(fieldType)) { + return (short) RandomUtils.nextInt(Byte.MAX_VALUE, Short.MAX_VALUE); + } else if (INT_TYPE.equals(fieldType)) { + return RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE); + } else if (LONG_TYPE.equals(fieldType)) { + return RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE); + } else if (FLOAT_TYPE.equals(fieldType)) { + return RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE); + } else if (DOUBLE_TYPE.equals(fieldType)) { + return RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE); + } else if (STRING_TYPE.equals(fieldType)) { + return RandomStringUtils.randomAlphabetic(10); + } else if (LocalTimeType.LOCAL_DATE_TYPE.equals(fieldType)) { + return randomLocalDateTime().toLocalDate(); + } else if (LocalTimeType.LOCAL_TIME_TYPE.equals(fieldType)) { + return randomLocalDateTime().toLocalTime(); + } else if (LocalTimeType.LOCAL_DATE_TIME_TYPE.equals(fieldType)) { + return randomLocalDateTime(); + } else if (fieldType instanceof DecimalType) { + DecimalType decimalType = (DecimalType) fieldType; + return new BigDecimal(RandomStringUtils.randomNumeric(decimalType.getPrecision() - decimalType.getScale()) + "." + + RandomStringUtils.randomNumeric(decimalType.getScale())); + } else if (fieldType instanceof ArrayType) { + ArrayType arrayType = (ArrayType) fieldType; + BasicType elementType = arrayType.getElementType(); + Object value = randomColumnValue(elementType); + Object arr = Array.newInstance(elementType.getTypeClass(), 1); + Array.set(arr, 0, value); + return arr; + } else if (fieldType instanceof MapType) { + MapType mapType = (MapType) fieldType; + SeaTunnelDataType keyType = mapType.getKeyType(); + Object key = randomColumnValue(keyType); + SeaTunnelDataType valueType = mapType.getValueType(); + Object value = randomColumnValue(valueType); + HashMap objectObjectHashMap = new HashMap<>(); + objectObjectHashMap.put(key, value); + return objectObjectHashMap; + } else if (fieldType instanceof PrimitiveByteArrayType) { + return RandomUtils.nextBytes(100); + } else if (VOID_TYPE.equals(fieldType) || fieldType == null) { + return Void.TYPE; + } else { + throw new UnsupportedOperationException("Unexpected value: " + fieldType); + } + } + + @SuppressWarnings("magicnumber") + private LocalDateTime randomLocalDateTime() { + return LocalDateTime.of( + LocalDateTime.now().getYear(), + RandomUtils.nextInt(1, 12), + RandomUtils.nextInt(1, LocalDateTime.now().getDayOfMonth()), + RandomUtils.nextInt(0, 24), + RandomUtils.nextInt(0, 59) + ); + } +} diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java index 65e6587f47c..4e7b1317e20 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java @@ -20,11 +20,10 @@ import org.apache.seatunnel.api.common.SeaTunnelContext; import org.apache.seatunnel.api.source.Boundedness; import org.apache.seatunnel.api.source.SeaTunnelSource; -import org.apache.seatunnel.api.table.type.BasicType; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.common.constants.JobMode; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader; import org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource; import org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext; @@ -38,6 +37,7 @@ public class FakeSource extends AbstractSingleSplitSource { private Config pluginConfig; private SeaTunnelContext seaTunnelContext; + private SeatunnelSchema schema; @Override public Boundedness getBoundedness() { @@ -46,14 +46,12 @@ public Boundedness getBoundedness() { @Override public SeaTunnelRowType getProducedType() { - return new SeaTunnelRowType( - new String[]{"name", "age", "timestamp"}, - new SeaTunnelDataType[]{BasicType.STRING_TYPE, BasicType.INT_TYPE, BasicType.LONG_TYPE}); + return schema.getSeaTunnelRowType(); } @Override public AbstractSingleSplitReader createReader(SingleSplitReaderContext readerContext) throws Exception { - return new FakeSourceReader(readerContext); + return new FakeSourceReader(readerContext, new FakeRandomData(schema)); } @Override @@ -64,6 +62,8 @@ public String getPluginName() { @Override public void prepare(Config pluginConfig) { this.pluginConfig = pluginConfig; + assert pluginConfig.hasPath(FakeRandomData.SCHEMA); + this.schema = SeatunnelSchema.buildWithConfig(pluginConfig.getConfig(FakeRandomData.SCHEMA)); } @Override diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java index 7007eaf25b3..d67e1f4b215 100644 --- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java +++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java @@ -26,20 +26,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Random; -import java.util.concurrent.ThreadLocalRandom; - public class FakeSourceReader extends AbstractSingleSplitReader { private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class); private final SingleSplitReaderContext context; - private final String[] names = {"Wenjun", "Fanjia", "Zongwen", "CalvinKirs"}; - private final int[] ages = {11, 22, 33, 44}; + private final FakeRandomData fakeRandomData; - public FakeSourceReader(SingleSplitReaderContext context) { + public FakeSourceReader(SingleSplitReaderContext context, FakeRandomData randomData) { this.context = context; + this.fakeRandomData = randomData; } @Override @@ -56,11 +53,8 @@ public void close() { @SuppressWarnings("magicnumber") public void pollNext(Collector output) throws InterruptedException { // Generate a random number of rows to emit. - Random random = ThreadLocalRandom.current(); - int size = random.nextInt(10) + 1; - for (int i = 0; i < size; i++) { - int randomIndex = random.nextInt(names.length); - SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[randomIndex], ages[randomIndex], System.currentTimeMillis()}); + for (int i = 0; i < 10; i++) { + SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow(); output.collect(seaTunnelRow); } if (Boundedness.BOUNDED.equals(context.getBoundedness())) { diff --git a/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java new file mode 100644 index 00000000000..67906a1616d --- /dev/null +++ b/seatunnel-connectors-v2/connector-fake/src/test/java/org.apache.seatunnel.connectors.seatunnel.fake.source/FakeRandomDataTest.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.seatunnel.connectors.seatunnel.fake.source; + +import org.apache.seatunnel.api.table.type.SeaTunnelDataType; +import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; +import org.apache.seatunnel.api.table.type.SqlType; +import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema; + +import org.apache.seatunnel.shade.com.typesafe.config.Config; +import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.File; +import java.io.FileNotFoundException; +import java.lang.reflect.Array; +import java.net.URISyntaxException; +import java.net.URL; +import java.nio.file.Paths; +import java.util.Map; + +public class FakeRandomDataTest { + + @ParameterizedTest + @ValueSource(strings = {"complex.schema.conf", "simple.schema.conf"}) + public void testComplexSchemaParse(String conf) throws FileNotFoundException, URISyntaxException { + Config testConfigFile = getTestConfigFile(conf); + SeatunnelSchema seatunnelSchema = SeatunnelSchema.buildWithConfig(testConfigFile); + FakeRandomData fakeRandomData = new FakeRandomData(seatunnelSchema); + SeaTunnelRow seaTunnelRow = fakeRandomData.randomRow(); + Assertions.assertNotNull(seaTunnelRow); + Object[] fields = seaTunnelRow.getFields(); + Assertions.assertNotNull(fields); + SeaTunnelRowType seaTunnelRowType = seatunnelSchema.getSeaTunnelRowType(); + SeaTunnelDataType[] fieldTypes = seaTunnelRowType.getFieldTypes(); + for (int i = 0; i < fieldTypes.length; i++) { + if (fieldTypes[i].getSqlType() != SqlType.NULL) { + Assertions.assertNotNull(fields[i]); + } else { + Assertions.assertSame(fields[i], Void.TYPE); + } + if (fieldTypes[i].getSqlType() == SqlType.MAP) { + Assertions.assertTrue(fields[i] instanceof Map); + Map field = (Map) fields[i]; + field.forEach((k, v) -> Assertions.assertTrue(k != null && v != null)); + } + if (fieldTypes[i].getSqlType() == SqlType.ARRAY) { + Assertions.assertTrue(fields[i].getClass().isArray()); + Assertions.assertNotNull(Array.get(fields[i], 0)); + } + } + } + + private Config getTestConfigFile(String configFile) throws FileNotFoundException, URISyntaxException { + if (!configFile.startsWith("/")) { + configFile = "/" + configFile; + } + URL resource = FakeRandomDataTest.class.getResource(configFile); + if (resource == null) { + throw new FileNotFoundException("Can't find config file: " + configFile); + } + String path = Paths.get(resource.toURI()).toString(); + Config config = ConfigFactory.parseFile(new File(path)); + assert config.hasPath("schema"); + return config.getConfig("schema"); + } + +} diff --git a/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf new file mode 100644 index 00000000000..6a06dbf06a6 --- /dev/null +++ b/seatunnel-connectors-v2/connector-fake/src/test/resources/complex.schema.conf @@ -0,0 +1,38 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +schema { + fields { + map = "map>" + map_array = "map>>" + array = "array" + string = string + boolean = boolean + tinyint = tinyint + smallint = smallint + int = int + bigint = bigint + float = float + double = double + decimal = "decimal(30, 8)" + null = "null" + bytes = bytes + date = date + time = time + timestamp = timestamp + } +} \ No newline at end of file diff --git a/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf b/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf new file mode 100644 index 00000000000..6716f00cdc2 --- /dev/null +++ b/seatunnel-connectors-v2/connector-fake/src/test/resources/simple.schema.conf @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +schema { + fields { + map = "map" + array = "array" + string = string + boolean = boolean + tinyint = tinyint + smallint = smallint + int = int + bigint = bigint + float = float + double = double + decimal = "decimal(30, 8)" + null = "null" + bytes = bytes + date = date + time = time + timestamp = timestamp + } +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf index d1e9c45839c..b2fda476b3e 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/assertion/fakesource_to_assert.conf @@ -29,9 +29,13 @@ source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { result_table_name = "fake" - field_name = "name,age" - } - + schema = { + fields { + name = "string" + age = "int" + } + } + } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake } @@ -57,11 +61,11 @@ sink { }, { rule_type = MIN_LENGTH - rule_value = 3 + rule_value = 10 }, { rule_type = MAX_LENGTH - rule_value = 20 + rule_value = 10 } ] },{ @@ -73,16 +77,16 @@ sink { }, { rule_type = MIN - rule_value = 1 + rule_value = 32767 }, { rule_type = MAX - rule_value = 100 + rule_value = 2147483647 } ] } ] } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Assert } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf index 49be0920fe8..d9eb0ff8614 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf @@ -28,13 +28,17 @@ env { source { # This is a example source plugin **only for test and demonstrate the feature source plugin** - FakeSource { - result_table_name = "fake" - field_name = "name,age" + FakeSource { + result_table_name = "fake" + schema = { + fields { + name = "string" + age = "int" + } } - + } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -43,12 +47,12 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql + # please go to https://seatunnel.apache.org/docs/transform/sql } sink { Console {} # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console + # please go to https://seatunnel.apache.org/docs/category/sink-v2 } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf index 769c8760de5..269b85d0840 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf @@ -29,11 +29,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -61,5 +66,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf index 9f5fd0b1787..5e1ea5c01d1 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf @@ -29,11 +29,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -62,5 +67,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf index ef83dfd4e4a..d4a8a745cc2 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf @@ -29,11 +29,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -62,5 +67,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf index b18b472f6cd..c118c33c89a 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf @@ -29,11 +29,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -61,5 +66,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf index 9e2d5ad96af..c02c1614579 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf @@ -29,11 +29,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -62,5 +67,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf index d162b101f0c..7d1a5d42ce0 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf @@ -29,11 +29,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -42,7 +47,7 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql + # please go to https://seatunnel.apache.org/docs/transform/sql } sink { @@ -62,5 +67,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/LocalFile } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf index da1ae493678..4c5e0fe4e96 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf @@ -29,13 +29,18 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name, age" + schema = { + fields { + name = "string" + age = "int" + } + } } - # If you would like to get more information about how to configure seatunnel and see full list of source plugins, # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } + transform { sql { sql = "select * from (values('root.ln.d1', '1660147200000', 'status,value', 'true,1001'), ('root.ln.d1', '1660233600000', 'status,value', 'false,1002')) t (device, `timestamp`, measurements, `values`)" diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf index 9640e19c228..0f732b63b94 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/fakesource_to_jdbc.conf @@ -30,11 +30,15 @@ source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { result_table_name = "fake" - field_name = "name" + schema = { + fields { + name = "string" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -43,7 +47,7 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql + # please go to https://seatunnel.apache.org/docs/transform/sql } sink { @@ -57,5 +61,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/Jdbc } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf index 6862abc04c2..4d65c569ead 100644 --- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf +++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/src/test/resources/jdbc/jdbcsource_to_console.conf @@ -43,11 +43,11 @@ source { transform { # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql + # please go to https://seatunnel.apache.org/docs/transform/sql } sink { Console {} # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console + # please go to https://seatunnel.apache.org/docs/category/sink-v2 } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf index 7b812762051..f469338a517 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/fake/fakesource_to_console.conf @@ -33,6 +33,12 @@ source { # This is a example input plugin **only for test and demonstrate the feature input plugin** FakeSource { result_table_name = "my_dataset" + schema = { + fields { + name = "string" + age = "int" + } + } } # You can also use other input plugins, such as hdfs @@ -42,8 +48,8 @@ source { # format = "json" # } - # If you would like to get more information about how to configure seatunnel and see full list of input plugins, - # please go to https://seatunnel.apache.org/docs/spark/configuration/source-plugins/Fake + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf index c4d1aabe59b..40454bce061 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_json.conf @@ -28,11 +28,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -41,7 +46,7 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/transform/sql + # please go to https://seatunnel.apache.org/docs/category/transform } sink { @@ -60,5 +65,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf index bdae80d74d1..550990eeace 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_parquet.conf @@ -28,11 +28,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -41,7 +46,7 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/transform/sql + # please go to https://seatunnel.apache.org/docs/category/transform } sink { @@ -61,5 +66,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf index b682d3831cf..2bf6afba61f 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_hdfs_text.conf @@ -28,11 +28,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -41,7 +46,7 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/transform/sql + # please go to https://seatunnel.apache.org/docs/category/transform } sink { @@ -61,5 +66,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/HdfsFile } \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf index d257f81bb44..f8dab0cbca1 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_json.conf @@ -28,11 +28,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -41,7 +46,7 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql + # please go to https://seatunnel.apache.org/docs/category/transform } sink { diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf index b5d1412120a..edbad7ec6f3 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_parquet.conf @@ -28,11 +28,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/connector-v2/source/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -41,7 +46,7 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/transform/sql + # please go to https://seatunnel.apache.org/docs/category/transform } sink { diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf index 733a48e61a5..41ff5e8f60b 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/file/fakesource_to_local_text.conf @@ -28,11 +28,16 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource } transform { @@ -41,7 +46,7 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql + # please go to https://seatunnel.apache.org/docs/category/transform } sink { diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf index 9c7e521b73d..503668487e4 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/src/test/resources/iotdb/fakesource_to_iotdb.conf @@ -28,7 +28,12 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name, age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf index feb394127ac..6a89b64a656 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_console.conf @@ -30,11 +30,16 @@ source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake + # please go to https://seatunnel.apache.org/docs/category/source-v2 } transform { @@ -43,12 +48,12 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql + # please go to https://seatunnel.apache.org/docs/category/transform } sink { Console {} # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console + # please go to https://seatunnel.apache.org/docs/category/sink-v2 } \ No newline at end of file diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf index aed13429043..d681985d030 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fake_to_dingtalk.conf @@ -30,11 +30,16 @@ source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake + # please go to https://seatunnel.apache.org/docs/category/source-v2 } transform { @@ -43,7 +48,7 @@ transform { } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql + # please go to https://seatunnel.apache.org/docs/category/transform } sink { @@ -53,5 +58,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console + # please go to https://seatunnel.apache.org/docs/category/sink-v2 } \ No newline at end of file diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf index f7b790c40b8..d640281ccf8 100644 --- a/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf +++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/src/main/resources/examples/fakesource_to_file.conf @@ -30,11 +30,16 @@ source { # This is a example source plugin **only for test and demonstrate the feature source plugin** FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } # If you would like to get more information about how to configure seatunnel and see full list of source plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/source-plugins/Fake + # please go to https://seatunnel.apache.org/docs/category/source-v2 } transform { @@ -43,7 +48,7 @@ transform { sql = "select name,age from fake" } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql + # please go to https://seatunnel.apache.org/docs/category/transform } sink { @@ -64,5 +69,5 @@ sink { } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, - # please go to https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console + # please go to https://seatunnel.apache.org/docs/category/sink-v2 } \ No newline at end of file diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf index 2e27410ee86..8d57e1f3b92 100644 --- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf +++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.clickhouse.conf @@ -26,8 +26,15 @@ env { source { FakeSource { result_table_name = "fake" - field_name = "name,age" + schema = { + fields { + name = "string" + age = "int" + } + } } + # If you would like to get more information about how to configure seatunnel and see full list of input plugins, + # please go to https://seatunnel.apache.org/docs/category/source-v2 } diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf index 45ab040b4f1..6e7755d8c53 100644 --- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf +++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf @@ -33,8 +33,27 @@ env { source { # This is a example input plugin **only for test and demonstrate the feature input plugin** FakeSource { + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_null = "null" + c_bytes = bytes + c_date = date + c_time = time + c_timestamp = timestamp + } + } result_table_name = "fake" - field_name = "name,age,timestamp" } # You can also use other input plugins, such as hdfs @@ -45,7 +64,7 @@ source { # } # If you would like to get more information about how to configure seatunnel and see full list of input plugins, - # please go to https://seatunnel.apache.org/docs/spark/configuration/source-plugins/Fake + # please go to https://seatunnel.apache.org/docs/category/source-v2 } transform { @@ -53,12 +72,12 @@ transform { # you can also use other transform plugins, such as sql sql { - sql = "select name,age from fake" + sql = "select c_string,c_boolean,c_tinyint,c_smallint,c_int,c_bigint,c_float,c_double,c_null,c_bytes from fake" result_table_name = "sql" } # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, - # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split + # please go to https://seatunnel.apache.org/docs/category/transform } sink { @@ -72,5 +91,5 @@ sink { # } # If you would like to get more information about how to configure seatunnel and see full list of output plugins, - # please go to https://seatunnel.apache.org/docs/spark/configuration/sink-plugins/Console + # please go to https://seatunnel.apache.org/docs/category/sink-v2 } diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java index 4b6495e7f84..f642cfe45fb 100644 --- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java +++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java @@ -57,12 +57,12 @@ public void validate(SeaTunnelRow seaTunnelRow) throws IOException { } } if (errors.size() > 0) { - throw new UnsupportedOperationException(""); + throw new UnsupportedOperationException(String.join(",", errors)); } } protected boolean validate(Object field, SeaTunnelDataType dataType) { - if (field == null) { + if (field == null || dataType.getSqlType() == SqlType.NULL) { return true; } SqlType sqlType = dataType.getSqlType();