Skip to content

Commit

Permalink
Fail when reading parquets with complex types (ydb-platform#785)
Browse files Browse the repository at this point in the history
  • Loading branch information
Hor911 authored and adameat committed Dec 29, 2023
1 parent 34f61b0 commit 08afd27
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 4 deletions.
20 changes: 16 additions & 4 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ TColumnConverter BuildColumnConverter(const std::string& columnName, const std::
return conv;
}

if (originalType->id() == arrow::Type::DATE32) {
if (originalType->id() == arrow::Type::DATE32) {
// TODO: support more than 1 optional level
bool isOptional = false;
auto unpackedYqlType = UnpackOptional(yqlType, isOptional);
Expand Down Expand Up @@ -1541,6 +1541,20 @@ class TS3ReadCoroImpl : public TActorCoroImpl {

// Walks the fields of outputSchema and appends to columnIndices (source field
// index) and columnConverters (converter built by BuildColumnConverter).
// Throws parquet::ParquetException if dataSchema has a field whose type id is
// LIST or STRUCT; the message names the offending field.
void BuildColumnConverters(std::shared_ptr<arrow::Schema> outputSchema, std::shared_ptr<arrow::Schema> dataSchema,
std::vector<int>& columnIndices, std::vector<TColumnConverter>& columnConverters) {

// Reject unsupported complex types up front. The scan covers every field of
// the data schema, not only the fields that also appear in outputSchema.
for (int i = 0; i < dataSchema->num_fields(); ++i) {
switch (dataSchema->field(i)->type()->id()) {
case arrow::Type::LIST:
throw parquet::ParquetException(TStringBuilder() << "File contains LIST field "
<< dataSchema->field(i)->name() << " and can't be parsed");
case arrow::Type::STRUCT:
throw parquet::ParquetException(TStringBuilder() << "File contains STRUCT field "
<< dataSchema->field(i)->name() << " and can't be parsed");
default:
// Any other type id passes this pre-check.
;
}
}

// Allocate room for one converter per output field before the loop.
columnConverters.reserve(outputSchema->num_fields());
for (int i = 0; i < outputSchema->num_fields(); ++i) {
const auto& targetField = outputSchema->field(i);
Expand All @@ -1556,9 +1570,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}
columnIndices.push_back(srcFieldIndex);
// The target column is looked up by name in ReadSpec->RowSpec; the mapped
// value of that entry is handed to BuildColumnConverter below.
auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name());
if (rowSpecColumnIt == ReadSpec->RowSpec.end()) {
throw parquet::ParquetException(TStringBuilder() << "Column " << targetField->name() << " not found in row spec");
}
YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec");
columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second));
}
}
Expand Down
Binary file added ydb/tests/fq/s3/test_format_data/btct.parquet
Binary file not shown.
37 changes: 37 additions & 0 deletions ydb/tests/fq/s3/test_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,43 @@ def test_format(self, kikimr, s3, client, filename, type_format, yq_version):
if type_format != "json_list":
assert stat["ResultSet"]["IngressRows"]["sum"] == 3

@yq_all
def test_btc(self, kikimr, s3, client, yq_version):
    """Check that a query over ``btct.parquet`` ends in the FAILED status.

    The file is uploaded, a storage connection is created, and a SELECT with
    an explicit schema is run with ``s3.UseBlocksSource`` enabled. The
    reported issues must mention the file name and the LIST field ``outputs``.
    """
    # Stage the fixture file and register the connection the query refers to.
    self.create_bucket_and_upload_file("btct.parquet", s3, kikimr)
    client.create_storage_connection("btct", "fbucket")

    # The SQL text is kept exactly as written in the original test.
    sql = f'''
PRAGMA s3.UseBlocksSource="true";
SELECT
*
FROM btct.`btct.parquet`
WITH (format=`parquet`,
SCHEMA=(
hash STRING,
version INT64,
size INT64,
block_hash UTF8,
block_number INT64,
virtual_size INT64,
lock_time INT64,
input_count INT64,
output_count INT64,
is_coinbase BOOL,
output_value DOUBLE,
block_timestamp DATETIME,
date DATE
)
);
'''

    # Submit the query and wait until it is reported as failed.
    created = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS)
    failed_query_id = created.result.query_id
    client.wait_query_status(failed_query_id, fq.QueryMeta.FAILED)

    # The outer issue names the file; the nested issue names the LIST field.
    described = client.describe_query(failed_query_id).result
    file_issue = described.query.issue[0].issues[0]
    assert "Error while reading file btct.parquet" in file_issue.message
    assert "File contains LIST field outputs and can\'t be parsed" in file_issue.issues[0].message


@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_invalid_format(self, kikimr, s3, client):
Expand Down

0 comments on commit 08afd27

Please sign in to comment.