From 09ed768009614470d2eba90f7bf5a74bb64e1295 Mon Sep 17 00:00:00 2001
From: laglangyue <373435126@qq.com>
Date: Sat, 13 Aug 2022 22:06:00 +0800
Subject: [PATCH 01/23] [Connector-V2][JDBC-connector] optimization fake
---
.../connector-fake/pom.xml | 1 -
.../seatunnel/fake/source/FakeData.java | 61 +++++++++++++++++++
.../seatunnel/fake/source/FakeSource.java | 6 +-
.../fake/source/FakeSourceReader.java | 15 +----
.../main/resources/examples/spark.batch.conf | 10 +--
.../serialization/RowConverter.java | 4 +-
6 files changed, 72 insertions(+), 25 deletions(-)
create mode 100644 seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
diff --git a/seatunnel-connectors-v2/connector-fake/pom.xml b/seatunnel-connectors-v2/connector-fake/pom.xml
index 7cb8ed3e4e2..60bb5042e36 100644
--- a/seatunnel-connectors-v2/connector-fake/pom.xml
+++ b/seatunnel-connectors-v2/connector-fake/pom.xml
@@ -28,7 +28,6 @@
4.0.0
connector-fake
-
org.apache.seatunnel
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
new file mode 100644
index 00000000000..0062314d267
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
@@ -0,0 +1,61 @@
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import java.math.BigDecimal;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+/**
+ *
+ **/
+public class FakeData {
+
+ public static final String[] columnName = new String[]{
+ "c_void",
+ "c_boolean",
+ "c_byte",
+ "c_short",
+ "c_int",
+ "c_long",
+ "c_float",
+ "c_double",
+ "c_string",
+ "c_decimal"
+ };
+ public static final SeaTunnelDataType>[] columnType = new SeaTunnelDataType[]{
+ BasicType.VOID_TYPE,
+ BasicType.BOOLEAN_TYPE,
+ BasicType.BYTE_TYPE,
+ BasicType.SHORT_TYPE,
+ BasicType.INT_TYPE,
+ BasicType.LONG_TYPE,
+ BasicType.FLOAT_TYPE,
+ BasicType.DOUBLE_TYPE,
+ BasicType.STRING_TYPE,
+ new DecimalType(38, 16)
+ };
+
+
+ public static SeaTunnelRow generateRow() {
+ Object[] columnValue = {
+ Void.TYPE,
+ RandomUtils.nextInt(0, 2) == 1,
+ (byte) RandomUtils.nextInt(0, Byte.MAX_VALUE),
+ (short) RandomUtils.nextInt(Byte.MAX_VALUE, Short.MAX_VALUE),
+ RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE),
+ RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE),
+ RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE),
+ RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE),
+ RandomStringUtils.random(10),
+ BigDecimal.valueOf(RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE))
+ };
+ if(columnValue.length != columnValue.length || columnValue.length != columnType.length){
+ throw new RuntimeException("the row data should be equals to column");
+ }
+ return new SeaTunnelRow(columnValue);
+ }
+
+}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 65e6587f47c..18c63dd4f41 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -20,8 +20,6 @@
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.constants.JobMode;
@@ -47,8 +45,8 @@ public Boundedness getBoundedness() {
@Override
public SeaTunnelRowType getProducedType() {
return new SeaTunnelRowType(
- new String[]{"name", "age", "timestamp"},
- new SeaTunnelDataType>[]{BasicType.STRING_TYPE, BasicType.INT_TYPE, BasicType.LONG_TYPE});
+ FakeData.columnName,
+ FakeData.columnType);
}
@Override
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index 7007eaf25b3..f51f9cb9dfa 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -26,18 +26,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Random;
-import java.util.concurrent.ThreadLocalRandom;
-
public class FakeSourceReader extends AbstractSingleSplitReader {
private static final Logger LOGGER = LoggerFactory.getLogger(FakeSourceReader.class);
private final SingleSplitReaderContext context;
- private final String[] names = {"Wenjun", "Fanjia", "Zongwen", "CalvinKirs"};
- private final int[] ages = {11, 22, 33, 44};
-
public FakeSourceReader(SingleSplitReaderContext context) {
this.context = context;
}
@@ -56,13 +50,8 @@ public void close() {
@SuppressWarnings("magicnumber")
public void pollNext(Collector output) throws InterruptedException {
// Generate a random number of rows to emit.
- Random random = ThreadLocalRandom.current();
- int size = random.nextInt(10) + 1;
- for (int i = 0; i < size; i++) {
- int randomIndex = random.nextInt(names.length);
- SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[randomIndex], ages[randomIndex], System.currentTimeMillis()});
- output.collect(seaTunnelRow);
- }
+ SeaTunnelRow seaTunnelRow = FakeData.generateRow();
+ output.collect(seaTunnelRow);
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
LOGGER.info("Closed the bounded fake source");
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 45ab040b4f1..37be7ca17b8 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -34,7 +34,7 @@ source {
# This is a example input plugin **only for test and demonstrate the feature input plugin**
FakeSource {
result_table_name = "fake"
- field_name = "name,age,timestamp"
+ field_name = "c_void, c_boolean, c_byte, c_short, c_int, c_long, c_float, c_double, c_string, c_decimal"
}
# You can also use other input plugins, such as hdfs
@@ -45,7 +45,7 @@ source {
# }
# If you would like to get more information about how to configure seatunnel and see full list of input plugins,
- # please go to https://seatunnel.apache.org/docs/spark/configuration/source-plugins/Fake
+ # please go to https://seatunnel.apache.org/docs/2.1.3/category/source
}
transform {
@@ -53,12 +53,12 @@ transform {
# you can also use other transform plugins, such as sql
sql {
- sql = "select name,age from fake"
+ sql = "select c_void, c_boolean,c_byte, c_short, c_int, c_long, c_float, c_double, c_string from fake"
result_table_name = "sql"
}
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
- # please go to https://seatunnel.apache.org/docs/spark/configuration/transform-plugins/Split
+ # please go to https://seatunnel.apache.org/docs/2.1.3/category/transform
}
sink {
@@ -72,5 +72,5 @@ sink {
# }
# If you would like to get more information about how to configure seatunnel and see full list of output plugins,
- # please go to https://seatunnel.apache.org/docs/spark/configuration/sink-plugins/Console
+ # please go to https://seatunnel.apache.org/docs/2.1.3/category/sink
}
diff --git a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
index 4b6495e7f84..f642cfe45fb 100644
--- a/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-base/src/main/java/org/apache/seatunnel/translation/serialization/RowConverter.java
@@ -57,12 +57,12 @@ public void validate(SeaTunnelRow seaTunnelRow) throws IOException {
}
}
if (errors.size() > 0) {
- throw new UnsupportedOperationException("");
+ throw new UnsupportedOperationException(String.join(",", errors));
}
}
protected boolean validate(Object field, SeaTunnelDataType> dataType) {
- if (field == null) {
+ if (field == null || dataType.getSqlType() == SqlType.NULL) {
return true;
}
SqlType sqlType = dataType.getSqlType();
From 7a378e21cc481ea6bf3325eea2d68cfe45cbc25f Mon Sep 17 00:00:00 2001
From: laglangyue <373435126@qq.com>
Date: Sat, 13 Aug 2022 22:20:21 +0800
Subject: [PATCH 02/23] add more fake data row
---
.../connectors/seatunnel/fake/source/FakeSourceReader.java | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index f51f9cb9dfa..5b413d81663 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -50,8 +50,10 @@ public void close() {
@SuppressWarnings("magicnumber")
public void pollNext(Collector output) throws InterruptedException {
// Generate a random number of rows to emit.
- SeaTunnelRow seaTunnelRow = FakeData.generateRow();
- output.collect(seaTunnelRow);
+ for (int i = 0; i < 10; i++) {
+ SeaTunnelRow seaTunnelRow = FakeData.generateRow();
+ output.collect(seaTunnelRow);
+ }
if (Boundedness.BOUNDED.equals(context.getBoundedness())) {
// signal to the source that we have reached the end of the data.
LOGGER.info("Closed the bounded fake source");
From 6c443235475c2e7fb9040084cf1d06dc2bf0220d Mon Sep 17 00:00:00 2001
From: laglangyue <373435126@qq.com>
Date: Sat, 13 Aug 2022 22:44:05 +0800
Subject: [PATCH 03/23] check style
---
.../connector-fake/pom.xml | 1 +
.../seatunnel/fake/source/FakeData.java | 19 +++++++++----------
.../seatunnel/fake/source/FakeSource.java | 4 ++--
3 files changed, 12 insertions(+), 12 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-fake/pom.xml b/seatunnel-connectors-v2/connector-fake/pom.xml
index 60bb5042e36..7cb8ed3e4e2 100644
--- a/seatunnel-connectors-v2/connector-fake/pom.xml
+++ b/seatunnel-connectors-v2/connector-fake/pom.xml
@@ -28,6 +28,7 @@
4.0.0
connector-fake
+
org.apache.seatunnel
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
index 0062314d267..a0b125534de 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
@@ -1,19 +1,18 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import java.math.BigDecimal;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-/**
- *
- **/
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+
+import java.math.BigDecimal;
+
public class FakeData {
- public static final String[] columnName = new String[]{
+ public static final String[] COLUMN_NAME = new String[]{
"c_void",
"c_boolean",
"c_byte",
@@ -25,7 +24,7 @@ public class FakeData {
"c_string",
"c_decimal"
};
- public static final SeaTunnelDataType>[] columnType = new SeaTunnelDataType[]{
+ public static final SeaTunnelDataType>[] COLUMN_TYPE = new SeaTunnelDataType[]{
BasicType.VOID_TYPE,
BasicType.BOOLEAN_TYPE,
BasicType.BYTE_TYPE,
@@ -38,7 +37,7 @@ public class FakeData {
new DecimalType(38, 16)
};
-
+ @SuppressWarnings("magicnumber")
public static SeaTunnelRow generateRow() {
Object[] columnValue = {
Void.TYPE,
@@ -52,7 +51,7 @@ public static SeaTunnelRow generateRow() {
RandomStringUtils.random(10),
BigDecimal.valueOf(RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE))
};
- if(columnValue.length != columnValue.length || columnValue.length != columnType.length){
+ if (columnValue.length != columnValue.length || columnValue.length != COLUMN_TYPE.length) {
throw new RuntimeException("the row data should be equals to column");
}
return new SeaTunnelRow(columnValue);
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 18c63dd4f41..b97f852844e 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -45,8 +45,8 @@ public Boundedness getBoundedness() {
@Override
public SeaTunnelRowType getProducedType() {
return new SeaTunnelRowType(
- FakeData.columnName,
- FakeData.columnType);
+ FakeData.COLUMN_NAME,
+ FakeData.COLUMN_TYPE);
}
@Override
From 140e46ff8e983f29d06d010905cf52f08c3c3751 Mon Sep 17 00:00:00 2001
From: laglangyue <373435126@qq.com>
Date: Sat, 13 Aug 2022 23:34:35 +0800
Subject: [PATCH 04/23] add license
---
.../seatunnel/fake/source/FakeData.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
index a0b125534de..677bb818193 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.table.type.BasicType;
From 87842e426c01b2a8ac89976731050edb46564c28 Mon Sep 17 00:00:00 2001
From: laglangyue <373435126@qq.com>
Date: Sun, 14 Aug 2022 01:18:09 +0800
Subject: [PATCH 05/23] add data type
---
.../seatunnel/fake/source/FakeData.java | 23 +++++++++++++++----
.../main/resources/examples/spark.batch.conf | 4 ++--
.../serialization/InternalRowConverter.java | 1 +
3 files changed, 22 insertions(+), 6 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
index 677bb818193..9bcc0ca5baf 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
@@ -17,8 +17,11 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
+import java.time.LocalDateTime;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -39,7 +42,11 @@ public class FakeData {
"c_float",
"c_double",
"c_string",
- "c_decimal"
+ "c_decimal",
+ "c_local_date_time",
+ "c_local_date",
+ "c_local_time",
+ "c_byte_arr"
};
public static final SeaTunnelDataType>[] COLUMN_TYPE = new SeaTunnelDataType[]{
BasicType.VOID_TYPE,
@@ -51,7 +58,11 @@ public class FakeData {
BasicType.FLOAT_TYPE,
BasicType.DOUBLE_TYPE,
BasicType.STRING_TYPE,
- new DecimalType(38, 16)
+ new DecimalType(38, 18),
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ LocalTimeType.LOCAL_DATE_TYPE,
+ LocalTimeType.LOCAL_TIME_TYPE,
+ PrimitiveByteArrayType.INSTANCE
};
@SuppressWarnings("magicnumber")
@@ -65,8 +76,12 @@ public static SeaTunnelRow generateRow() {
RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE),
RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE),
RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE),
- RandomStringUtils.random(10),
- BigDecimal.valueOf(RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE))
+ RandomStringUtils.randomAlphabetic(10),
+ BigDecimal.valueOf(RandomUtils.nextFloat(0, Long.MAX_VALUE)),
+ LocalDateTime.now(),
+ LocalDateTime.now().toLocalDate(),
+ LocalDateTime.now().toLocalTime(),
+ RandomStringUtils.randomAlphabetic(10).getBytes(),
};
if (columnValue.length != columnValue.length || columnValue.length != COLUMN_TYPE.length) {
throw new RuntimeException("the row data should be equals to column");
diff --git a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
index 37be7ca17b8..e007f07ef2d 100644
--- a/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
+++ b/seatunnel-examples/seatunnel-spark-connector-v2-example/src/main/resources/examples/spark.batch.conf
@@ -34,7 +34,7 @@ source {
# This is a example input plugin **only for test and demonstrate the feature input plugin**
FakeSource {
result_table_name = "fake"
- field_name = "c_void, c_boolean, c_byte, c_short, c_int, c_long, c_float, c_double, c_string, c_decimal"
+ field_name = "c_void, c_boolean, c_byte, c_short, c_int, c_long, c_float, c_double, c_string, c_decimal, c_local_date_time, c_local_date, c_local_time, c_byte_arr"
}
# You can also use other input plugins, such as hdfs
@@ -53,7 +53,7 @@ transform {
# you can also use other transform plugins, such as sql
sql {
- sql = "select c_void, c_boolean,c_byte, c_short, c_int, c_long, c_float, c_double, c_string from fake"
+ sql = "select c_boolean,c_byte, c_short, c_int, c_long, c_float, c_double, c_string, c_decimal, c_byte_arr from fake"
result_table_name = "sql"
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
index 0224654d6a8..5bbbd66a2dd 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.translation.spark.common.serialization;
+import java.math.BigDecimal;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
From 659ac9e37334b370eab8edbf802fac80cf9f6fd4 Mon Sep 17 00:00:00 2001
From: laglangyue <373435126@qq.com>
Date: Sun, 14 Aug 2022 09:34:21 +0800
Subject: [PATCH 06/23] update decimalType random
---
.../seatunnel/connectors/seatunnel/fake/source/FakeData.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
index 9bcc0ca5baf..868a197eb29 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
@@ -77,7 +77,7 @@ public static SeaTunnelRow generateRow() {
RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE),
RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE),
RandomStringUtils.randomAlphabetic(10),
- BigDecimal.valueOf(RandomUtils.nextFloat(0, Long.MAX_VALUE)),
+ BigDecimal.valueOf(RandomUtils.nextLong(Integer.MAX_VALUE,Long.MAX_VALUE),18),
LocalDateTime.now(),
LocalDateTime.now().toLocalDate(),
LocalDateTime.now().toLocalTime(),
@@ -89,4 +89,7 @@ public static SeaTunnelRow generateRow() {
return new SeaTunnelRow(columnValue);
}
+ public static void main(String[] args) {
+ System.out.println(BigDecimal.valueOf(RandomUtils.nextDouble(0, Long.MAX_VALUE)));
+ }
}
From 7bad2a6d8e798f56c73aee44e2daedd4ed77b0c4 Mon Sep 17 00:00:00 2001
From: laglangyue <373435126@qq.com>
Date: Sun, 14 Aug 2022 09:37:52 +0800
Subject: [PATCH 07/23] update decimalType random
---
.../connectors/seatunnel/fake/source/FakeData.java | 8 ++------
.../spark/common/serialization/InternalRowConverter.java | 4 ----
2 files changed, 2 insertions(+), 10 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
index 868a197eb29..1639e5b0e49 100644
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.fake.source;
-import java.time.LocalDateTime;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
@@ -29,6 +28,7 @@
import org.apache.commons.lang3.RandomUtils;
import java.math.BigDecimal;
+import java.time.LocalDateTime;
public class FakeData {
@@ -77,7 +77,7 @@ public static SeaTunnelRow generateRow() {
RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE),
RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE),
RandomStringUtils.randomAlphabetic(10),
- BigDecimal.valueOf(RandomUtils.nextLong(Integer.MAX_VALUE,Long.MAX_VALUE),18),
+ BigDecimal.valueOf(RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE), 18),
LocalDateTime.now(),
LocalDateTime.now().toLocalDate(),
LocalDateTime.now().toLocalTime(),
@@ -88,8 +88,4 @@ public static SeaTunnelRow generateRow() {
}
return new SeaTunnelRow(columnValue);
}
-
- public static void main(String[] args) {
- System.out.println(BigDecimal.valueOf(RandomUtils.nextDouble(0, Long.MAX_VALUE)));
- }
}
diff --git a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
index 5bbbd66a2dd..decd29efd44 100644
--- a/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
+++ b/seatunnel-translation/seatunnel-translation-spark/seatunnel-translation-spark-common/src/main/java/org/apache/seatunnel/translation/spark/common/serialization/InternalRowConverter.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.translation.spark.common.serialization;
-import java.math.BigDecimal;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -36,7 +35,6 @@
import org.apache.spark.sql.catalyst.expressions.MutableShort;
import org.apache.spark.sql.catalyst.expressions.MutableValue;
import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow;
-import org.apache.spark.sql.types.Decimal;
import org.apache.spark.unsafe.types.UTF8String;
import java.io.IOException;
@@ -83,8 +81,6 @@ private static Object convert(Object field, SeaTunnelDataType> dataType) {
return convertMap((Map, ?>) field, (MapType, ?>) dataType, InternalRowConverter::convert);
case STRING:
return UTF8String.fromString((String) field);
- case DECIMAL:
- return Decimal.apply((BigDecimal) field);
default:
return field;
}
From 5ed56bc5372d2f878e3770484c5fb2e62cfdc0ed Mon Sep 17 00:00:00 2001
From: laglangyue <373435126@qq.com>
Date: Tue, 16 Aug 2022 00:47:27 +0800
Subject: [PATCH 08/23] support user-schema
---
.../seatunnel/fake/source/FakeData.java | 91 ---------------
.../seatunnel/fake/source/FakeRandomData.java | 104 ++++++++++++++++++
.../seatunnel/fake/source/FakeSource.java | 9 +-
.../fake/source/FakeSourceReader.java | 7 +-
4 files changed, 114 insertions(+), 97 deletions(-)
delete mode 100644 seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
create mode 100644 seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
deleted file mode 100644
index 1639e5b0e49..00000000000
--- a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeData.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
-
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.DecimalType;
-import org.apache.seatunnel.api.table.type.LocalTimeType;
-import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
-import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-
-import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.commons.lang3.RandomUtils;
-
-import java.math.BigDecimal;
-import java.time.LocalDateTime;
-
-public class FakeData {
-
- public static final String[] COLUMN_NAME = new String[]{
- "c_void",
- "c_boolean",
- "c_byte",
- "c_short",
- "c_int",
- "c_long",
- "c_float",
- "c_double",
- "c_string",
- "c_decimal",
- "c_local_date_time",
- "c_local_date",
- "c_local_time",
- "c_byte_arr"
- };
- public static final SeaTunnelDataType>[] COLUMN_TYPE = new SeaTunnelDataType[]{
- BasicType.VOID_TYPE,
- BasicType.BOOLEAN_TYPE,
- BasicType.BYTE_TYPE,
- BasicType.SHORT_TYPE,
- BasicType.INT_TYPE,
- BasicType.LONG_TYPE,
- BasicType.FLOAT_TYPE,
- BasicType.DOUBLE_TYPE,
- BasicType.STRING_TYPE,
- new DecimalType(38, 18),
- LocalTimeType.LOCAL_DATE_TIME_TYPE,
- LocalTimeType.LOCAL_DATE_TYPE,
- LocalTimeType.LOCAL_TIME_TYPE,
- PrimitiveByteArrayType.INSTANCE
- };
-
- @SuppressWarnings("magicnumber")
- public static SeaTunnelRow generateRow() {
- Object[] columnValue = {
- Void.TYPE,
- RandomUtils.nextInt(0, 2) == 1,
- (byte) RandomUtils.nextInt(0, Byte.MAX_VALUE),
- (short) RandomUtils.nextInt(Byte.MAX_VALUE, Short.MAX_VALUE),
- RandomUtils.nextInt(Short.MAX_VALUE, Integer.MAX_VALUE),
- RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE),
- RandomUtils.nextFloat(Float.MIN_VALUE, Float.MAX_VALUE),
- RandomUtils.nextDouble(Float.MAX_VALUE, Double.MAX_VALUE),
- RandomStringUtils.randomAlphabetic(10),
- BigDecimal.valueOf(RandomUtils.nextLong(Integer.MAX_VALUE, Long.MAX_VALUE), 18),
- LocalDateTime.now(),
- LocalDateTime.now().toLocalDate(),
- LocalDateTime.now().toLocalTime(),
- RandomStringUtils.randomAlphabetic(10).getBytes(),
- };
- if (columnValue.length != columnValue.length || columnValue.length != COLUMN_TYPE.length) {
- throw new RuntimeException("the row data should be equals to column");
- }
- return new SeaTunnelRow(columnValue);
- }
-}
diff --git a/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
new file mode 100644
index 00000000000..e081bde9fbe
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeRandomData.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.fake.source;
+
+import static org.apache.seatunnel.api.table.type.BasicType.BOOLEAN_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.BYTE_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.DOUBLE_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.FLOAT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.INT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.LONG_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.SHORT_TYPE;
+import static org.apache.seatunnel.api.table.type.BasicType.STRING_TYPE;
+
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
+
+public class FakeRandomData {
+
+ private final SeatunnelSchema schema;
+
+ public FakeRandomData(SeatunnelSchema schema) {
+ this.schema = schema;
+ }
+
+ public SeaTunnelRow randomRow() {
+ SeaTunnelRowType seaTunnelRowType = schema.getSeaTunnelRowType();
+ String[] fieldNames = seaTunnelRowType.getFieldNames();
+ SeaTunnelDataType>[] fieldTypes = seaTunnelRowType.getFieldTypes();
+ List