diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index 7d81b91616a9..fc3603bb655f 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -18,6 +18,7 @@ #include #include #include +#include #include #include @@ -330,44 +331,87 @@ struct TObjectStorageExternalSource : public IExternalSource { const TString path = meta->TableLocation; const TString filePattern = meta->Attributes.Value("filepattern", TString{}); + const TString projection = meta->Attributes.Value("projection", TString{}); const TVector partitionedBy = GetPartitionedByConfig(meta); + + NYql::NPathGenerator::TPathGeneratorPtr pathGenerator; + + bool shouldInferPartitions = !partitionedBy.empty() && !projection; + bool ignoreEmptyListings = !projection.empty(); + NYql::NS3Lister::TListingRequest request { .Url = meta->DataSourceLocation, .Credentials = credentials }; + TVector requests; + + if (!projection) { + auto error = NYql::NS3::BuildS3FilePattern(path, filePattern, partitionedBy, request); + if (error) { + throw yexception() << *error; + } + requests.push_back(request); + } else { + if (NYql::NS3::HasWildcards(path)) { + throw yexception() << "Path prefix: '" << path << "' contains wildcards"; + } - auto error = NYql::NS3::BuildS3FilePattern(path, filePattern, partitionedBy, request); - if (error) { - throw yexception() << *error; + pathGenerator = NYql::NPathGenerator::CreatePathGenerator(projection, partitionedBy); + for (const auto& rule : pathGenerator->GetRules()) { + YQL_ENSURE(rule.ColumnValues.size() == partitionedBy.size()); + + request.Pattern = NYql::NS3::NormalizePath(TStringBuilder() << path << "/" << rule.Path << "/*"); + request.PatternType = NYql::NS3Lister::ES3PatternType::Wildcard; + request.Prefix = request.Pattern.substr(0, NYql::NS3::GetFirstWildcardPos(request.Pattern)); + + requests.push_back(request); + } } auto partByData = std::make_shared(); + if (shouldInferPartitions) { + *partByData << JoinSeq(",", partitionedBy); + } + TVector> futures; auto httpGateway = NYql::IHTTPGateway::Make(); auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); - auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, request, Nothing(), true, ActorSystem); - auto afterListing = s3Lister->Next().Apply([partByData, partitionedBy, path = request.Pattern](const NThreading::TFuture& listResFut) { - auto& listRes = listResFut.GetValue(); - auto& partByRef = *partByData; - if (std::holds_alternative(listRes)) { - auto& error = std::get(listRes); - throw yexception() << error.Issues.ToString(); - } - auto& entries = std::get(listRes); - if (entries.Objects.empty()) { - throw yexception() << "couldn't find files at " << path; - } + for (const auto& req : requests) { + auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, req, Nothing(), true, ActorSystem); + futures.push_back(s3Lister->Next()); + } - partByRef << JoinSeq(",", partitionedBy); - for (const auto& entry : entries.Objects) { - Y_ENSURE(entry.MatchedGlobs.size() == partitionedBy.size()); - partByRef << Endl << JoinSeq(",", entry.MatchedGlobs); - } - for (const auto& entry : entries.Objects) { - if (entry.Size > 0) { - return entry; + auto allFuture = NThreading::WaitExceptionOrAll(futures); + auto afterListing = allFuture.Apply([partByData, shouldInferPartitions, ignoreEmptyListings, futures = std::move(futures), requests = std::move(requests)](const NThreading::TFuture& result) { + result.GetValue(); + for (size_t i = 0; i < futures.size(); ++i) { + auto& listRes = futures[i].GetValue(); + if (std::holds_alternative(listRes)) { + auto& error = std::get(listRes); + throw yexception() << error.Issues.ToString(); + } + auto& entries = std::get(listRes); + if (entries.Objects.empty() && !ignoreEmptyListings) { + throw yexception() << "couldn't find files at " << requests[i].Pattern; + } + + if (shouldInferPartitions) { + for (const auto& entry : entries.Objects) { + *partByData << Endl << JoinSeq(",", entry.MatchedGlobs); + } + } + + for (const auto& entry : entries.Objects) { + if (entry.Size > 0) { + return entry; + } + } + + if (!ignoreEmptyListings) { + throw yexception() << "couldn't find any files for type inference, please check that the right path is provided"; } } + throw yexception() << "couldn't find any files for type inference, please check that the right path is provided"; }); @@ -410,13 +454,45 @@ struct TObjectStorageExternalSource : public IExternalSource { )); return promise.GetFuture(); - }).Apply([arrowInferencinatorId, meta, partByData, partitionedBy, this](const NThreading::TFuture& result) { + }).Apply([arrowInferencinatorId, meta, partByData, partitionedBy, pathGenerator, this](const NThreading::TFuture& result) { auto& value = result.GetValue(); if (!value.Success()) { return result; } - return InferPartitionedColumnsTypes(arrowInferencinatorId, partByData, partitionedBy, result); + auto meta = value.Metadata; + if (pathGenerator) { + for (const auto& rule : pathGenerator->GetConfig().Rules) { + auto& destColumn = *meta->Schema.add_column(); + destColumn.mutable_name()->assign(rule.Name); + switch (rule.Type) { + case NYql::NPathGenerator::IPathGenerator::EType::INTEGER: + destColumn.mutable_type()->set_type_id(Ydb::Type::INT64); + break; + + case NYql::NPathGenerator::IPathGenerator::EType::DATE: + destColumn.mutable_type()->set_type_id(Ydb::Type::DATE); + break; + + case NYql::NPathGenerator::IPathGenerator::EType::ENUM: + default: + destColumn.mutable_type()->set_type_id(Ydb::Type::STRING); + break; + } + } + } else { + for (const auto& partitionName : partitionedBy) { + auto& destColumn = *meta->Schema.add_column(); + destColumn.mutable_name()->assign(partitionName); + destColumn.mutable_type()->set_type_id(Ydb::Type::UTF8); + } + } + + if (!partitionedBy.empty() && !pathGenerator) { + return InferPartitionedColumnsTypes(arrowInferencinatorId, partByData, result); + } + + return result; }).Apply([](const NThreading::TFuture& result) { auto& value = result.GetValue(); if (value.Success()) { @@ -434,20 +510,10 @@ struct TObjectStorageExternalSource : public IExternalSource { NThreading::TFuture InferPartitionedColumnsTypes( NActors::TActorId arrowInferencinatorId, std::shared_ptr partByData, - const TVector& partitionedBy, const NThreading::TFuture& result) const { auto& value = result.GetValue(); - if (partitionedBy.empty()) { - return result; - } - auto meta = value.Metadata; - for (const auto& partitionName : partitionedBy) { - auto& destColumn = *meta->Schema.add_column(); - destColumn.mutable_name()->assign(partitionName); - destColumn.mutable_type()->set_type_id(Ydb::Type::UTF8); - } arrow::BufferBuilder builder; auto partitionBuffer = std::make_shared(nullptr, 0); @@ -498,15 +564,19 @@ struct TObjectStorageExternalSource : public IExternalSource { THashSet columns; if (auto partitioned = meta->Attributes.FindPtr("partitionedby"); partitioned) { NJson::TJsonValue values; - Y_ENSURE(NJson::ReadJsonTree(*partitioned, &values)); - Y_ENSURE(values.GetType() == NJson::JSON_ARRAY); + auto successful = NJson::ReadJsonTree(*partitioned, &values); + if (!successful) { + columns.insert(*partitioned); + } else { + Y_ENSURE(values.GetType() == NJson::JSON_ARRAY); - for (const auto& value : values.GetArray()) { - Y_ENSURE(value.GetType() == NJson::JSON_STRING); - if (columns.contains(value.GetString())) { - throw yexception() << "invalid partitioned_by parameter, column " << value.GetString() << "mentioned twice"; + for (const auto& value : values.GetArray()) { + Y_ENSURE(value.GetType() == NJson::JSON_STRING); + if (columns.contains(value.GetString())) { + throw yexception() << "invalid partitioned_by parameter, column " << value.GetString() << "mentioned twice"; + } + columns.insert(value.GetString()); } - columns.insert(value.GetString()); } } diff --git a/ydb/core/external_sources/object_storage/inference/infer_config.cpp b/ydb/core/external_sources/object_storage/inference/infer_config.cpp index 6b0551cdefc0..209f92d31b5d 100644 --- a/ydb/core/external_sources/object_storage/inference/infer_config.cpp +++ b/ydb/core/external_sources/object_storage/inference/infer_config.cpp @@ -42,6 +42,21 @@ std::shared_ptr MakeJsonListConfig(const THashMap MakeFormatConfig(const THashMap& params) { + static THashSet supportedParams { + "format", + "compression", + "filepattern", + "partitionedby", + "projection", + "csvdelimiter", + }; + + for (const auto& [param, value] : params) { + if (!supportedParams.contains(param)) { + throw yexception() << "parameter is not supported with type inference: " << param; + } + } + EFileFormat format; if (auto formatPtr = params.FindPtr("format"); formatPtr) { format = ConvertFileFormat(*formatPtr); diff --git a/ydb/core/kqp/provider/read_attributes_utils.cpp b/ydb/core/kqp/provider/read_attributes_utils.cpp index 1ddce020866d..51be6f6df1d4 100644 --- a/ydb/core/kqp/provider/read_attributes_utils.cpp +++ b/ydb/core/kqp/provider/read_attributes_utils.cpp @@ -21,11 +21,6 @@ class TGatheringAttributesVisitor : public IAstAttributesVisitor { void VisitAttribute(TString key, TString value) override { Y_ABORT_UNLESS(CurrentSource, "cannot write %s: %s", key.c_str(), value.c_str()); - if (key == "partitionedby") { - NJson::TJsonArray values({ value }); - CurrentSource->second.try_emplace(key, NJson::WriteJson({ values })); - return; - } CurrentSource->second.try_emplace(key, value); }; @@ -126,9 +121,11 @@ class TAttributesReplacingVisitor : public IAstAttributesVisitor { auto nodeChildren = node->Children(); if (!nodeChildren.empty() && nodeChildren[0]->IsAtom()) { TCoAtom attrName{nodeChildren[0]}; - if (attrName.StringValue().equal("userschema")) { + if (attrName.StringValue() == "userschema") { node = BuildSchemaFromMetadata(Read->Pos(), Ctx, Metadata->Columns); ReplacedUserchema = true; + } else if (attrName.StringValue() == "partitionedby") { + NewAttributes.erase("partitionedby"); } } Children.push_back(std::move(node)); diff --git a/ydb/tests/fq/s3/test_s3.py b/ydb/tests/fq/s3/test_s3.py index 3572b164c266..f62a7c3e5d6f 100644 --- a/ydb/tests/fq/s3/test_s3.py +++ b/ydb/tests/fq/s3/test_s3.py @@ -506,6 +506,93 @@ def test_inference_timestamp(self, kikimr, s3, client): assert result_set.columns[2].name == "c" assert result_set.columns[2].type.type_id == ydb.Type.UTF8 + sql = f''' + SELECT * + FROM `{storage_connection_name}`.`timestamp.csv` + WITH (format=csv_with_names, with_infer='true', `data.timestamp.formatname`='ISO'); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.FAILED) + assert "parameter is not supported with type inference" in str( + client.describe_query(query_id).result + ) + + @yq_v2 + @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) + def test_inference_projection(self, kikimr, s3, client): + unique_prefix = str(uuid.uuid4()) + resource = boto3.resource( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + bucket = resource.Bucket("fbucket") + bucket.create(ACL='public-read') + bucket.objects.all().delete() + + s3_client = boto3.client( + "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" + ) + + fruits = '''Fruit,Price,Weight +Banana,3,100 +Apple,2,22 +Pear,15,33''' + s3_client.put_object(Body=fruits, Bucket='fbucket', Key='year=2023/fruits.csv', ContentType='text/plain') + + kikimr.control_plane.wait_bootstrap(1) + storage_connection_name = unique_prefix + "fruitbucket" + client.create_storage_connection(storage_connection_name, "fbucket") + + sql = '''$projection = @@ { + "projection.enabled" : "true", + "storage.location.template" : "/${date}", + "projection.date.type" : "date", + "projection.date.min" : "2022-11-02", + "projection.date.max" : "2024-12-02", + "projection.date.interval" : "1", + "projection.date.format" : "/year=%Y", + "projection.date.unit" : "YEARS" + } @@;''' + f''' + + SELECT * + FROM `{storage_connection_name}`.`/` + WITH (format=csv_with_names, + with_infer='true', + partitioned_by=(`date`), + projection=$projection); + ''' + + query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id + client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + + data = client.get_result_data(query_id) + result_set = data.result.result_set + logging.debug(str(result_set)) + assert len(result_set.columns) == 4 + assert result_set.columns[0].name == "Fruit" + assert result_set.columns[0].type.type_id == ydb.Type.UTF8 + assert result_set.columns[1].name == "Price" + assert result_set.columns[1].type.optional_type.item.type_id == ydb.Type.INT64 + assert result_set.columns[2].name == "Weight" + assert result_set.columns[2].type.optional_type.item.type_id == ydb.Type.INT64 + assert result_set.columns[3].name == "date" + assert result_set.columns[3].type.type_id == ydb.Type.DATE + assert len(result_set.rows) == 3 + assert result_set.rows[0].items[0].text_value == "Banana" + assert result_set.rows[0].items[1].int64_value == 3 + assert result_set.rows[0].items[2].int64_value == 100 + assert result_set.rows[0].items[3].uint32_value == 19663 + assert result_set.rows[1].items[0].text_value == "Apple" + assert result_set.rows[1].items[1].int64_value == 2 + assert result_set.rows[1].items[2].int64_value == 22 + assert result_set.rows[1].items[3].uint32_value == 19663 + assert result_set.rows[2].items[0].text_value == "Pear" + assert result_set.rows[2].items[1].int64_value == 15 + assert result_set.rows[2].items[2].int64_value == 33 + assert result_set.rows[2].items[3].uint32_value == 19663 + assert sum(kikimr.control_plane.get_metering(1)) == 10 + @yq_all @pytest.mark.parametrize("client", [{"folder_id": "my_folder"}], indirect=True) def test_csv_with_hopping(self, kikimr, s3, client):