diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md
index 8e2c1526e907..90e80c6c3728 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -30,26 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ## Options
 
-| name                             | type    | required | default value                              | remarks                                                    |
-|----------------------------------|---------|----------|--------------------------------------------|------------------------------------------------------------|
-| path                             | string  | yes      | -                                          |                                                            |
-| custom_filename                  | boolean | no       | false                                      | Whether you need custom the filename                       |
-| file_name_expression             | string  | no       | "${transactionId}"                         | Only used when custom_filename is true                     |
-| filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Only used when custom_filename is true                     |
-| file_format_type                 | string  | no       | "csv"                                      |                                                            |
-| field_delimiter                  | string  | no       | '\001'                                     | Only used when file_format_type is text                    |
-| row_delimiter                    | string  | no       | "\n"                                       | Only used when file_format_type is text                    |
-| have_partition                   | boolean | no       | false                                      | Whether you need processing partitions.                    |
-| partition_by                     | array   | no       | -                                          | Only used then have_partition is true                      |
-| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true                      |
-| is_partition_field_write_in_file | boolean | no       | false                                      | Only used then have_partition is true                      |
-| sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns  |
-| is_enable_transaction            | boolean | no       | true                                       |                                                            |
-| batch_size                       | int     | no       | 1000000                                    |                                                            |
-| compress_codec                   | string  | no       | none                                       |                                                            |
-| common-options                   | object  | no       | -                                          |                                                            |
-| max_rows_in_memory               | int     | no       | -                                          | Only used when file_format_type is excel.                  |
-| sheet_name                       | string  | no       | Sheet${Random number}                      | Only used when file_format_type is excel.                  |
+| name                             | type    | required | default value                              | remarks                                                                                         |
+|----------------------------------|---------|----------|--------------------------------------------|-------------------------------------------------------------------------------------------------|
+| path                             | string  | yes      | -                                          |                                                                                                 |
+| custom_filename                  | boolean | no       | false                                      | Whether you need a custom filename                                                              |
+| file_name_expression             | string  | no       | "${transactionId}"                         | Only used when custom_filename is true                                                          |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Only used when custom_filename is true                                                          |
+| file_format_type                 | string  | no       | "csv"                                      |                                                                                                 |
+| field_delimiter                  | string  | no       | '\001'                                     | Only used when file_format_type is text                                                         |
+| row_delimiter                    | string  | no       | "\n"                                       | Only used when file_format_type is text                                                         |
+| have_partition                   | boolean | no       | false                                      | Whether you need to process partitions.                                                         |
+| partition_by                     | array   | no       | -                                          | Only used when have_partition is true                                                           |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used when have_partition is true                                                           |
+| is_partition_field_write_in_file | boolean | no       | false                                      | Only used when have_partition is true                                                           |
+| sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns                                       |
+| is_enable_transaction            | boolean | no       | true                                       |                                                                                                 |
+| batch_size                       | int     | no       | 1000000                                    |                                                                                                 |
+| compress_codec                   | string  | no       | none                                       |                                                                                                 |
+| common-options                   | object  | no       | -                                          |                                                                                                 |
+| max_rows_in_memory               | int     | no       | -                                          | Only used when file_format_type is excel.                                                       |
+| sheet_name                       | string  | no       | Sheet${Random number}                      | Only used when file_format_type is excel.                                                       |
+| enable_header_write              | boolean | no       | false                                      | Only used when file_format_type is text or csv. false: don't write header; true: write header. |
 
 ### path [string]
@@ -166,6 +167,10 @@ When File Format is Excel,The maximum number of data items that can be cached in
 
 Writer the sheet of the workbook
 
+### enable_header_write [boolean]
+
+Only used when file_format_type is text or csv. false: don't write header; true: write header.
+
 ## Example
 
 For orc file format simple config
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 9a0ac6c678be..112ab9fa1c63 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -46,6 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
     protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
     protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
     protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
+    protected Boolean enableHeaderWriter = false;
 
     public BaseFileSinkConfig(@NonNull Config config) {
         if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
@@ -99,6 +100,10 @@ public BaseFileSinkConfig(@NonNull Config config) {
             timeFormat =
                     TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
         }
+
+        if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
+            enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
+        }
     }
 
     public BaseFileSinkConfig() {}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 431a5d1daa5e..4f4d09d75cd3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -232,4 +232,10 @@ public class BaseSinkConfig {
                     .stringType()
                     .noDefaultValue()
                     .withDescription("To be written sheet name,only valid for excel files");
+
+    public static final Option<Boolean> ENABLE_HEADER_WRITE =
+            Options.key("enable_header_write")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription("false: don't write header; true: write header");
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index f309edb70f26..b4b7bdb9558f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -24,6 +24,7 @@
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -47,6 +48,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
     private final DateUtils.Formatter dateFormat;
     private final DateTimeUtils.Formatter dateTimeFormat;
     private final TimeUtils.Formatter timeFormat;
+    private final FileFormat fileFormat;
+    private final Boolean enableHeaderWriter;
     private SerializationSchema serializationSchema;
 
     public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
@@ -58,6 +61,8 @@ public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
         this.dateFormat = fileSinkConfig.getDateFormat();
         this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
         this.timeFormat = fileSinkConfig.getTimeFormat();
+        this.fileFormat = fileSinkConfig.getFileFormat();
+        this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter();
     }
 
     @Override
@@ -133,15 +138,18 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
                     OutputStream out =
                             lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
                     fsDataOutputStream = new FSDataOutputStream(out, null);
+                    enableWriteHeader(fsDataOutputStream);
                     break;
                 case NONE:
                     fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+                    enableWriteHeader(fsDataOutputStream);
                     break;
                 default:
                     log.warn(
                             "Text file does not support this compress type: {}",
                             compressFormat.getCompressCodec());
                     fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+                    enableWriteHeader(fsDataOutputStream);
                     break;
             }
             beingWrittenOutputStream.put(filePath, fsDataOutputStream);
@@ -155,4 +163,15 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
         }
         return fsDataOutputStream;
     }
+
+    private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException {
+        if (enableHeaderWriter) {
+            fsDataOutputStream.write(
+                    String.join(
+                                    FileFormat.CSV.equals(fileFormat) ? "," : fieldDelimiter,
+                                    seaTunnelRowType.getFieldNames())
+                            .getBytes());
+            fsDataOutputStream.write(rowDelimiter.getBytes());
+        }
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
new file mode 100644
index 000000000000..7d60fadcde9c
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/java/org/apache/seatunnel/engine/e2e/TextHeaderIT.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.engine.e2e;
+
+import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.config.DeployMode;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.core.job.JobStatus;
+import org.apache.seatunnel.engine.server.SeaTunnelServerStarter;
+import org.apache.seatunnel.format.text.constant.TextFormatConstant;
+
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.shaded.org.apache.commons.lang3.tuple.ImmutablePair;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.instance.impl.HazelcastInstanceImpl;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Header write test. Verifies that the text/csv file sink writes the field names as the first
+ * row when enable_header_write is true, and writes no header when it is false
+ */
+@Slf4j
+public class TextHeaderIT {
+
+    private static final String FILE_FORMAT_TYPE = "file_format_type";
+    private static final String ENABLE_HEADER_WRITE = "enable_header_write";
+
+    @Getter
+    @Setter
+    static class ContentHeader {
+        private String fileStyle;
+        private String enableWriteHeader;
+        private String headerName;
+
+        public ContentHeader(String fileStyle, String enableWriteHeader, String headerName) {
+            this.fileStyle = fileStyle;
+            this.enableWriteHeader = enableWriteHeader;
+            this.headerName = headerName;
+        }
+    }
+
+    @Test
+    public void testEnableWriteHeader() {
+        List<ContentHeader> lists = new ArrayList<>();
+        lists.add(
+                new ContentHeader(
+                        "text", "true", "name" + TextFormatConstant.SEPARATOR[0] + "age"));
+        lists.add(
+                new ContentHeader(
+                        "text", "false", "name" + TextFormatConstant.SEPARATOR[0] + "age"));
+        lists.add(new ContentHeader("csv", "true", "name,age"));
+        lists.add(new ContentHeader("csv", "false", "name,age"));
+        lists.forEach(
+                t -> {
+                    try {
+                        enableWriteHeader(
+                                t.getFileStyle(), t.getEnableWriteHeader(), t.getHeaderName());
+                    } catch (ExecutionException | InterruptedException e) {
+                        throw new RuntimeException(e);
+                    }
+                });
+    }
+
+    public void enableWriteHeader(String fileFormatType, String headerWrite, String headerContent)
+            throws ExecutionException, InterruptedException {
+        String testClusterName = "TextHeaderIT_EnableWriteHeaderNode";
+        HazelcastInstanceImpl node1 = null;
+        SeaTunnelClient engineClient = null;
+
+        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
+        seaTunnelConfig
+                .getHazelcastConfig()
+                .setClusterName(TestUtils.getClusterName(testClusterName));
+
+        try {
+            node1 = SeaTunnelServerStarter.createHazelcastInstance(seaTunnelConfig);
+
+            // wait until all nodes have joined the cluster
+            HazelcastInstanceImpl finalNode = node1;
+            Awaitility.await()
+                    .atMost(10000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () ->
+                                    Assertions.assertEquals(
+                                            1, finalNode.getCluster().getMembers().size()));
+
+            Common.setDeployMode(DeployMode.CLIENT);
+            ImmutablePair<String, String> testResources =
+                    createTestResources(headerWrite, fileFormatType);
+            JobConfig jobConfig = new JobConfig();
+            jobConfig.setName(headerWrite);
+
+            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
+            clientConfig.setClusterName(TestUtils.getClusterName(testClusterName));
+            engineClient = new SeaTunnelClient(clientConfig);
+            JobExecutionEnvironment jobExecutionEnv =
+                    engineClient.createExecutionContext(testResources.getRight(), jobConfig);
+            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
+
+            CompletableFuture<JobStatus> objectCompletableFuture =
+                    CompletableFuture.supplyAsync(clientJobProxy::waitForJobComplete);
+            Awaitility.await()
+                    .atMost(600000, TimeUnit.MILLISECONDS)
+                    .untilAsserted(
+                            () -> {
+                                Thread.sleep(2000);
+                                Assertions.assertTrue(
+                                        objectCompletableFuture.isDone()
+                                                && JobStatus.FINISHED.equals(
+                                                        objectCompletableFuture.get()));
+                            });
+            File file = new File(testResources.getLeft());
+            for (File targetFile : file.listFiles()) {
+                String[] texts =
+                        FileUtils.readFileToStr(targetFile.toPath())
+                                .split(BaseSinkConfig.ROW_DELIMITER.defaultValue());
+                if (headerWrite.equals("true")) {
+                    Assertions.assertEquals(headerContent, texts[0]);
+                } else {
+                    Assertions.assertNotEquals(headerContent, texts[0]);
+                }
+            }
+            log.info("========================clean test resource====================");
+        } finally {
+            if (engineClient != null) {
+                engineClient.shutdown();
+            }
+            if (node1 != null) {
+                node1.shutdown();
+            }
+        }
+    }
+
+    private ImmutablePair<String, String> createTestResources(
+            @NonNull String headerWrite, @NonNull String formatType) {
+        Map<String, String> valueMap = new HashMap<>();
+        valueMap.put(ENABLE_HEADER_WRITE, headerWrite);
+        valueMap.put(FILE_FORMAT_TYPE, formatType);
+        String targetDir = "/tmp/text";
+        targetDir = targetDir.replace("/", File.separator);
+        // clear the target dir before the test
+        FileUtils.createNewDir(targetDir);
+        String targetConfigFilePath =
+                File.separator
+                        + "tmp"
+                        + File.separator
+                        + "test_conf"
+                        + File.separator
+                        + headerWrite
+                        + ".conf";
+        TestUtils.createTestConfigFileFromTemplate(
+                "batch_fakesource_to_file_header.conf", valueMap, targetConfigFilePath);
+        return new ImmutablePair<>(targetDir, targetConfigFilePath);
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf
new file mode 100644
index 000000000000..96ec46dc2eec
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_header.conf
@@ -0,0 +1,49 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+  execution.parallelism = 1
+  job.mode = "BATCH"
+}
+
+# Create a FakeSource to generate the test rows
+source {
+  # This is an example source plugin, used only to test and demonstrate the source plugin feature
+  FakeSource {
+    parallelism = 1
+    result_table_name = "fake"
+    row.num = 1
+    schema = {
+      fields {
+        name = "string"
+        age = "int"
+      }
+    }
+  }
+}
+
+sink {
+
+  LocalFile {
+    path = "/tmp/text"
+    file_format_type = "${file_format_type}"
+    enable_header_write = "${enable_header_write}"
+  }
+}
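---

For reference, a minimal sketch of how the new option reads in an ordinary job config, outside the e2e template above (the `name`/`age` field names are taken from the FakeSource schema in the test config; all other keys come from the LocalFile sink options in this diff):

```hocon
sink {
  LocalFile {
    path = "/tmp/text"
    file_format_type = "csv"
    # When true, the field names are written as the first line of each file:
    # joined by "," for csv, or by field_delimiter (default '\001') for text,
    # and terminated with row_delimiter (default "\n").
    enable_header_write = true
  }
}
```

With the two-field schema above, the first line of each csv file produced should be `name,age`, followed by the data rows; with `enable_header_write = false` (the default) the file starts directly with data.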