diff --git a/ydb/library/yql/providers/s3/compressors/zstd.cpp b/ydb/library/yql/providers/s3/compressors/zstd.cpp index 0d7d8217ffd6..7890537ca125 100644 --- a/ydb/library/yql/providers/s3/compressors/zstd.cpp +++ b/ydb/library/yql/providers/s3/compressors/zstd.cpp @@ -14,6 +14,7 @@ TReadBuffer::TReadBuffer(NDB::ReadBuffer& source) InBuffer.resize(8_KB); OutBuffer.resize(64_KB); Offset_ = InBuffer.size(); + Size_ = InBuffer.size(); } TReadBuffer::~TReadBuffer() { @@ -21,13 +22,14 @@ TReadBuffer::~TReadBuffer() { } bool TReadBuffer::nextImpl() { - ::ZSTD_inBuffer zIn{InBuffer.data(), InBuffer.size(), Offset_}; + ::ZSTD_inBuffer zIn{InBuffer.data(), Size_, Offset_}; ::ZSTD_outBuffer zOut{OutBuffer.data(), OutBuffer.size(), 0ULL}; size_t returnCode = 0ULL; if (!Finished_) do { if (zIn.pos == zIn.size) { zIn.size = Source_.read(InBuffer.data(), InBuffer.size()); + Size_ = zIn.size; zIn.pos = Offset_ = 0; if (!zIn.size) { diff --git a/ydb/library/yql/providers/s3/compressors/zstd.h b/ydb/library/yql/providers/s3/compressors/zstd.h index 1f72dcc034c3..9fc83ef09f48 100644 --- a/ydb/library/yql/providers/s3/compressors/zstd.h +++ b/ydb/library/yql/providers/s3/compressors/zstd.h @@ -19,7 +19,8 @@ class TReadBuffer : public NDB::ReadBuffer { NDB::ReadBuffer& Source_; std::vector InBuffer, OutBuffer; ::ZSTD_DStream *const ZCtx_; - size_t Offset_; + size_t Offset_; + size_t Size_; bool Finished_ = false; }; diff --git a/ydb/tests/fq/s3/test_compression_data/big.json.br b/ydb/tests/fq/s3/test_compression_data/big.json.br new file mode 100644 index 000000000000..2e08fa90c411 --- /dev/null +++ b/ydb/tests/fq/s3/test_compression_data/big.json.br @@ -0,0 +1,3 @@ +S +;…^ +KlġR\-!0 \ No newline at end of file diff --git a/ydb/tests/fq/s3/test_compression_data/big.json.bz2 b/ydb/tests/fq/s3/test_compression_data/big.json.bz2 new file mode 100644 index 000000000000..cd5f286a97f4 Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/big.json.bz2 differ diff --git a/ydb/tests/fq/s3/test_compression_data/big.json.gz b/ydb/tests/fq/s3/test_compression_data/big.json.gz new file mode 100644 index 000000000000..fa18c6a5088e Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/big.json.gz differ diff --git a/ydb/tests/fq/s3/test_compression_data/big.json.lz4 b/ydb/tests/fq/s3/test_compression_data/big.json.lz4 new file mode 100644 index 000000000000..36a2097ff1f1 Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/big.json.lz4 differ diff --git a/ydb/tests/fq/s3/test_compression_data/big.json.xz b/ydb/tests/fq/s3/test_compression_data/big.json.xz new file mode 100644 index 000000000000..6e0196502ce2 Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/big.json.xz differ diff --git a/ydb/tests/fq/s3/test_compression_data/big.json.zst b/ydb/tests/fq/s3/test_compression_data/big.json.zst new file mode 100644 index 000000000000..1ece8f7a9793 Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/big.json.zst differ diff --git a/ydb/tests/fq/s3/test_compression_data/unknown_frame_descriptor.json.zst b/ydb/tests/fq/s3/test_compression_data/unknown_frame_descriptor.json.zst new file mode 100644 index 000000000000..1ece8f7a9793 Binary files /dev/null and b/ydb/tests/fq/s3/test_compression_data/unknown_frame_descriptor.json.zst differ diff --git a/ydb/tests/fq/s3/test_compressions.py b/ydb/tests/fq/s3/test_compressions.py index cecb71c093e1..76aa87989a1b 100644 --- a/ydb/tests/fq/s3/test_compressions.py +++ b/ydb/tests/fq/s3/test_compressions.py @@ -62,6 +62,41 @@ def test_compression(self, kikimr, s3, client, filename, compression): result_set = data.result.result_set self.validate_result(result_set) + @yq_all + @pytest.mark.parametrize("filename, compression", [ + ("big.json.gz", "gzip"), + ("big.json.lz4", "lz4"), + ("big.json.br", "brotli"), + ("big.json.bz2", "bzip2"), + ("big.json.zst", "zstd"), + ("big.json.xz", "xz") + ]) + def test_big_compression(self, kikimr, s3, client, filename, compression): + self.create_bucket_and_upload_file(filename, s3, kikimr) + storage_connection_name = "tbc_fruitbucket" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = ''' + SELECT count(*) + FROM `{}`.`{}` + WITH (format=json_each_row, compression="{}", SCHEMA ( + a String NOT NULL + )); + '''.format(storage_connection_name, filename, compression) + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + + logging.debug(str(result_set)) + assert len(result_set.columns) == 1 + assert result_set.columns[0].name == "column0" + assert result_set.columns[0].type.type_id == ydb.Type.UINT64 + assert len(result_set.rows) == 1 + assert result_set.rows[0].items[0].uint64_value == 5458 + @yq_all @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) def test_invalid_compression(self, kikimr, s3, client):