Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Connector-V2] Improve orc write strategy #2860

Merged
merged 10 commits into from
Sep 26, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,13 @@

package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.MapType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;

import lombok.NonNull;
Expand All @@ -28,16 +32,29 @@
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
import org.apache.orc.storage.ql.exec.vector.ColumnVector;
import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector;
import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector;
import org.apache.orc.storage.ql.exec.vector.ListColumnVector;
import org.apache.orc.storage.ql.exec.vector.LongColumnVector;
import org.apache.orc.storage.ql.exec.vector.MapColumnVector;
import org.apache.orc.storage.ql.exec.vector.StructColumnVector;
import org.apache.orc.storage.ql.exec.vector.TimestampColumnVector;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.temporal.ChronoField;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrcWriteStrategy extends AbstractWriteStrategy {
Expand Down Expand Up @@ -109,37 +126,53 @@ private Writer getOrCreateWriter(@NonNull String filePath) {
}

/**
 * Translates a SeaTunnel data type into the ORC {@link TypeDescription} used for the file schema.
 *
 * <p>Container types are handled recursively: an ARRAY becomes an ORC list of its element type,
 * a MAP becomes an ORC map of its key and value types, and a ROW becomes an ORC struct with one
 * field per row field (same name, recursively converted type).
 *
 * @param type the SeaTunnel type to convert
 * @return the ORC type description for {@code type}
 * @throws UnsupportedOperationException if the SQL type is NULL or is not listed in the switch
 */
private TypeDescription buildFieldWithRowType(SeaTunnelDataType<?> type) {
if (BasicType.BOOLEAN_TYPE.equals(type)) {
return TypeDescription.createBoolean();
switch (type.getSqlType()) {
case ARRAY:
// List of the converted element type.
BasicType<?> elementType = ((ArrayType<?, ?>) type).getElementType();
return TypeDescription.createList(buildFieldWithRowType(elementType));
case MAP:
// Map whose key and value types are converted independently.
SeaTunnelDataType<?> keyType = ((MapType<?, ?>) type).getKeyType();
SeaTunnelDataType<?> valueType = ((MapType<?, ?>) type).getValueType();
return TypeDescription.createMap(buildFieldWithRowType(keyType), buildFieldWithRowType(valueType));
case STRING:
return TypeDescription.createString();
case BOOLEAN:
return TypeDescription.createBoolean();
case TINYINT:
return TypeDescription.createByte();
case SMALLINT:
return TypeDescription.createShort();
case INT:
return TypeDescription.createInt();
case BIGINT:
return TypeDescription.createLong();
case FLOAT:
return TypeDescription.createFloat();
case DOUBLE:
return TypeDescription.createDouble();
case DECIMAL:
// Carry the declared precision and scale over to the ORC decimal type.
int precision = ((DecimalType) type).getPrecision();
int scale = ((DecimalType) type).getScale();
return TypeDescription.createDecimal().withScale(scale).withPrecision(precision);
case BYTES:
return TypeDescription.createBinary();
case DATE:
return TypeDescription.createDate();
// TIME falls through and shares the TIMESTAMP mapping.
case TIME:
case TIMESTAMP:
return TypeDescription.createTimestamp();
case ROW:
// Struct with one field per row field, keeping the row's field names.
TypeDescription struct = TypeDescription.createStruct();
SeaTunnelDataType<?>[] fieldTypes = ((SeaTunnelRowType) type).getFieldTypes();
for (int i = 0; i < fieldTypes.length; i++) {
struct.addField(((SeaTunnelRowType) type).getFieldName(i), buildFieldWithRowType(fieldTypes[i]));
}
return struct;
// NULL and every SQL type not listed above are rejected.
case NULL:
default:
String errorMsg = String.format("Orc file not support this type [%s]", type.getSqlType());
throw new UnsupportedOperationException(errorMsg);
}
// NOTE(review): the if-chain below, together with the BOOLEAN check that opens the method,
// looks like the pre-change implementation shown interleaved by the diff view; every branch
// of the switch above already returns or throws. Confirm which lines belong to the merged file.
if (BasicType.SHORT_TYPE.equals(type)) {
return TypeDescription.createShort();
}
if (BasicType.INT_TYPE.equals(type)) {
return TypeDescription.createInt();
}
if (BasicType.LONG_TYPE.equals(type)) {
return TypeDescription.createLong();
}
if (BasicType.FLOAT_TYPE.equals(type)) {
return TypeDescription.createFloat();
}
if (BasicType.DOUBLE_TYPE.equals(type)) {
return TypeDescription.createDouble();
}
if (BasicType.BYTE_TYPE.equals(type)) {
return TypeDescription.createByte();
}
if (BasicType.STRING_TYPE.equals(type)) {
return TypeDescription.createString();
}
if (BasicType.VOID_TYPE.equals(type)) {
return TypeDescription.createString();
}

// TODO map struct array

return TypeDescription.createString();
}

private TypeDescription buildSchemaWithRowType() {
Expand Down Expand Up @@ -169,9 +202,101 @@ private void setColumn(Object value, ColumnVector vector, int row) {
BytesColumnVector bytesColumnVector = (BytesColumnVector) vector;
setByteColumnVector(value, bytesColumnVector, row);
break;
case DECIMAL:
DecimalColumnVector decimalColumnVector = (DecimalColumnVector) vector;
setDecimalColumnVector(value, decimalColumnVector, row);
break;
case TIMESTAMP:
TimestampColumnVector timestampColumnVector = (TimestampColumnVector) vector;
setTimestampColumnVector(value, timestampColumnVector, row);
break;
case LIST:
ListColumnVector listColumnVector = (ListColumnVector) vector;
setListColumnVector(value, listColumnVector, row);
break;
case MAP:
MapColumnVector mapColumnVector = (MapColumnVector) vector;
setMapColumnVector(value, mapColumnVector, row);
break;
case STRUCT:
StructColumnVector structColumnVector = (StructColumnVector) vector;
setStructColumnVector(value, structColumnVector, row);
break;
default:
throw new RuntimeException("Unexpected ColumnVector subtype");
throw new RuntimeException("Unexpected ColumnVector subtype " + vector.type);
}
}
}

/**
 * Writes a nested row into a struct column at the given row position.
 *
 * <p>Each field of the row is delegated to {@code setColumn} together with the struct child
 * vector at the same index, so field i of the row lands in child vector i.
 *
 * @param value              the value to write; must be a {@link SeaTunnelRow}
 * @param structColumnVector the struct vector receiving the row's fields
 * @param row                the row position to write at
 * @throws RuntimeException if {@code value} is not a {@link SeaTunnelRow}
 */
private void setStructColumnVector(Object value, StructColumnVector structColumnVector, int row) {
    if (!(value instanceof SeaTunnelRow)) {
        throw new RuntimeException("SeaTunnelRow type expected for field");
    }

    Object[] nestedValues = ((SeaTunnelRow) value).getFields();
    int childIndex = 0;
    for (Object nestedValue : nestedValues) {
        setColumn(nestedValue, structColumnVector.fields[childIndex], row);
        childIndex++;
    }
}

/**
 * Writes a map value into a map column at the given row position.
 *
 * <p>The entries are appended to the shared key/value child vectors: the row's offset is the
 * current {@code childCount}, its length is the map size, and {@code childCount} is advanced by
 * that size. Keys and values are written through {@code setColumn} at consecutive child slots.
 *
 * <p>The child vectors are grown before writing. Their capacity is independent of the number of
 * entries accumulated across rows, so without growing them the child writes would run past the
 * end of the arrays once the accumulated entries exceed the allocated capacity.
 *
 * @param value           the value to write; must be a {@link Map}
 * @param mapColumnVector the map vector receiving the entries
 * @param row             the row position to write at
 * @throws RuntimeException if {@code value} is not a {@link Map}
 */
private void setMapColumnVector(Object value, MapColumnVector mapColumnVector, int row) {
    if (!(value instanceof Map)) {
        throw new RuntimeException("Map type expected for field");
    }
    Map<?, ?> map = (Map<?, ?>) value;

    int firstChild = mapColumnVector.childCount;
    mapColumnVector.offsets[row] = firstChild;
    mapColumnVector.lengths[row] = map.size();
    mapColumnVector.childCount += map.size();

    // Grow keys and values (preserving entries already written for earlier rows). Doubling the
    // requested size avoids re-copying the arrays on every subsequent row.
    int requiredChildren = mapColumnVector.childCount;
    if (requiredChildren > mapColumnVector.keys.isNull.length) {
        mapColumnVector.keys.ensureSize(requiredChildren * 2, true);
    }
    if (requiredChildren > mapColumnVector.values.isNull.length) {
        mapColumnVector.values.ensureSize(requiredChildren * 2, true);
    }

    int mapElem = firstChild;
    for (Map.Entry<?, ?> entry : map.entrySet()) {
        setColumn(entry.getKey(), mapColumnVector.keys, mapElem);
        setColumn(entry.getValue(), mapColumnVector.values, mapElem);
        mapElem++;
    }
}

/**
 * Writes an array or list value into a list column at the given row position.
 *
 * <p>The elements are appended to the shared child vector: the row's offset is the current
 * {@code childCount}, its length is the element count, and {@code childCount} is advanced by
 * that count. Each element is written through {@code setColumn} at consecutive child slots.
 *
 * <p>The child vector is grown before writing. Its capacity is independent of the number of
 * elements accumulated across rows, so without growing it the child writes would run past the
 * end of the array once the accumulated elements exceed the allocated capacity.
 *
 * @param value            the value to write; must be an {@code Object[]} or a {@link List}
 * @param listColumnVector the list vector receiving the elements
 * @param row              the row position to write at
 * @throws RuntimeException if {@code value} is neither an {@code Object[]} nor a {@link List}
 */
private void setListColumnVector(Object value, ListColumnVector listColumnVector, int row) {
    Object[] valueArray;
    if (value instanceof Object[]) {
        valueArray = (Object[]) value;
    } else if (value instanceof List) {
        valueArray = ((List<?>) value).toArray();
    } else {
        throw new RuntimeException("List and Array type expected for field");
    }

    int firstChild = listColumnVector.childCount;
    listColumnVector.offsets[row] = firstChild;
    listColumnVector.lengths[row] = valueArray.length;
    listColumnVector.childCount += valueArray.length;

    // Grow the child vector (preserving elements already written for earlier rows). Doubling
    // the requested size avoids re-copying the array on every subsequent row.
    int requiredChildren = listColumnVector.childCount;
    if (requiredChildren > listColumnVector.child.isNull.length) {
        listColumnVector.child.ensureSize(requiredChildren * 2, true);
    }

    for (int i = 0; i < valueArray.length; i++) {
        setColumn(valueArray[i], listColumnVector.child, firstChild + i);
    }
}

/**
 * Writes a decimal value into a decimal column at the given row position.
 *
 * @param value               the value to write; must be a {@link BigDecimal}
 * @param decimalColumnVector the decimal vector receiving the value
 * @param row                 the row position to write at
 * @throws RuntimeException if {@code value} is not a {@link BigDecimal}
 */
private void setDecimalColumnVector(Object value, DecimalColumnVector decimalColumnVector, int row) {
    if (!(value instanceof BigDecimal)) {
        throw new RuntimeException("BigDecimal type expected for field");
    }
    HiveDecimal hiveDecimal = HiveDecimal.create((BigDecimal) value);
    decimalColumnVector.set(row, hiveDecimal);
}

/**
 * Writes a time value into a timestamp column at the given row position.
 *
 * <p>Accepted inputs are {@link Timestamp} (written as is), {@link LocalDateTime} (converted
 * with {@link Timestamp#valueOf(LocalDateTime)}) and {@link LocalTime} (placed on epoch day 0
 * and then converted the same way).
 *
 * @param value                 the value to write
 * @param timestampColumnVector the timestamp vector receiving the value
 * @param row                   the row position to write at
 * @throws RuntimeException if {@code value} is none of the accepted types
 */
private void setTimestampColumnVector(Object value, TimestampColumnVector timestampColumnVector, int row) {
    final Timestamp converted;
    if (value instanceof Timestamp) {
        converted = (Timestamp) value;
    } else if (value instanceof LocalDateTime) {
        converted = Timestamp.valueOf((LocalDateTime) value);
    } else if (value instanceof LocalTime) {
        // A bare time has no date part; anchor it to epoch day 0.
        LocalDateTime onEpochDay = LocalDate.ofEpochDay(0).atTime((LocalTime) value);
        converted = Timestamp.valueOf(onEpochDay);
    } else {
        throw new RuntimeException("Time series type expected for field");
    }
    timestampColumnVector.set(row, converted);
}

Expand All @@ -186,10 +311,12 @@ private void setLongColumnVector(Object value, LongColumnVector longVector, int
} else if (value instanceof BigInteger) {
BigInteger bigInt = (BigInteger) value;
longVector.vector[row] = bigInt.longValue();
} else if (value instanceof Short) {
longVector.vector[row] = (Short) value;
} else if (value instanceof Byte) {
longVector.vector[row] = (Byte) value;
} else if (value instanceof Short) {
longVector.vector[row] = (Short) value;
} else if (value instanceof LocalDate) {
longVector.vector[row] = ((LocalDate) value).getLong(ChronoField.EPOCH_DAY);
} else {
throw new RuntimeException("Long or Integer type expected for field");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,4 +57,12 @@ public void testFakeSourceToLocalFileJson() throws IOException, InterruptedExcep
Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_local_json.conf");
Assertions.assertEquals(0, execResult.getExitCode());
}

/**
 * Runs the two Spark jobs in order and expects each to exit with code 0.
 *
 * <p>Judging by the config names, the first job writes fake data to a local ORC file and the
 * second reads ORC back to the console; the second job only starts after the first assertion
 * has passed.
 */
@Test
public void testFakeSourceToLocalFileORCAndReadToConsole() throws IOException, InterruptedException {
    String[] jobConfigs = {
        "/file/fakesource_to_local_orc.conf",
        "/file/local_orc_source_to_console.conf"
    };
    for (String jobConfig : jobConfigs) {
        Container.ExecResult jobResult = executeSeaTunnelSparkJob(jobConfig);
        Assertions.assertEquals(0, jobResult.getExitCode());
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
job.mode = "BATCH"
}

source {
FakeSource {
result_table_name = "fake"
schema = {
fields {
c_map = "map<string, string>"
c_array = "array<tinyint>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_decimal = "decimal(30, 8)"
c_bytes = bytes
c_date = date
c_timestamp = timestamp
}
}
}

# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}

transform {
sql {
sql = "select * from fake"
}

# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
LocalFile {
path="/tmp/test/orc/"
partition_by=["c_boolean"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="orc"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
}

# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

env {
# You can set spark configuration here
spark.app.name = "SeaTunnel"
spark.executor.instances = 2
spark.executor.cores = 1
spark.executor.memory = "1g"
spark.master = local
job.mode = "BATCH"
}

source {
LocalFile {
path = "/tmp/test/orc/"
type = "orc"
}

# If you would like to get more information about how to configure seatunnel and see full list of source plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/source/FakeSource
}

transform {


# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/category/transform
}

sink {
console {
Hisoka-X marked this conversation as resolved.
Show resolved Hide resolved
}

# If you would like to get more information about how to configure seatunnel and see full list of sink plugins,
# please go to https://seatunnel.apache.org/docs/connector-v2/sink/File
}