Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail when reading parquets with complex types #785

Merged
merged 1 commit on Dec 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1158,7 +1158,7 @@ TColumnConverter BuildColumnConverter(const std::string& columnName, const std::
return conv;
}

if (originalType->id() == arrow::Type::DATE32) {
if (originalType->id() == arrow::Type::DATE32) {
// TODO: support more than 1 optional level
bool isOptional = false;
auto unpackedYqlType = UnpackOptional(yqlType, isOptional);
Expand Down Expand Up @@ -1541,6 +1541,20 @@ class TS3ReadCoroImpl : public TActorCoroImpl {

void BuildColumnConverters(std::shared_ptr<arrow::Schema> outputSchema, std::shared_ptr<arrow::Schema> dataSchema,
std::vector<int>& columnIndices, std::vector<TColumnConverter>& columnConverters) {

for (int i = 0; i < dataSchema->num_fields(); ++i) {
switch (dataSchema->field(i)->type()->id()) {
case arrow::Type::LIST:
throw parquet::ParquetException(TStringBuilder() << "File contains LIST field "
<< dataSchema->field(i)->name() << " and can't be parsed");
case arrow::Type::STRUCT:
throw parquet::ParquetException(TStringBuilder() << "File contains STRUCT field "
<< dataSchema->field(i)->name() << " and can't be parsed");
default:
;
}
}

columnConverters.reserve(outputSchema->num_fields());
for (int i = 0; i < outputSchema->num_fields(); ++i) {
const auto& targetField = outputSchema->field(i);
Expand All @@ -1556,9 +1570,7 @@ class TS3ReadCoroImpl : public TActorCoroImpl {
}
columnIndices.push_back(srcFieldIndex);
auto rowSpecColumnIt = ReadSpec->RowSpec.find(targetField->name());
if (rowSpecColumnIt == ReadSpec->RowSpec.end()) {
throw parquet::ParquetException(TStringBuilder() << "Column " << targetField->name() << " not found in row spec");
}
YQL_ENSURE(rowSpecColumnIt != ReadSpec->RowSpec.end(), "Column " << targetField->name() << " not found in row spec");
columnConverters.emplace_back(BuildColumnConverter(targetField->name(), originalType, targetType, rowSpecColumnIt->second));
}
}
Expand Down
Binary file added ydb/tests/fq/s3/test_format_data/btct.parquet
Binary file not shown.
37 changes: 37 additions & 0 deletions ydb/tests/fq/s3/test_formats.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,43 @@ def test_format(self, kikimr, s3, client, filename, type_format, yq_version):
if type_format != "json_list":
assert stat["ResultSet"]["IngressRows"]["sum"] == 3

@yq_all
# Regression test for reading parquet files that contain complex (LIST/STRUCT)
# columns: the query must FAIL with a clear, user-readable issue instead of
# crashing or silently mis-parsing the file.
def test_btc(self, kikimr, s3, client, yq_version):
# btct.parquet is a sample file (presumably bitcoin transactions — see the
# column names below) that contains at least one LIST column, `outputs`.
self.create_bucket_and_upload_file("btct.parquet", s3, kikimr)
client.create_storage_connection("btct", "fbucket")

# The declared SCHEMA deliberately lists only the scalar columns and omits
# the complex `outputs` column; the reader still encounters it in the file
# and is expected to reject the whole file.
sql = f'''
PRAGMA s3.UseBlocksSource="true";
SELECT
*
FROM btct.`btct.parquet`
WITH (format=`parquet`,
SCHEMA=(
hash STRING,
version INT64,
size INT64,
block_hash UTF8,
block_number INT64,
virtual_size INT64,
lock_time INT64,
input_count INT64,
output_count INT64,
is_coinbase BOOL,
output_value DOUBLE,
block_timestamp DATETIME,
date DATE
)
);
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
# The query must end in FAILED state (not COMPLETED).
client.wait_query_status(query_id, fq.QueryMeta.FAILED)
describe_result = client.describe_query(query_id).result
issues = describe_result.query.issue[0].issues
# Top-level issue names the file; the nested issue names the offending LIST
# field, matching the ParquetException raised by BuildColumnConverters.
assert "Error while reading file btct.parquet" in issues[0].message
assert "File contains LIST field outputs and can\'t be parsed" in issues[0].issues[0].message


@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_invalid_format(self, kikimr, s3, client):
Expand Down
Loading