diff --git a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
index 5efe9d2316fe..023787ad4e09 100644
--- a/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
+++ b/ydb/core/external_sources/object_storage/inference/arrow_inferencinator.cpp
@@ -126,32 +126,10 @@ bool ArrowToYdbType(Ydb::Type& maybeOptionalType, const arrow::DataType& type, s
     case arrow::Type::LIST: { // TODO: is ok?
         return false;
     }
-    case arrow::Type::STRUCT: { // TODO: is ok?
-        auto& structType = *resType.mutable_struct_type();
-        for (const auto& field : type.fields()) {
-            auto& member = *structType.add_members();
-            auto& memberType = *member.mutable_type();
-            if (!ArrowToYdbType(memberType, *field->type(), config)) {
-                return false;
-            }
-            member.mutable_name()->assign(field->name().data(), field->name().size());
-        }
-        return true;
-    }
+    case arrow::Type::STRUCT:
     case arrow::Type::SPARSE_UNION:
-    case arrow::Type::DENSE_UNION: { // TODO: is ok?
-        auto& variant = *resType.mutable_variant_type()->mutable_struct_items();
-        for (const auto& field : type.fields()) {
-            auto& member = *variant.add_members();
-            if (!ArrowToYdbType(*member.mutable_type(), *field->type(), config)) {
-                return false;
-            }
-            if (field->name().empty()) {
-                return false;
-            }
-            member.mutable_name()->assign(field->name().data(), field->name().size());
-        }
-        return true;
+    case arrow::Type::DENSE_UNION: {
+        return false;
     }
     case arrow::Type::DICTIONARY: // TODO: is representable?
         return false;
@@ -325,17 +303,15 @@ class TArrowInferencinator : public NActors::TActorBootstrapped<TArrowInferencin
         auto& arrowFields = std::get<ArrowFields>(mbArrowFields);
         std::vector<Ydb::Column> ydbFields;
         for (const auto& field : arrowFields) {
-            if (field->name().empty()) {
+            Ydb::Column column;
+            if (!ArrowToYdbType(*column.mutable_type(), *field->type(), file.Config)) {
                 continue;
             }
-            ydbFields.emplace_back();
-            auto& ydbField = ydbFields.back();
-            if (!ArrowToYdbType(*ydbField.mutable_type(), *field->type(), file.Config)) {
-                ctx.Send(RequesterId_, MakeErrorSchema(file.Path, NFq::TIssuesIds::UNSUPPORTED, TStringBuilder{} << "couldn't convert arrow type to ydb: " << field->ToString()));
-                RequesterId_ = {};
-                return;
+            if (field->name().empty()) {
+                continue;
             }
-            ydbField.mutable_name()->assign(field->name());
+            column.mutable_name()->assign(field->name());
+            ydbFields.push_back(column);
         }
 
         ctx.Send(RequesterId_, new TEvInferredFileSchema(file.Path, std::move(ydbFields)));
diff --git a/ydb/tests/fq/s3/test_s3_0.py b/ydb/tests/fq/s3/test_s3_0.py
index 646109b39624..f23e3747388b 100644
--- a/ydb/tests/fq/s3/test_s3_0.py
+++ b/ydb/tests/fq/s3/test_s3_0.py
@@ -624,6 +624,50 @@ def test_inference_null_column_name(self, kikimr, s3, client, unique_prefix):
         assert result_set.rows[2].items[1].int64_value == 15
         assert sum(kikimr.control_plane.get_metering(1)) == 10
 
+    @yq_v2
+    @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
+    def test_inference_unsupported_types(self, kikimr, s3, client, unique_prefix):
+        resource = boto3.resource(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        bucket = resource.Bucket("fbucket")
+        bucket.create(ACL='public-read')
+        bucket.objects.all().delete()
+
+        s3_client = boto3.client(
+            "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key"
+        )
+
+        fruits = '''{ "a" : [10, 20, 30] , "b" : { "key" : "value" }, "c" : 10 }
+{ "a" : [10, 20, 30] , "b" : { "key" : "value" }, "c" : 20 }
+{ "a" : [10, 20, 30] , "b" : { "key" : "value" }, "c" : 30 }'''
+        s3_client.put_object(Body=fruits, Bucket='fbucket', Key='fruits.json', ContentType='text/plain')
+        kikimr.control_plane.wait_bootstrap(1)
+        storage_connection_name = unique_prefix + "fruitbucket"
+        client.create_storage_connection(storage_connection_name, "fbucket")
+
+        sql = f'''
+            SELECT *
+            FROM `{storage_connection_name}`.`fruits.json`
+            WITH (format=json_each_row, with_infer='true');
+            '''
+
+        query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id
+        client.wait_query_status(query_id, fq.QueryMeta.COMPLETED)
+
+        data = client.get_result_data(query_id)
+        result_set = data.result.result_set
+        logging.debug(str(result_set))
+        assert len(result_set.columns) == 1
+        assert result_set.columns[0].name == "c"
+        assert result_set.columns[0].type.optional_type.item.type_id == ydb.Type.INT64
+        assert len(result_set.rows) == 3
+        assert result_set.rows[0].items[0].int64_value == 10
+        assert result_set.rows[1].items[0].int64_value == 20
+        assert result_set.rows[2].items[0].int64_value == 30
+        assert sum(kikimr.control_plane.get_metering(1)) == 10
+
     @yq_all
     @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True)
     def test_csv_with_hopping(self, kikimr, s3, client, unique_prefix):
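
Reviewer note (not part of the patch): with this change `ArrowToYdbType` reports STRUCT, SPARSE_UNION, and DENSE_UNION as non-convertible (LIST and DICTIONARY already were), and `HandleFileInference` now silently drops such columns instead of answering with an UNSUPPORTED error schema for the whole file. Below is a minimal pyarrow sketch of the resulting filtering rule; the `is_convertible` helper and the sample schema are illustrative only, not code from this PR.

```python
import pyarrow as pa

def is_convertible(t: pa.DataType) -> bool:
    # Mirrors the `return false` branches of ArrowToYdbType after this patch:
    # nested/complex Arrow types cannot be mapped to a YDB column type.
    return not (
        pa.types.is_list(t)
        or pa.types.is_struct(t)
        or pa.types.is_union(t)       # covers both sparse and dense unions
        or pa.types.is_dictionary(t)
    )

# Shape of fruits.json from the test: "a" is a list, "b" a struct, "c" an int64.
schema = pa.schema([
    ("a", pa.list_(pa.int64())),
    ("b", pa.struct([("key", pa.string())])),
    ("c", pa.int64()),
])

# The rewritten inference loop keeps only convertible fields with non-empty names.
kept = [f.name for f in schema if is_convertible(f.type) and f.name]
assert kept == ["c"]
```

This is exactly what `test_inference_unsupported_types` asserts: only column `c` survives inference, so the query now completes instead of failing with "couldn't convert arrow type to ydb".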