Skip to content

Commit

Permalink
[Feature] [File Connector] Supports writing column names when the out…
Browse files Browse the repository at this point in the history
…put type is file (CSV) (apache#5459)

* [Feature] [File Connector] Supports writing column names when the output type is file (CSV) apache#5443

* [Feature] [File Connector] fix code style and lineSeparator apache#5443

* [Feature] [File Connector] add enable_header_write,false:dont write header,true:write header. apache#5443

* [Feature] [File Connector] fix code style apache#5443

* [Feature] [File Connector] add enable_header_write explain apache#5443

* [Feature] [File Connector]fix code style apache#5443

* Update docs/en/connector-v2/sink/LocalFile.md

* Update docs/en/connector-v2/sink/LocalFile.md

* [Feature] [File Connector]fix code style apache#5443

* [Feature] [File Connector]add junit test apache#5443

* [Feature] [File Connector]add license header: apache#5443

* [Feature] [File Connector] Supports writing column names when the output type is file (CSV) apache#5443

* [Feature] [File Connector] fix code style and lineSeparator apache#5443

* [Feature] [File Connector] add enable_header_write,false:dont write header,true:write header. apache#5443

* [Feature] [File Connector] fix code style apache#5443

* [Feature] [File Connector] add enable_header_write explain apache#5443

* [Feature] [File Connector]fix code style apache#5443

* Update docs/en/connector-v2/sink/LocalFile.md

* Update docs/en/connector-v2/sink/LocalFile.md

* [Feature] [File Connector]fix code style apache#5443

* [Feature] [File Connector]add junit test apache#5443

* [Feature] [File Connector]add license header: apache#5443

* [Feature] [File Connector]add junit: apache#5443

* [Feature] [File Connector]add junit: apache#5443

* [Feature] [File Connector]remove scala: apache#5443

* [Feature] [File Connector]modify md style: apache#5443

* [Feature] [File Connector] Supports writing column names when the output type is file (CSV) apache#5443

* [Feature] [File Connector] fix code style and lineSeparator apache#5443

* [Feature] [File Connector] add enable_header_write,false:dont write header,true:write header. apache#5443

* [Feature] [File Connector] fix code style apache#5443

* [Feature] [File Connector] add enable_header_write explain apache#5443

* [Feature] [File Connector]fix code style apache#5443

* Update docs/en/connector-v2/sink/LocalFile.md

* Update docs/en/connector-v2/sink/LocalFile.md

* [Feature] [File Connector]fix code style apache#5443

* [Feature] [File Connector]add junit test apache#5443

* [Feature] [File Connector]add license header: apache#5443

* [Feature] [File Connector]add junit: apache#5443

* [Feature] [File Connector]add junit: apache#5443

* [Feature] [File Connector]remove scala: apache#5443

* [Feature] [File Connector]modify md style: apache#5443

* [Feature] [File Connector]junit modify: apache#5443

---------

Co-authored-by: zck <573693104@qq.com>
Co-authored-by: Eric <gaojun2048@gmail.com>
  • Loading branch information
3 people authored and gnehil committed Oct 12, 2023
1 parent 230bbba commit 1611609
Show file tree
Hide file tree
Showing 6 changed files with 296 additions and 20 deletions.
45 changes: 25 additions & 20 deletions docs/en/connector-v2/sink/LocalFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,26 +30,27 @@ By default, we use 2PC commit to ensure `exactly-once`

## Options

| name | type | required | default value | remarks |
|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------|
| path | string | yes | - | |
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format_type is text |
| row_delimiter | string | no | "\n" | Only used when file_format_type is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
| partition_by | array | no | - | Only used then have_partition is true |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true |
| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true |
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
| compress_codec | string | no | none | |
| common-options | object | no | - | |
| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. |
| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. |
| name | type | required | default value | remarks |
|----------------------------------|---------|----------|--------------------------------------------|-----------------------------------------------------------------------------------------------|
| path | string | yes | - | |
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format_type is text |
| row_delimiter | string | no | "\n" | Only used when file_format_type is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
| partition_by | array | no | - | Only used then have_partition is true |
| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true |
| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true |
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
| compress_codec | string | no | none | |
| common-options | object | no | - | |
| max_rows_in_memory | int | no | - | Only used when file_format_type is excel. |
| sheet_name | string | no | Sheet${Random number} | Only used when file_format_type is excel. |
| enable_header_write | boolean | no | false | Only used when file_format_type is text,csv.<br/> false:don't write header,true:write header. |

### path [string]

Expand Down Expand Up @@ -166,6 +167,10 @@ When File Format is Excel,The maximum number of data items that can be cached in

Writer the sheet of the workbook

### enable_header_write [boolean]

Only used when file_format_type is text,csv.false:don't write header,true:write header.

## Example

For orc file format simple config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
protected DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
protected Boolean enableHeaderWriter = false;

public BaseFileSinkConfig(@NonNull Config config) {
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
Expand Down Expand Up @@ -99,6 +100,10 @@ public BaseFileSinkConfig(@NonNull Config config) {
timeFormat =
TimeUtils.Formatter.parse(config.getString(BaseSinkConfig.TIME_FORMAT.key()));
}

if (config.hasPath(BaseSinkConfig.ENABLE_HEADER_WRITE.key())) {
enableHeaderWriter = config.getBoolean(BaseSinkConfig.ENABLE_HEADER_WRITE.key());
}
}

public BaseFileSinkConfig() {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,4 +232,10 @@ public class BaseSinkConfig {
.stringType()
.noDefaultValue()
.withDescription("To be written sheet name,only valid for excel files");

public static final Option<Boolean> ENABLE_HEADER_WRITE =
Options.key("enable_header_write")
.booleanType()
.defaultValue(false)
.withDescription("false:dont write header,true:write header");
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.format.text.TextSerializationSchema;
Expand All @@ -47,6 +48,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
private final DateUtils.Formatter dateFormat;
private final DateTimeUtils.Formatter dateTimeFormat;
private final TimeUtils.Formatter timeFormat;
private final FileFormat fileFormat;
private final Boolean enableHeaderWriter;
private SerializationSchema serializationSchema;

public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
Expand All @@ -58,6 +61,8 @@ public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
this.dateFormat = fileSinkConfig.getDateFormat();
this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
this.timeFormat = fileSinkConfig.getTimeFormat();
this.fileFormat = fileSinkConfig.getFileFormat();
this.enableHeaderWriter = fileSinkConfig.getEnableHeaderWriter();
}

@Override
Expand Down Expand Up @@ -133,15 +138,18 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
OutputStream out =
lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
fsDataOutputStream = new FSDataOutputStream(out, null);
enableWriteHeader(fsDataOutputStream);
break;
case NONE:
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
enableWriteHeader(fsDataOutputStream);
break;
default:
log.warn(
"Text file does not support this compress type: {}",
compressFormat.getCompressCodec());
fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
enableWriteHeader(fsDataOutputStream);
break;
}
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
Expand All @@ -155,4 +163,15 @@ private FSDataOutputStream getOrCreateOutputStream(@NonNull String filePath) {
}
return fsDataOutputStream;
}

private void enableWriteHeader(FSDataOutputStream fsDataOutputStream) throws IOException {
if (enableHeaderWriter) {
fsDataOutputStream.write(
String.join(
FileFormat.CSV.equals(fileFormat) ? "," : fieldDelimiter,
seaTunnelRowType.getFieldNames())
.getBytes());
fsDataOutputStream.write(rowDelimiter.getBytes());
}
}
}
Loading

0 comments on commit 1611609

Please sign in to comment.