Skip to content

Commit

Permalink
ignoring columns with unsupported parquet and json types with type inferring (#9524)
Browse files Browse the repository at this point in the history
  • Loading branch information
evanevanevanevannnn authored Sep 20, 2024
1 parent 77a5f3a commit a5bf388
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,32 +126,10 @@ bool ArrowToYdbType(Ydb::Type& maybeOptionalType, const arrow::DataType& type, s
case arrow::Type::LIST: { // TODO: is ok?
return false;
}
case arrow::Type::STRUCT: { // TODO: is ok?
auto& structType = *resType.mutable_struct_type();
for (const auto& field : type.fields()) {
auto& member = *structType.add_members();
auto& memberType = *member.mutable_type();
if (!ArrowToYdbType(memberType, *field->type(), config)) {
return false;
}
member.mutable_name()->assign(field->name().data(), field->name().size());
}
return true;
}
case arrow::Type::STRUCT:
case arrow::Type::SPARSE_UNION:
case arrow::Type::DENSE_UNION: { // TODO: is ok?
auto& variant = *resType.mutable_variant_type()->mutable_struct_items();
for (const auto& field : type.fields()) {
auto& member = *variant.add_members();
if (!ArrowToYdbType(*member.mutable_type(), *field->type(), config)) {
return false;
}
if (field->name().empty()) {
return false;
}
member.mutable_name()->assign(field->name().data(), field->name().size());
}
return true;
case arrow::Type::DENSE_UNION: {
return false;
}
case arrow::Type::DICTIONARY: // TODO: is representable?
return false;
Expand Down Expand Up @@ -325,17 +303,15 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
auto& arrowFields = std::get<ArrowFields>(mbArrowFields);
std::vector<Ydb::Column> ydbFields;
for (const auto& field : arrowFields) {
if (field->name().empty()) {
Ydb::Column column;
if (!ArrowToYdbType(*column.mutable_type(), *field->type(), file.Config)) {
continue;
}
ydbFields.emplace_back();
auto& ydbField = ydbFields.back();
if (!ArrowToYdbType(*ydbField.mutable_type(), *field->type(), file.Config)) {
ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
RequesterId_ = {};
return;
if (field->name().empty()) {
continue;
}
ydbField.mutable_name()->assign(field->name());
column.mutable_name()->assign(field->name());
ydbFields.push_back(column);
}

ctx.Send(RequesterId_, new TEvInferredFileSchema(file.Path, std::move(ydbFields)));
Expand Down
44 changes: 44 additions & 0 deletions ydb/tests/fq/s3/test_s3_0.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,50 @@ def test_inference_null_column_name(self, kikimr, s3, client, unique_prefix):
assert result_set.rows[2].items[1].int64_value == 15
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_v2
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_inference_unsupported_types(self, kikimr, s3, client, unique_prefix):
# Verifies that schema inference silently skips columns whose types are
# unsupported (JSON arrays and nested objects) instead of failing the query:
# only the plain integer column "c" should survive inference.
# Create the S3 bucket the query will read from (moto/S3-compatible endpoint).
resource = boto3.resource(
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
)

bucket = resource.Bucket("fbucket")
bucket.create(ACL='public-read')
# Start from an empty bucket so only our test object is present.
bucket.objects.all().delete()

s3_client = boto3.client(
"s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
)

# Each row has: "a" — an array (unsupported), "b" — a nested object
# (unsupported), "c" — an int64 (the only inferable column).
fruits = '''{ "a" : [10, 20, 30] , "b" : { "key" : "value" }, "c" : 10 }
{ "a" : [10, 20, 30] , "b" : { "key" : "value" }, "c" : 20 }
{ "a" : [10, 20, 30] , "b" : { "key" : "value" }, "c" : 30 }'''
s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.json', ContentType='text/plain')
kikimr.control_plane.wait_bootstrap(1)
storage_connection_name = unique_prefix + "fruitbucket"
client.create_storage_connection(storage_connection_name, "fbucket")

# with_infer='true' triggers the Arrow-based schema inference under test.
sql = f'''
SELECT *
FROM `{storage_connection_name}`.`fruits.json`
WITH (format=json_each_row, with_infer='true');
'''

query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)

data = client.get_result_data(query_id)
result_set = data.result.result_set
logging.debug(str(result_set))
# Only "c" must be inferred; "a" and "b" are dropped, not errored on.
assert len(result_set.columns) == 1
assert result_set.columns[0].name == "c"
assert result_set.columns[0].type.optional_type.item.type_id == ydb.Type.INT64
assert len(result_set.rows) == 3
assert result_set.rows[0].items[0].int64_value == 10
assert result_set.rows[1].items[0].int64_value == 20
assert result_set.rows[2].items[0].int64_value == 30
# NOTE(review): metering total of 10 mirrors the sibling inference tests —
# presumably the fixed per-query metering unit; confirm against those tests.
assert sum(kikimr.control_plane.get_metering(1)) == 10

@yq_all
@pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):
Expand Down

0 comments on commit a5bf388

Please sign in to comment.