Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored and uzhastik committed Jun 20, 2024
1 parent 57dd8de commit 8db82ed
Show file tree
Hide file tree
Showing 10 changed files with 43 additions and 2 deletions.
4 changes: 3 additions & 1 deletion ydb/library/yql/providers/s3/compressors/zstd.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,22 @@ TReadBuffer::TReadBuffer(NDB::ReadBuffer& source)
InBuffer.resize(8_KB);
OutBuffer.resize(64_KB);
Offset_ = InBuffer.size();
Size_ = InBuffer.size();
}

// Releases the zstd decompression stream referenced by ZCtx_.
TReadBuffer::~TReadBuffer()
{
    // The status code returned by ZSTD_freeDStream is deliberately ignored:
    // a destructor has no way to report it to the caller.
    static_cast<void>(::ZSTD_freeDStream(ZCtx_));
}

bool TReadBuffer::nextImpl() {
::ZSTD_inBuffer zIn{InBuffer.data(), InBuffer.size(), Offset_};
::ZSTD_inBuffer zIn{InBuffer.data(), Size_, Offset_};
::ZSTD_outBuffer zOut{OutBuffer.data(), OutBuffer.size(), 0ULL};

size_t returnCode = 0ULL;
if (!Finished_) do {
if (zIn.pos == zIn.size) {
zIn.size = Source_.read(InBuffer.data(), InBuffer.size());
Size_ = zIn.size;

zIn.pos = Offset_ = 0;
if (!zIn.size) {
Expand Down
3 changes: 2 additions & 1 deletion ydb/library/yql/providers/s3/compressors/zstd.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ class TReadBuffer : public NDB::ReadBuffer {
NDB::ReadBuffer& Source_;
std::vector<char> InBuffer, OutBuffer;
::ZSTD_DStream *const ZCtx_;
size_t Offset_;
size_t Offset_;
size_t Size_;
bool Finished_ = false;
};

Expand Down
3 changes: 3 additions & 0 deletions ydb/tests/fq/s3/test_compression_data/big.json.br
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
��S
��;�…�����^�
K�lġ�R�\���-!0
Binary file added ydb/tests/fq/s3/test_compression_data/big.json.bz2
Binary file not shown.
Binary file added ydb/tests/fq/s3/test_compression_data/big.json.gz
Binary file not shown.
Binary file added ydb/tests/fq/s3/test_compression_data/big.json.lz4
Binary file not shown.
Binary file added ydb/tests/fq/s3/test_compression_data/big.json.xz
Binary file not shown.
Binary file added ydb/tests/fq/s3/test_compression_data/big.json.zst
Binary file not shown.
Binary file not shown.
35 changes: 35 additions & 0 deletions ydb/tests/fq/s3/test_compressions.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,41 @@ def test_compression(self, kikimr, s3, client, filename, compression):
result_set = data.result.result_set
self.validate_result(result_set)

@yq_all
@pytest.mark.parametrize("filename, compression", [
    ("big.json.gz", "gzip"),
    ("big.json.lz4", "lz4"),
    ("big.json.br", "brotli"),
    ("big.json.bz2", "bzip2"),
    ("big.json.zst", "zstd"),
    ("big.json.xz", "xz")
])
def test_big_compression(self, kikimr, s3, client, filename, compression):
    """Run ``SELECT count(*)`` over a compressed ``big.json.*`` fixture.

    For each parametrized (filename, compression) pair the test calls
    ``self.create_bucket_and_upload_file``, creates a storage connection,
    submits an analytics query that reads the file as ``json_each_row``
    with the given compression, waits for the COMPLETED status, and then
    checks that the result is a single UINT64 column named ``column0``
    holding one row with the value 5458.
    """
    self.create_bucket_and_upload_file(filename, s3, kikimr)
    connection_name = "tbc_fruitbucket"
    client.create_storage_connection(connection_name, "fbucket")

    # Placeholders, in order: connection name, object name, compression codec.
    query_template = '''
SELECT count(*)
FROM `{}`.`{}`
WITH (format=json_each_row, compression="{}", SCHEMA (
a String NOT NULL
));
'''
    query_text = query_template.format(connection_name, filename, compression)

    created = client.create_query("simple", query_text, type=fq.QueryContent.QueryType.ANALYTICS)
    query_id = created.result.query_id
    client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

    response = client.get_result_data(query_id)
    result_set = response.result.result_set

    logging.debug(str(result_set))

    # Exactly one column: the unnamed count(*) aggregate.
    assert len(result_set.columns) == 1
    count_column = result_set.columns[0]
    assert count_column.name == "column0"
    assert count_column.type.type_id == ydb.Type.UINT64

    # Exactly one row carrying the expected record count.
    assert len(result_set.rows) == 1
    assert result_set.rows[0].items[0].uint64_value == 5458

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_invalid_compression(self, kikimr, s3, client):
Expand Down

0 comments on commit 8db82ed

Please sign in to comment.