diff --git a/docs/en/connector-v2/source/CosFile.md b/docs/en/connector-v2/source/CosFile.md
index dd1e77ebcfd..236c4b8ca09 100644
--- a/docs/en/connector-v2/source/CosFile.md
+++ b/docs/en/connector-v2/source/CosFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | common-options      |        | no       | -       |
 | sheet_name          | string | no       | -       |
 | file_filter_pattern | string | no       | -       |
+| compress_codec      | string | no       | none    |
 
 ### path [string]
 
@@ -252,6 +253,16 @@ Reader the sheet of the workbook,Only used when file_format is excel.
 
 Filter pattern, which used for filtering files.
 
+### compress_codec [string]
+
+The compress codec of files. The supported codecs are shown below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc/parquet:
+  automatically recognizes the compression type, no additional settings required.
+
 ## Example
 
 ```hocon
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index c692a7483a6..f526d781f3c 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -49,6 +49,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 | common-options      |        | no       | -       |
 | sheet_name          | string | no       | -       |
 | file_filter_pattern | string | no       | -       |
+| compress_codec      | string | no       | none    |
 
 ### host [string]
 
@@ -228,6 +229,16 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 Reader the sheet of the workbook,Only used when file_format_type is excel.
 
+### compress_codec [string]
+
+The compress codec of files. The supported codecs are shown below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc/parquet:
+  automatically recognizes the compression type, no additional settings required.
+
 ## Example
 
 ```hocon
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 88c1e35f87e..29092296ea7 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -57,6 +57,17 @@ Read data from hdfs file system.
 | schema         | config | no       | -       | the schema fields of upstream data |
 | common-options |        | no       | -       | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
 | sheet_name     | string | no       | -       | Reader the sheet of the workbook,Only used when file_format is excel. |
+| compress_codec | string | no       | none    | The compress codec of files |
+
+### compress_codec [string]
+
+The compress codec of files. The supported codecs are shown below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc/parquet:
+  automatically recognizes the compression type, no additional settings required.
 
 ### Tips
 
diff --git a/docs/en/connector-v2/source/Hive.md b/docs/en/connector-v2/source/Hive.md
index afa9893d5b2..c2ed0ca0669 100644
--- a/docs/en/connector-v2/source/Hive.md
+++ b/docs/en/connector-v2/source/Hive.md
@@ -44,6 +44,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | read_partitions               | list    | no       | -       |
 | read_columns                  | list    | no       | -       |
 | abort_drop_partition_metadata | boolean | no       | true    |
+| compress_codec                | string  | no       | none    |
 | common-options                |         | no       | -       |
 
 ### table_name [string]
 
@@ -85,6 +86,16 @@ The read column list of the data source, user can use it to implement field proj
 
 Flag to decide whether to drop partition metadata from Hive Metastore during an abort operation. Note: this only affects the metadata in the metastore, the data in the partition will always be deleted(data generated during the synchronization process).
 
+### compress_codec [string]
+
+The compress codec of files. The supported codecs are shown below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc/parquet:
+  automatically recognizes the compression type, no additional settings required.
+
 ### common options
 
 Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index 80adfa6d9ad..981efd395bc 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -50,6 +50,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | common-options      |        | no       | -       |
 | sheet_name          | string | no       | -       |
 | file_filter_pattern | string | no       | -       |
+| compress_codec      | string | no       | none    |
 
 ### path [string]
 
@@ -230,6 +231,16 @@ Reader the sheet of the workbook,Only used when file_format_type is excel.
 
 Filter pattern, which used for filtering files.
 
+### compress_codec [string]
+
+The compress codec of files. The supported codecs are shown below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc/parquet:
+  automatically recognizes the compression type, no additional settings required.
+
 ## Example
 
 ```hocon
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index 7c992581f5a..c9b44a3e84d 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | common-options      |        | no       | -       |
 | sheet_name          | string | no       | -       |
 | file_filter_pattern | string | no       | -       |
+| compress_codec      | string | no       | none    |
 
 ### path [string]
 
@@ -249,6 +250,16 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 Reader the sheet of the workbook,Only used when file_format_type is excel.
 
+### compress_codec [string]
+
+The compress codec of files. The supported codecs are shown below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc/parquet:
+  automatically recognizes the compression type, no additional settings required.
+
 ## Example
 
 ```hocon
diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
index f77c4a4543a..093d9d4057e 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | common-options      |        | no       | -       |
 | sheet_name          | string | no       | -       |
 | file_filter_pattern | string | no       | -       |
+| compress_codec      | string | no       | none    |
 
 ### path [string]
 
@@ -253,6 +254,16 @@ Reader the sheet of the workbook,Only used when file_format_type is excel.
 Filter pattern, which used for filtering files.
 
+### compress_codec [string]
+
+The compress codec of files. The supported codecs are shown below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc/parquet:
+  automatically recognizes the compression type, no additional settings required.
+
 ## Example
 
 ```hocon
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index 54124a37038..9237b92cffc 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -214,6 +214,17 @@ If you assign file type to `parquet` `orc`, schema option not required, connecto
 | schema         | config | no       | -       | The schema of upstream data. |
 | common-options |        | no       | -       | Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details. |
 | sheet_name     | string | no       | -       | Reader the sheet of the workbook,Only used when file_format is excel. |
+| compress_codec | string | no       | none    | The compress codec of files. |
+
+### compress_codec [string]
+
+The compress codec of files. The supported codecs are shown below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc/parquet:
+  automatically recognizes the compression type, no additional settings required.
 
 ## Example
 
diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
index 184a587a928..c35fc98d58f 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -48,6 +48,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 | common-options      |        | no       | -       |
 | sheet_name          | string | no       | -       |
 | file_filter_pattern | string | no       | -       |
+| compress_codec      | string | no       | none    |
 
 ### host [string]
 
@@ -231,6 +232,16 @@ Reader the sheet of the workbook,Only used when file_format_type is excel.
 
 Filter pattern, which used for filtering files.
 
+### compress_codec [string]
+
+The compress codec of files. The supported codecs are shown below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc/parquet:
+  automatically recognizes the compression type, no additional settings required.
+
 ## Example
 
 ```hocon
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 7571a973e06..a99df97e16c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -125,4 +125,10 @@ public class BaseSourceConfig {
                     .noDefaultValue()
                     .withDescription(
                             "File pattern. The connector will filter some files base on the pattern.");
+
+    public static final Option<CompressFormat> COMPRESS_CODEC =
+            Options.key("compress_codec")
+                    .enumType(CompressFormat.class)
+                    .defaultValue(CompressFormat.NONE)
+                    .withDescription("Compression codec");
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index 4441a28faca..3b3b613aba3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
@@ -30,14 +32,29 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import io.airlift.compress.lzo.LzopCodec;
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
+@Slf4j
 public class JsonReadStrategy extends AbstractReadStrategy {
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private CompressFormat compressFormat = BaseSourceConfig.COMPRESS_CODEC.defaultValue();
+
+    @Override
+    public void init(HadoopConf conf) {
+        super.init(conf);
+        if (pluginConfig.hasPath(BaseSourceConfig.COMPRESS_CODEC.key())) {
+            String compressCodec = pluginConfig.getString(BaseSourceConfig.COMPRESS_CODEC.key());
+            compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
+        }
+    }
 
     @Override
     public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
@@ -58,9 +75,24 @@ public void read(String path, Collector<SeaTunnelRow> output)
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
         Map<String, String> partitionsMap = parsePartitionsByPath(path);
+        InputStream inputStream;
+        switch (compressFormat) {
+            case LZO:
+                LzopCodec lzo = new LzopCodec();
+                inputStream = lzo.createInputStream(fs.open(filePath));
+                break;
+            case NONE:
+                inputStream = fs.open(filePath);
+                break;
+            default:
+                log.warn(
+                        "Json file does not support this compress type: {}",
+                        compressFormat.getCompressCodec());
+                inputStream = fs.open(filePath);
+                break;
+        }
         try (BufferedReader reader =
-                new BufferedReader(
-                        new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
+                new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
             reader.lines()
                     .forEach(
                             line -> {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 4b931cb8902..51892cf99f5 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
@@ -39,12 +40,17 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import io.airlift.compress.lzo.LzopCodec;
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 import java.util.Map;
 
+@Slf4j
 public class TextReadStrategy extends AbstractReadStrategy {
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
     private String fieldDelimiter = BaseSourceConfig.DELIMITER.defaultValue();
@@ -52,6 +58,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
     private DateTimeUtils.Formatter datetimeFormat =
             BaseSourceConfig.DATETIME_FORMAT.defaultValue();
     private TimeUtils.Formatter timeFormat = BaseSourceConfig.TIME_FORMAT.defaultValue();
+    private CompressFormat compressFormat = BaseSourceConfig.COMPRESS_CODEC.defaultValue();
     private int[] indexes;
 
     @Override
@@ -61,9 +68,25 @@ public void read(String path, Collector<SeaTunnelRow> output)
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
         Map<String, String> partitionsMap = parsePartitionsByPath(path);
+        InputStream inputStream;
+        switch (compressFormat) {
+            case LZO:
+                LzopCodec lzo = new LzopCodec();
+                inputStream = lzo.createInputStream(fs.open(filePath));
+                break;
+            case NONE:
+                inputStream = fs.open(filePath);
+                break;
+            default:
+                log.warn(
+                        "Text file does not support this compress type: {}",
+                        compressFormat.getCompressCodec());
+                inputStream = fs.open(filePath);
+                break;
+        }
+
         try (BufferedReader reader =
-                new BufferedReader(
-                        new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
+                new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))) {
             reader.lines()
                     .skip(skipHeaderNumber)
                     .forEach(
@@ -200,5 +223,9 @@ private void initFormatter() {
                     TimeUtils.Formatter.parse(
                             pluginConfig.getString(BaseSourceConfig.TIME_FORMAT.key()));
         }
+        if (pluginConfig.hasPath(BaseSourceConfig.COMPRESS_CODEC.key())) {
+            String compressCodec = pluginConfig.getString(BaseSourceConfig.COMPRESS_CODEC.key());
+            compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
+        }
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
index 1e7b6a0198f..78f262d8a2b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSourceFactory.java
@@ -61,6 +61,7 @@ public OptionRule optionRule() {
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
                 .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
+                .optional(BaseSourceConfig.COMPRESS_CODEC)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index e40938a329e..55d61d75110 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -61,6 +61,7 @@ public OptionRule optionRule() {
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
                 .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
+                .optional(BaseSourceConfig.COMPRESS_CODEC)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 2e860df7952..2697e7dc45d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -58,6 +58,7 @@ public OptionRule optionRule() {
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
                 .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
+                .optional(BaseSourceConfig.COMPRESS_CODEC)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index ab0a44d750e..02f2357b66f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -61,6 +61,7 @@ public OptionRule optionRule() {
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
                 .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
+                .optional(BaseSourceConfig.COMPRESS_CODEC)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 11b0b06dc7d..c01893c1f85 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -61,6 +61,7 @@ public OptionRule optionRule() {
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
                 .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
+                .optional(BaseSourceConfig.COMPRESS_CODEC)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index 8fc55d2681c..87a1ae3293c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -66,6 +66,7 @@ public OptionRule optionRule() {
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
                 .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
+                .optional(BaseSourceConfig.COMPRESS_CODEC)
                 .build();
     }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index 8d60c01d209..60156132226 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -61,6 +61,7 @@ public OptionRule optionRule() {
                 .optional(BaseSourceConfig.DATETIME_FORMAT)
                 .optional(BaseSourceConfig.TIME_FORMAT)
                 .optional(BaseSourceConfig.FILE_FILTER_PATTERN)
+                .optional(BaseSourceConfig.COMPRESS_CODEC)
                 .build();
     }
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml
index 7a581bbc3a9..ea120abdd31 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml
@@ -38,6 +38,19 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-hadoop3-3.1.4-uber</artifactId>
+            <version>${project.version}</version>
+            <classifier>optional</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.avro</groupId>
+                    <artifactId>avro</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-assert</artifactId>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index aed35767263..8478f7de12d 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -28,7 +28,14 @@
 import org.junit.jupiter.api.TestTemplate;
 
+import io.airlift.compress.lzo.LzopCodec;
+
+import java.io.File;
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 
 @DisabledOnContainer(
         value = {TestContainerId.SPARK_2_4},
@@ -45,11 +52,18 @@ public class LocalFileIT extends TestSuiteBase {
                 "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json",
                 container);
 
+        Path jsonLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/json/e2e.json"));
+        ContainerUtil.copyFileIntoContainers(
+                jsonLzo, "/seatunnel/read/lzo_json/e2e.json", container);
+
         ContainerUtil.copyFileIntoContainers(
                 "/text/e2e.txt",
                 "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt",
                 container);
 
+        Path txtLzo = convertToLzoFile(ContainerUtil.getResourcesFile("/text/e2e.txt"));
+        ContainerUtil.copyFileIntoContainers(
+                txtLzo, "/seatunnel/read/lzo_text/e2e.txt", container);
         ContainerUtil.copyFileIntoContainers(
                 "/excel/e2e.xlsx",
                 "/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx",
@@ -81,6 +95,7 @@ public void testLocalFileReadAndWrite(TestContainer container)
         helper.execute("/excel/local_excel_projection_to_assert.conf");
         // test write local text file
         helper.execute("/text/fake_to_local_file_text.conf");
+        helper.execute("/text/local_file_text_lzo_to_assert.conf");
         // test read skip header
         helper.execute("/text/local_file_text_skip_headers.conf");
         // test read local text file
@@ -91,6 +106,7 @@ public void testLocalFileReadAndWrite(TestContainer container)
         helper.execute("/json/fake_to_local_file_json.conf");
         // test read local json file
         helper.execute("/json/local_file_json_to_assert.conf");
+        helper.execute("/json/local_file_json_lzo_to_console.conf");
         // test write local orc file
         helper.execute("/orc/fake_to_local_file_orc.conf");
         // test read local orc file
@@ -106,4 +122,13 @@
         // test read filtered local file
         helper.execute("/excel/local_filter_excel_to_assert.conf");
     }
+
+    private Path convertToLzoFile(File file) throws IOException {
+        LzopCodec lzo = new LzopCodec();
+        Path path = Paths.get(file.getAbsolutePath() + ".lzo");
+        try (OutputStream outputStream = lzo.createOutputStream(Files.newOutputStream(path))) {
+            outputStream.write(Files.readAllBytes(file.toPath()));
+        }
+        return path;
+    }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf
new file mode 100644
index 00000000000..bab2c38eddf
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_lzo_to_console.conf
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    result_table_name = "fake"
+    path = "/seatunnel/read/lzo_json"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "json"
+    compress_codec = "lzo"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          C_MAP = "map<string, string>"
+          C_ARRAY = "array<int>"
+          C_STRING = string
+          C_BOOLEAN = boolean
+          C_TINYINT = tinyint
+          C_SMALLINT = smallint
+          C_INT = int
+          C_BIGINT = bigint
+          C_FLOAT = float
+          C_DOUBLE = double
+          C_BYTES = bytes
+          C_DATE = date
+          C_DECIMAL = "decimal(38, 18)"
+          C_TIMESTAMP = timestamp
+        }
+      }
+    }
+  }
+}
+
+transform {
+  sql {
+    source_table_name = "fake"
+    result_table_name = "sqlresult"
+    query = "select * from fake where c_string = 'WArEB'"
+  }
+}
+
+sink {
+  Console {
+    source_table_name = "sqlresult"
+  }
+  Assert {
+    source_table_name = "sqlresult"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 1
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 1
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              equals_to = "WArEB"
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_smallint
+          field_type = short
+          field_value = [
+            {
+              equals_to = 15920
+            }
+          ]
+        },
+        {
+          field_name = c_date
+          field_type = date
+          field_value = [
+            {
+              equals_to = "2022-04-27"
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf
new file mode 100644
index 00000000000..80613ec0fcc
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_lzo_to_assert.conf
@@ -0,0 +1,139 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/lzo_text"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    compress_codec = "lzo"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+transform {
+  sql {
+    source_table_name = "fake"
+    result_table_name = "sqlresult"
+    query = "select * from fake where c_string = 'MTDna'"
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "sqlresult"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 1
+        },
+        {
+          rule_type = MIN_ROW
+          rule_value = 1
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              equals_to = "MTDna"
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_smallint
+          field_type = short
+          field_value = [
+            {
+              equals_to = 13846
+            }
+          ]
+        },
+        {
+          field_name = c_date
+          field_type = date
+          field_value = [
+            {
+              equals_to = "2023-06-07"
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
index fa5660a1700..5b213c026e9 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/util/ContainerUtil.java
@@ -250,6 +250,11 @@ public static List<TestContainer> discoverTestContainers() {
     public static void copyFileIntoContainers(
             String fileName, String targetPath, GenericContainer<?> container) {
         Path path = getResourcesFile(fileName).toPath();
+        copyFileIntoContainers(path, targetPath, container);
+    }
+
+    public static void copyFileIntoContainers(
+            Path path, String targetPath, GenericContainer<?> container) {
         container.copyFileToContainer(MountableFile.forHostPath(path), targetPath);
     }
 }
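
For quick reference, a minimal job config exercising the new option end-to-end might look like the sketch below. This is illustrative only: the path and schema are hypothetical placeholders, not files shipped with this patch; only the `LocalFile` source and the `compress_codec` option come from the changes above.

```hocon
# Hypothetical config: read LZO-compressed text files with the new option.
source {
  LocalFile {
    # Placeholder directory; any directory of LZO-compressed text files would do.
    path = "/data/lzo_text"
    file_format_type = "text"
    # New in this patch; defaults to "none".
    compress_codec = "lzo"
    schema = {
      fields {
        name = string
        age = int
      }
    }
  }
}

sink {
  Console {}
}
```

As the doc sections above state, `lzo` and `none` are the only accepted values for `txt`, `json`, and `csv` sources, while `orc`/`parquet` detect the codec from file metadata and need no setting; any other `CompressFormat` value falls into the `default` branch of the new switch in the read strategies and is read uncompressed after a warning.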