From 6a13f7279a84951903e410d380a963ffa07bdb4b Mon Sep 17 00:00:00 2001 From: tyrantlucifer Date: Thu, 22 Sep 2022 21:22:49 +0800 Subject: [PATCH 1/8] [Improve][Connector-V2] Improve orc write strategy --- .../file/sink/writer/OrcWriteStrategy.java | 71 +++++++++++++------ 1 file changed, 50 insertions(+), 21 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java index 875504e9b43..edd40f590ce 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java @@ -17,9 +17,13 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink.writer; +import org.apache.seatunnel.api.table.type.ArrayType; import org.apache.seatunnel.api.table.type.BasicType; +import org.apache.seatunnel.api.table.type.DecimalType; +import org.apache.seatunnel.api.table.type.MapType; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; +import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig; import lombok.NonNull; @@ -109,28 +113,53 @@ private Writer getOrCreateWriter(@NonNull String filePath) { } private TypeDescription buildFieldWithRowType(SeaTunnelDataType type) { - if (BasicType.BOOLEAN_TYPE.equals(type)) { - return TypeDescription.createBoolean(); + switch (type.getSqlType()) { + case ARRAY: + BasicType elementType = ((ArrayType) type).getElementType(); + return 
TypeDescription.createList(buildFieldWithRowType(elementType)); + case MAP: + SeaTunnelDataType keyType = ((MapType) type).getKeyType(); + SeaTunnelDataType valueType = ((MapType) type).getValueType(); + return TypeDescription.createMap(buildFieldWithRowType(keyType), buildFieldWithRowType(valueType)); + case STRING: + return TypeDescription.createString(); + case BOOLEAN: + return TypeDescription.createBoolean(); + case TINYINT: + return TypeDescription.createByte(); + case SMALLINT: + return TypeDescription.createShort(); + case INT: + return TypeDescription.createInt(); + case BIGINT: + return TypeDescription.createLong(); + case FLOAT: + return TypeDescription.createFloat(); + case DOUBLE: + return TypeDescription.createDouble(); + case DECIMAL: + int precision = ((DecimalType) type).getPrecision(); + int scale = ((DecimalType) type).getScale(); + return TypeDescription.createDecimal().withPrecision(precision).withScale(scale); + case BYTES: + return TypeDescription.createBinary(); + case DATE: + return TypeDescription.createDate(); + case TIME: + case TIMESTAMP: + return TypeDescription.createTimestamp(); + case ROW: + TypeDescription struct = TypeDescription.createStruct(); + SeaTunnelDataType[] fieldTypes = ((SeaTunnelRowType) type).getFieldTypes(); + for (int i = 0; i < fieldTypes.length; i++) { + struct.addField(((SeaTunnelRowType) type).getFieldName(i), buildFieldWithRowType(fieldTypes[i])); + } + return struct; + case NULL: + default: + String errorMsg = String.format("Orc file not support this type [%s]", type.getSqlType()); + throw new UnsupportedOperationException(errorMsg); } - if (BasicType.SHORT_TYPE.equals(type)) { - return TypeDescription.createShort(); - } - if (BasicType.INT_TYPE.equals(type)) { - return TypeDescription.createInt(); - } - if (BasicType.LONG_TYPE.equals(type)) { - return TypeDescription.createLong(); - } - if (BasicType.FLOAT_TYPE.equals(type)) { - return TypeDescription.createFloat(); - } - if 
(BasicType.DOUBLE_TYPE.equals(type)) { - return TypeDescription.createDouble(); - } - if (BasicType.BYTE_TYPE.equals(type)) { - return TypeDescription.createByte(); - } - return TypeDescription.createString(); } private TypeDescription buildSchemaWithRowType() { From 8be760c276bdfc11b40e7dc69b271f4292cf8a49 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Fri, 23 Sep 2022 18:11:24 +0800 Subject: [PATCH 2/8] [Connector-V2] [File] support sink orc type --- .../file/sink/writer/OrcWriteStrategy.java | 93 ++++++++++++++++++- 1 file changed, 88 insertions(+), 5 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java index edd40f590ce..c304b7b3816 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java @@ -28,10 +28,16 @@ import lombok.NonNull; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; import 
org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.CompressionKind; import org.apache.orc.OrcFile; @@ -39,9 +45,16 @@ import org.apache.orc.Writer; import java.io.IOException; +import java.math.BigDecimal; import java.math.BigInteger; import java.nio.charset.StandardCharsets; +import java.sql.Timestamp; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.LocalTime; +import java.time.temporal.ChronoField; import java.util.HashMap; +import java.util.List; import java.util.Map; public class OrcWriteStrategy extends AbstractWriteStrategy { @@ -63,7 +76,7 @@ public void write(@NonNull SeaTunnelRow seaTunnelRow) { for (Integer index : sinkColumnsIndexInRow) { Object value = seaTunnelRow.getField(index); ColumnVector vector = rowBatch.cols[i]; - setColumn(value, vector, row); + setColumn(value, vector, row, rowBatch.getMaxSize()); i++; } try { @@ -140,7 +153,7 @@ private TypeDescription buildFieldWithRowType(SeaTunnelDataType type) { case DECIMAL: int precision = ((DecimalType) type).getPrecision(); int scale = ((DecimalType) type).getScale(); - return TypeDescription.createDecimal().withPrecision(precision).withScale(scale); + return TypeDescription.createDecimal().withScale(scale).withPrecision(precision); case BYTES: return TypeDescription.createBinary(); case DATE: @@ -171,7 +184,7 @@ private TypeDescription buildSchemaWithRowType() { return schema; } - private void setColumn(Object value, ColumnVector vector, int row) { + private void setColumn(Object value, ColumnVector vector, int row, int maxSize) { if (value == null) { vector.isNull[row] = true; vector.noNulls = false; @@ -189,23 +202,93 @@ private void setColumn(Object value, ColumnVector vector, int row) { BytesColumnVector bytesColumnVector = (BytesColumnVector) vector; setByteColumnVector(value, bytesColumnVector, row); break; + case DECIMAL: + DecimalColumnVector decimalColumnVector = (DecimalColumnVector) vector; + setDecimalColumnVector(value, 
decimalColumnVector, row); + break; + case TIMESTAMP: + TimestampColumnVector timestampColumnVector = (TimestampColumnVector) vector; + setTimestampColumnVector(value, timestampColumnVector, row); + break; + case LIST: + ListColumnVector listColumnVector = (ListColumnVector) vector; + setListColumnVector(value, listColumnVector, row); + break; + case MAP: + MapColumnVector mapColumnVector = (MapColumnVector) vector; + setMapColumnVector(value, mapColumnVector, row, maxSize); + break; + case STRUCT: + StructColumnVector structColumnVector = (StructColumnVector) vector; + setStructColumnVector(value, structColumnVector, row); + break; default: - throw new RuntimeException("Unexpected ColumnVector subtype"); + throw new RuntimeException("Unexpected ColumnVector subtype " + vector.type); } } } + private void setStructColumnVector(Object value, StructColumnVector structColumnVector, int row) { + + } + + private void setMapColumnVector(Object value, MapColumnVector mapColumnVector, int row, int maxSize) { + if (value instanceof Map) { + Map map = (Map) value; + + mapColumnVector.offsets[row] = mapColumnVector.childCount; + mapColumnVector.lengths[row] = map.size(); + mapColumnVector.childCount += map.size(); + } + } + + private void setListColumnVector(Object value, ListColumnVector listColumnVector, int row) { + if (value instanceof Object[]) { +// listColumnVector.child + } else if (value instanceof List) { + + } else { + throw new RuntimeException("List and Array type expected for field"); + } + } + + private void setDecimalColumnVector(Object value, DecimalColumnVector decimalColumnVector, int row) { + if (value instanceof BigDecimal) { + decimalColumnVector.set(row, HiveDecimal.create((BigDecimal) value)); + } else { + throw new RuntimeException("BigDecimal type expected for field"); + } + } + + private void setTimestampColumnVector(Object value, TimestampColumnVector timestampColumnVector, int row) { + if (value instanceof Timestamp) { + 
timestampColumnVector.set(row, (Timestamp) value); + } else if (value instanceof LocalDateTime) { + timestampColumnVector.set(row, Timestamp.valueOf((LocalDateTime) value)); + } else if (value instanceof LocalTime) { + timestampColumnVector.set(row, Timestamp.valueOf(((LocalTime) value).atDate(LocalDate.ofEpochDay(0)))); + } else { + throw new RuntimeException("Time series type expected for field"); + } + } + private void setLongColumnVector(Object value, LongColumnVector longVector, int row) { if (value instanceof Boolean) { Boolean bool = (Boolean) value; longVector.vector[row] = (bool.equals(Boolean.TRUE)) ? Long.valueOf(1) : Long.valueOf(0); - } else if (value instanceof Integer) { + } else if (value instanceof Integer) { longVector.vector[row] = ((Integer) value).longValue(); } else if (value instanceof Long) { longVector.vector[row] = (Long) value; } else if (value instanceof BigInteger) { BigInteger bigInt = (BigInteger) value; longVector.vector[row] = bigInt.longValue(); + } else if (value instanceof Byte) { + longVector.vector[row] = (Byte) value; + } else if (value instanceof Short) { + longVector.vector[row] = (Short) value; + } else if (value instanceof LocalDate) { + longVector.vector[row] = ((LocalDate) value).getLong(ChronoField.EPOCH_DAY); } else { throw new RuntimeException("Long or Integer type expected for field"); } From 31cb8cd0f02302bc9642e766bdddf46005ec5b04 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Fri, 23 Sep 2022 19:15:05 +0800 Subject: [PATCH 3/8] [Connector-V2] [File] Add ORC sink type --- .../file/sink/writer/OrcWriteStrategy.java | 42 ++++++++-- .../e2e/spark/v2/file/FakeSourceToFileIT.java | 12 +++ .../file/fakesource_to_local_orc.conf | 78 +++++++++++++++++++ .../file/local_orc_source_to_console.conf | 51 ++++++++++++ 4 files changed, 176 insertions(+), 7 deletions(-) create mode 100644 seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_local_orc.conf create mode 100644 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java index c304b7b3816..e4cd2cf7a69 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java @@ -76,7 +76,7 @@ public void write(@NonNull SeaTunnelRow seaTunnelRow) { for (Integer index : sinkColumnsIndexInRow) { Object value = seaTunnelRow.getField(index); ColumnVector vector = rowBatch.cols[i]; - setColumn(value, vector, row, rowBatch.getMaxSize()); + setColumn(value, vector, row); i++; } try { @@ -184,7 +184,7 @@ private TypeDescription buildSchemaWithRowType() { return schema; } - private void setColumn(Object value, ColumnVector vector, int row, int maxSize) { + private void setColumn(Object value, ColumnVector vector, int row) { if (value == null) { vector.isNull[row] = true; vector.noNulls = false; @@ -216,7 +216,7 @@ private void setColumn(Object value, ColumnVector vector, int row, int maxSize) break; case MAP: MapColumnVector mapColumnVector = (MapColumnVector) vector; - setMapColumnVector(value, mapColumnVector, row, maxSize); + setMapColumnVector(value, mapColumnVector, row); break; case STRUCT: StructColumnVector structColumnVector = (StructColumnVector) vector; @@ -229,27 +229,55 @@ private void setColumn(Object value, ColumnVector vector, int row, int maxSize) } private void setStructColumnVector(Object value, StructColumnVector 
structColumnVector, int row) { + if (value instanceof SeaTunnelRow) { + SeaTunnelRow seaTunnelRow = (SeaTunnelRow) value; + Object[] fields = seaTunnelRow.getFields(); + for (int i = 0; i < fields.length; i++) { + setColumn(fields[i], structColumnVector.fields[i], row); + } + } else { + throw new RuntimeException("SeaTunnelRow type expected for field"); + } } - private void setMapColumnVector(Object value, MapColumnVector mapColumnVector, int row, int maxSize) { + private void setMapColumnVector(Object value, MapColumnVector mapColumnVector, int row) { if (value instanceof Map) { - Map map = (Map) value; + Map map = (Map) value; mapColumnVector.offsets[row] = mapColumnVector.childCount; mapColumnVector.lengths[row] = map.size(); mapColumnVector.childCount += map.size(); + + int i = 0; + for (Map.Entry entry : map.entrySet()) { + int mapElem = (int) mapColumnVector.offsets[row] + i; + setColumn(entry.getKey(), mapColumnVector.keys, mapElem); + setColumn(entry.getValue(), mapColumnVector.values, mapElem); + ++i; + } + } else { + throw new RuntimeException("Map type expected for field"); } } private void setListColumnVector(Object value, ListColumnVector listColumnVector, int row) { + Object[] valueArray; if (value instanceof Object[]) { -// listColumnVector.child + valueArray = (Object[]) value; } else if (value instanceof List) { - + valueArray = ((List) value).toArray(); } else { throw new RuntimeException("List and Array type expected for field"); } + listColumnVector.offsets[row] = listColumnVector.childCount; + listColumnVector.lengths[row] = valueArray.length; + listColumnVector.childCount += valueArray.length; + + for (int i = 0; i < valueArray.length; i++) { + int listElem = (int) listColumnVector.offsets[row] + i; + setColumn(valueArray[i], listColumnVector.child, listElem); + } } private void setDecimalColumnVector(Object value, DecimalColumnVector decimalColumnVector, int row) { diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java index b2535ebf281..fa56da7bcc8 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java @@ -57,4 +57,16 @@ public void testFakeSourceToLocalFileJson() throws IOException, InterruptedExcep Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_local_json.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } + + @Test + public void testFakeSourceToLocalFileORC() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_local_orc.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } + + @Test + public void testLocalFileORCToLocalFileORC() throws IOException, InterruptedException { + Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/local_orc_source_to_console.conf"); + Assertions.assertEquals(0, execResult.getExitCode()); + } } diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_local_orc.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_local_orc.conf new file mode 100644 index 00000000000..9bdf41992a9 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/fakesource_to_local_orc.conf @@ -0,0 +1,78 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor 
license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +env { + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + FakeSource { + result_table_name = "fake" + schema = { + fields { + c_map = "map" + c_array = "array" + c_string = string + c_boolean = boolean + c_tinyint = tinyint + c_smallint = smallint + c_int = int + c_bigint = bigint + c_float = float + c_double = double + c_decimal = "decimal(30, 8)" + c_bytes = bytes + c_date = date + c_timestamp = timestamp + } + } + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +transform { + sql { + sql = "select * from fake" + } + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform +} + +sink { + LocalFile { + path="/tmp/test/orc/" + partition_by=["c_boolean"] + partition_dir_expression="${k0}=${v0}" + is_partition_field_write_in_file=true + file_name_expression="${transactionId}_${now}" + file_format="orc" + 
filename_time_format="yyyy.MM.dd" + is_enable_transaction=true + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File +} \ No newline at end of file diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf new file mode 100644 index 00000000000..4a26c9fb335 --- /dev/null +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf @@ -0,0 +1,51 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +env { + # You can set spark configuration here + spark.app.name = "SeaTunnel" + spark.executor.instances = 2 + spark.executor.cores = 1 + spark.executor.memory = "1g" + spark.master = local + job.mode = "BATCH" +} + +source { + LocalFile { + path = "/tmp/test/orc/" + type = "orc" + } + + # If you would like to get more information about how to configure seatunnel and see full list of source plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource +} + +transform { + + + # If you would like to get more information about how to configure seatunnel and see full list of transform plugins, + # please go to https://seatunnel.apache.org/docs/category/transform +} + +sink { + console { + } + + # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, + # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File +} \ No newline at end of file From 0af3001cc8b15e2ecbcf25dbff4c7b434e0ec4ec Mon Sep 17 00:00:00 2001 From: Hisoka Date: Fri, 23 Sep 2022 19:27:48 +0800 Subject: [PATCH 4/8] [Connector-V2] [File] Add ORC sink type --- .../apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java index fa56da7bcc8..cd928c155eb 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java @@ -65,7 +65,7 @@ public void testFakeSourceToLocalFileORC() throws IOException, 
InterruptedExcept } @Test - public void testLocalFileORCToLocalFileORC() throws IOException, InterruptedException { + public void testLocalFileORCToConsole() throws IOException, InterruptedException { Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/local_orc_source_to_console.conf"); Assertions.assertEquals(0, execResult.getExitCode()); } From 5202035139a3e098c264a331023b8efb98108a7d Mon Sep 17 00:00:00 2001 From: Hisoka Date: Fri, 23 Sep 2022 20:15:35 +0800 Subject: [PATCH 5/8] [Connector-V2] [File] Add ORC sink type --- .../file/sink/writer/OrcWriteStrategy.java | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java index 7e46fa0bc0e..a34b2b7dd40 100644 --- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java +++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java @@ -28,21 +28,21 @@ import lombok.NonNull; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.type.HiveDecimal; -import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector; 
-import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector; -import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.orc.CompressionKind; import org.apache.orc.OrcFile; import org.apache.orc.TypeDescription; import org.apache.orc.Writer; +import org.apache.orc.storage.common.type.HiveDecimal; +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; import java.io.IOException; import java.math.BigDecimal; From f138921b1f8dd00720432002eb416dd3655c3928 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Sat, 24 Sep 2022 10:15:59 +0800 Subject: [PATCH 6/8] [Connector-V2] [File] Add ORC sink type --- .../e2e/spark/v2/file/FakeSourceToFileIT.java | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java index cd928c155eb..5ade8cd82fd 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/file/FakeSourceToFileIT.java @@ 
-59,14 +59,10 @@ public void testFakeSourceToLocalFileJson() throws IOException, InterruptedExcep } @Test - public void testFakeSourceToLocalFileORC() throws IOException, InterruptedException { + public void testFakeSourceToLocalFileORCAndReadToConsole() throws IOException, InterruptedException { Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_local_orc.conf"); Assertions.assertEquals(0, execResult.getExitCode()); - } - - @Test - public void testLocalFileORCToConsole() throws IOException, InterruptedException { - Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/local_orc_source_to_console.conf"); - Assertions.assertEquals(0, execResult.getExitCode()); + Container.ExecResult execResult2 = executeSeaTunnelSparkJob("/file/local_orc_source_to_console.conf"); + Assertions.assertEquals(0, execResult2.getExitCode()); } } From 58f06f02c8c3d41c9efd166620ca15c39e91cdd8 Mon Sep 17 00:00:00 2001 From: Hisoka Date: Sat, 24 Sep 2022 17:47:17 +0800 Subject: [PATCH 7/8] [Connector-V2] [File] Add ORC sink type --- .../file/local_orc_source_to_console.conf | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf index 4a26c9fb335..14b688e449b 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf @@ -46,6 +46,27 @@ sink { console { } + Assert { + rules = + [{ + field_name = c_string + field_type = string + field_value = [ + { + rule_type = NOT_NULL + } + ] + }, { + field_name = c_timestamp + field_type = timestamp + field_value = [ + { + rule_type 
= NOT_NULL + } + ] + } + ] + } # If you would like to get more information about how to configure seatunnel and see full list of sink plugins, # please go to https://seatunnel.apache.org/docs/connector-v2/sink/File } \ No newline at end of file From b08b1aa0cd657942d12ef2b5efdc1375085ccddb Mon Sep 17 00:00:00 2001 From: Hisoka Date: Mon, 26 Sep 2022 16:42:04 +0800 Subject: [PATCH 8/8] [File][Connector-V2] fix orc test error --- .../src/test/resources/file/local_orc_source_to_console.conf | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf index 14b688e449b..fe64d423d99 100644 --- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf +++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-file-spark-e2e/src/test/resources/file/local_orc_source_to_console.conf @@ -57,8 +57,8 @@ sink { } ] }, { - field_name = c_timestamp - field_type = timestamp + field_name = c_boolean + field_type = boolean field_value = [ { rule_type = NOT_NULL