From bd60670b4a1e7084c5e47bd4ebd4f582ed3dd397 Mon Sep 17 00:00:00 2001 From: Egor Zudin Date: Mon, 27 May 2024 12:10:12 +0000 Subject: [PATCH] Fix commits, add tests --- ydb/core/grpc_services/ya.make | 1 + .../behaviour/external_data_source/ya.make | 1 + ydb/core/kqp/gateway/kqp_metadata_loader.cpp | 27 ++- ydb/core/kqp/gateway/kqp_metadata_loader.h | 5 + .../kqp/gateway/ut/metadata_conversion.cpp | 78 ++++++ ydb/core/kqp/gateway/ut/ya.make | 17 ++ ydb/core/kqp/gateway/ya.make | 2 + .../kqp/provider/read_attributes_utils.cpp | 229 ++++++++++++++++++ ydb/core/kqp/provider/read_attributes_utils.h | 29 +++ .../kqp/provider/read_attributes_utils_ut.cpp | 164 +++++++++++++ ydb/core/kqp/provider/ut/ya.make | 2 + ydb/core/kqp/provider/ya.make | 1 + .../kqp/provider/yql_kikimr_datasource.cpp | 220 +---------------- 13 files changed, 556 insertions(+), 220 deletions(-) create mode 100644 ydb/core/kqp/gateway/ut/metadata_conversion.cpp create mode 100644 ydb/core/kqp/gateway/ut/ya.make create mode 100644 ydb/core/kqp/provider/read_attributes_utils.cpp create mode 100644 ydb/core/kqp/provider/read_attributes_utils.h create mode 100644 ydb/core/kqp/provider/read_attributes_utils_ut.cpp diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index 4549dcb64f5d..0b8e1e9dcd04 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -113,6 +113,7 @@ PEERDIR( ydb/core/io_formats/ydb_dump ydb/core/kesus/tablet ydb/core/kqp/common + ydb/core/kqp/session_actor ydb/core/protos ydb/core/scheme ydb/core/sys_view diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make b/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make index 600ae69dd33c..4ceb0a1a5cde 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make @@ -10,6 +10,7 @@ PEERDIR( ydb/services/metadata/abstract ydb/services/metadata/secret ydb/core/kqp/gateway/actors + ydb/core/kqp/federated_query ydb/core/kqp/gateway/utils ydb/core/kqp/gateway/behaviour/tablestore/operations ) diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index c1848433aae7..847df60c9c64 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -473,8 +473,11 @@ NThreading::TFuture LoadExternalDataSo return DescribeExternalDataSourceSecrets(authDescription, userToken ? userToken->GetUserSID() : "", actorSystem, maximalSecretsSnapshotWaitTime); } +} // anonymous namespace + NExternalSource::TAuth MakeAuth(const NYql::TExternalSource& metadata) { switch (metadata.DataSourceAuth.identity_case()) { + case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: case NKikimrSchemeOp::TAuth::kNone: return NExternalSource::NAuth::MakeNone(); case NKikimrSchemeOp::TAuth::kServiceAccount: @@ -484,7 +487,6 @@ NExternalSource::TAuth MakeAuth(const NYql::TExternalSource& metadata) { case NKikimrSchemeOp::TAuth::kBasic: case NKikimrSchemeOp::TAuth::kMdbBasic: case NKikimrSchemeOp::TAuth::kToken: - case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: Y_ABORT("Unimplemented external source auth: %d", metadata.DataSourceAuth.identity_case()); break; } @@ -496,6 +498,7 @@ std::shared_ptr ConvertToExternalSourceMetadata(cons metadata->TableLocation = tableMetadata.ExternalSource.TableLocation; metadata->DataSourceLocation = tableMetadata.ExternalSource.DataSourceLocation; metadata->DataSourcePath = tableMetadata.ExternalSource.DataSourcePath; + metadata->Type = tableMetadata.ExternalSource.Type; metadata->Attributes = tableMetadata.Attributes; metadata->Auth = MakeAuth(tableMetadata.ExternalSource); return metadata; @@ -525,11 +528,13 @@ bool EnrichMetadata(NYql::TKikimrTableMetadata& tableMetadata, const NExternalSo ++id; } tableMetadata.Attributes = dynamicMetadata.Attributes; + tableMetadata.ExternalSource.TableLocation = dynamicMetadata.TableLocation; + tableMetadata.ExternalSource.DataSourceLocation = dynamicMetadata.DataSourceLocation; + tableMetadata.ExternalSource.DataSourcePath = dynamicMetadata.DataSourcePath; + tableMetadata.ExternalSource.Type = dynamicMetadata.Type; return true; } -} // anonymous namespace - TVector TKqpTableMetadataLoader::GetCollectedSchemeData() { TVector result(std::move(CollectedSchemeData)); @@ -842,12 +847,16 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta externalSource->LoadDynamicMetadata(std::move(externalSourceMeta)) .Subscribe([promise = std::move(promise), externalDataSourceMetadata](const TFuture>& result) mutable { TTableMetadataResult wrapper; - if (result.HasValue() && (!result.GetValue()->Changed || EnrichMetadata(*externalDataSourceMetadata.Metadata, *result.GetValue()))) { - wrapper.SetSuccess(); - wrapper.Metadata = externalDataSourceMetadata.Metadata; - } else { - // TODO: forward exception from result - wrapper.SetException(yexception() << "LoadDynamicMetadata failed"); + try { + auto& dynamicMetadata = result.GetValue(); + if (!dynamicMetadata->Changed || EnrichMetadata(*externalDataSourceMetadata.Metadata, *dynamicMetadata)) { + wrapper.SetSuccess(); + wrapper.Metadata = externalDataSourceMetadata.Metadata; + } else { + wrapper.SetException(yexception() << "couldn't enrich metadata with dynamically loaded part"); + } + } catch (const std::exception& exception) { + wrapper.SetException(yexception() << "couldn't load table metadata: " << exception.what()); } promise.SetValue(wrapper); }); diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.h b/ydb/core/kqp/gateway/kqp_metadata_loader.h index 798e9cca88b8..6cb0ff907413 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.h +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.h @@ -11,6 +11,11 @@ namespace NKikimr::NKqp { +// only exposed to be unit-tested +NExternalSource::TAuth MakeAuth(const NYql::TExternalSource& metadata); +std::shared_ptr ConvertToExternalSourceMetadata(const NYql::TKikimrTableMetadata& tableMetadata); +bool EnrichMetadata(NYql::TKikimrTableMetadata& tableMetadata, const NExternalSource::TMetadata& dynamicMetadata); + class TKqpTableMetadataLoader : public NYql::IKikimrGateway::IKqpTableMetadataLoader { public: diff --git a/ydb/core/kqp/gateway/ut/metadata_conversion.cpp b/ydb/core/kqp/gateway/ut/metadata_conversion.cpp new file mode 100644 index 000000000000..f86239bbc6fd --- /dev/null +++ b/ydb/core/kqp/gateway/ut/metadata_conversion.cpp @@ -0,0 +1,78 @@ +#include + +#include +#include + +using namespace NKikimr; + +TEST(MetadataConversion, MakeAuthTest) { + NYql::TExternalSource externalSource; + auto auth = NKqp::MakeAuth(externalSource); + ASSERT_TRUE(std::holds_alternative(auth)); + + externalSource.DataSourceAuth.MutableNone(); + auth = NKqp::MakeAuth(externalSource); + ASSERT_TRUE(std::holds_alternative(auth)); + + externalSource.DataSourceAuth.ClearNone(); + auto& serviceAccount = *externalSource.DataSourceAuth.MutableServiceAccount(); + { + serviceAccount.SetId("sa-id"); + serviceAccount.SetSecretName("sa-name-of-secret"); + externalSource.ServiceAccountIdSignature = "sa-id-signature"; + } + auth = NKqp::MakeAuth(externalSource); + ASSERT_TRUE(std::holds_alternative(auth)); + { + auto& saAuth = std::get(auth); + ASSERT_EQ(saAuth.ServiceAccountId, "sa-id"); + ASSERT_EQ(saAuth.ServiceAccountIdSignature, "sa-id-signature"); + } + + externalSource.DataSourceAuth.ClearServiceAccount(); + auto& awsAccount = *externalSource.DataSourceAuth.MutableAws(); + { + awsAccount.SetAwsRegion("aws-test"); + awsAccount.SetAwsAccessKeyIdSecretName("aws-ak-secret-name"); + awsAccount.SetAwsSecretAccessKeySecretName("aws-sak-secret-name"); + externalSource.AwsAccessKeyId = "aws-ak"; + externalSource.AwsSecretAccessKey = "aws-sak"; + } + auth = NKqp::MakeAuth(externalSource); + ASSERT_TRUE(std::holds_alternative(auth)); + { + auto& awsAuth = std::get(auth); + ASSERT_EQ(awsAuth.Region, "aws-test"); + ASSERT_EQ(awsAuth.AccessKey, "aws-ak"); + ASSERT_EQ(awsAuth.SecretAccessKey, "aws-sak"); + } +} + +TEST(MetadataConversion, ConvertingExternalSourceMetadata) { + NYql::TExternalSource externalSource{ + .Type = "type", + .TableLocation = "table-loc", + .DataSourcePath = "ds-path", + .DataSourceLocation = "ds-loc", + }; + THashMap attributes{{"key1", "val1"}, {"key2", "val2"}}; + + std::shared_ptr externalMetadata; + { + NYql::TKikimrTableMetadata tableMetadata; + tableMetadata.ExternalSource = externalSource; + tableMetadata.Attributes = attributes; + externalMetadata = NKqp::ConvertToExternalSourceMetadata(tableMetadata); + } + ASSERT_TRUE(externalMetadata); + ASSERT_TRUE(std::holds_alternative(externalMetadata->Auth)); + + NYql::TKikimrTableMetadata tableMetadata; + ASSERT_TRUE(NKqp::EnrichMetadata(tableMetadata, *externalMetadata)); + + ASSERT_EQ(tableMetadata.ExternalSource.Type, externalSource.Type); + ASSERT_EQ(tableMetadata.ExternalSource.TableLocation, externalSource.TableLocation); + ASSERT_EQ(tableMetadata.ExternalSource.DataSourcePath, externalSource.DataSourcePath); + ASSERT_EQ(tableMetadata.ExternalSource.DataSourceLocation, externalSource.DataSourceLocation); + ASSERT_THAT(tableMetadata.Attributes, testing::ContainerEq(attributes)); +} diff --git a/ydb/core/kqp/gateway/ut/ya.make b/ydb/core/kqp/gateway/ut/ya.make new file mode 100644 index 000000000000..8490d2025802 --- /dev/null +++ b/ydb/core/kqp/gateway/ut/ya.make @@ -0,0 +1,17 @@ +GTEST() + +SRCS( + metadata_conversion.cpp +) + +PEERDIR( + ydb/core/kqp/gateway + ydb/library/yql/parser/pg_wrapper + ydb/library/yql/public/udf/service/stub + ydb/services/metadata +) + +YQL_LAST_ABI_VERSION() + +END() + diff --git a/ydb/core/kqp/gateway/ya.make b/ydb/core/kqp/gateway/ya.make index 5d669920e458..42a45f677775 100644 --- a/ydb/core/kqp/gateway/ya.make +++ b/ydb/core/kqp/gateway/ya.make @@ -32,3 +32,5 @@ RECURSE( local_rpc utils ) + +RECURSE_FOR_TESTS(ut) diff --git a/ydb/core/kqp/provider/read_attributes_utils.cpp b/ydb/core/kqp/provider/read_attributes_utils.cpp new file mode 100644 index 000000000000..ef75017e75f6 --- /dev/null +++ b/ydb/core/kqp/provider/read_attributes_utils.cpp @@ -0,0 +1,229 @@ +#include "read_attributes_utils.h" +#include +#include +#include +#include +#include + +namespace NYql { + +using namespace NNodes; + +class TGatheringAttributesVisitor : public IAstAttributesVisitor { + void VisitRead(TExprNode&, TString cluster, TString tablePath) override { + CurrentSource = &*Result.try_emplace(std::make_pair(cluster, tablePath)).first; + }; + + void ExitRead() override { + CurrentSource = nullptr; + }; + + void VisitAttribute(TString key, TString value) override { + Y_ABORT_UNLESS(CurrentSource, "cannot write %s: %s", key.c_str(), value.c_str()); + CurrentSource->second.try_emplace(key, value); + }; + + void VisitNonAttribute(TExprNode::TPtr) override {} + +public: + THashMap, THashMap> Result; + +private: + decltype(Result)::pointer CurrentSource = nullptr; +}; + +class TAttributesReplacingVisitor : public IAstAttributesVisitor { +public: + TAttributesReplacingVisitor(THashMap&& attributesBeforeFilter, + TStringBuf cluster, TStringBuf tablePath, + TKikimrTableMetadataPtr&& metadata, + TExprContext& ctx) + : GatheredAttributes{std::move(attributesBeforeFilter)} + , Cluster{cluster}, TablePath{tablePath} + , Ctx{ctx} + , Metadata{std::move(metadata)} + , NewAttributes{Metadata->Attributes} + {} + + void VisitRead(TExprNode& read, TString cluster, TString tablePath) override { + if (cluster == Cluster && tablePath == TablePath) { + Read = &read; + } + }; + + void ExitRead() override { + if (!Read) { + return; + } + if (!ReplacedUserchema && Metadata) { + Children.push_back(BuildSchemaFromMetadata(Read->Pos(), Ctx, Metadata->Columns)); + } + for (const auto& [key, value] : NewAttributes) { + Children.push_back(Ctx.NewList(Read->Pos(), { + Ctx.NewAtom(Read->Pos(), key), + Ctx.NewAtom(Read->Pos(), value), + })); + } + Read->Child(4)->ChangeChildrenInplace(std::move(Children)); + Read = nullptr; + }; + + void VisitAttribute(TString key, TString value) override { + if (!Read) { + return; + } + const bool gotNewAttributes = Metadata && !Metadata->Attributes.empty(); + + if (gotNewAttributes && GatheredAttributes.contains(key) && !Metadata->Attributes.contains(key)) { + return; + } + auto mbNewValue = Metadata->Attributes.FindPtr(key); + NewAttributes.erase(key); + + auto pos = Read->Pos(); + auto attribute = Ctx.NewList(pos, { + Ctx.NewAtom(pos, key), + Ctx.NewAtom(pos, mbNewValue ? *mbNewValue : value) + }); + Children.push_back(std::move(attribute)); + }; + + void VisitNonAttribute(TExprNode::TPtr node) override { + if (!Read) { + return; + } + if (!Metadata || Metadata->Columns.empty()) { + Children.push_back(std::move(node)); + return; + } + + auto nodeChildren = node->Children(); + if (!nodeChildren.empty() && nodeChildren[0]->IsAtom()) { + TCoAtom attrName{nodeChildren[0]}; + if (attrName.StringValue().equal("userschema")) { + node = BuildSchemaFromMetadata(Read->Pos(), Ctx, Metadata->Columns); + ReplacedUserchema = true; + } + } + Children.push_back(std::move(node)); + } + +private: + THashMap GatheredAttributes; + TStringBuf Cluster; + TStringBuf TablePath; + + TExprContext& Ctx; + TKikimrTableMetadataPtr Metadata; + TExprNode* Read = nullptr; + std::vector Children; + THashMap NewAttributes; + bool ReplacedUserchema = false; +}; + +namespace { + +std::optional> GetAsTextAttribute(const TExprNode& child) { + if (!child.IsList() || child.ChildrenSize() != 2) { + return std::nullopt; + } + if (!(child.Child(0)->IsAtom() && child.Child(1)->IsAtom())) { + return std::nullopt; + } + + TCoAtom attrKey{child.Child(0)}; + TCoAtom attrVal{child.Child(1)}; + + return std::make_optional(std::make_pair(attrKey.StringValue(), attrVal.StringValue())); +} + +} // namespace anonymous + +void ExtractReadAttributes(IAstAttributesVisitor& visitor, TExprNode& read, TExprContext& ctx) { + TraverseReadAttributes(visitor, *read.Child(0), ctx); + + TKiDataSource source(read.ChildPtr(1)); + TKikimrKey key{ctx}; + if (!key.Extract(*read.Child(2))) { + return; + } + auto cluster = source.Cluster().StringValue(); + auto tablePath = key.GetTablePath(); + + if (read.ChildrenSize() <= 4) { + return; + } + auto& astAttrs = *read.Child(4); + visitor.VisitRead(read, cluster, tablePath); + for (const auto& child : astAttrs.Children()) { + if (auto asAttribute = GetAsTextAttribute(*child)) { + visitor.VisitAttribute(asAttribute->first, asAttribute->second); + } else { + visitor.VisitNonAttribute(child); + } + } + visitor.ExitRead(); +} + +void TraverseReadAttributes(IAstAttributesVisitor& visitor, TExprNode& node, TExprContext& ctx) { + if (node.IsCallable(ReadName)) { + return ExtractReadAttributes(visitor, node, ctx); + } + if (node.IsCallable()) { + if (node.ChildrenSize() == 0) { + return; + } + return TraverseReadAttributes(visitor, *node.Child(0), ctx); + } +} + +THashMap, THashMap> GatherReadAttributes(TExprNode& node, TExprContext& ctx) { + TGatheringAttributesVisitor visitor; + TraverseReadAttributes(visitor, node, ctx); + return visitor.Result; +} + +void ReplaceReadAttributes(TExprNode& node, + THashMap attributesBeforeFilter, + TStringBuf cluster, TStringBuf tablePath, + TKikimrTableMetadataPtr metadata, + TExprContext& ctx) { + TAttributesReplacingVisitor visitor{std::move(attributesBeforeFilter), cluster, tablePath, std::move(metadata), ctx}; + TraverseReadAttributes(visitor, node, ctx); +} + +static Ydb::Type CreateYdbType(const NKikimr::NScheme::TTypeInfo& typeInfo, bool notNull) { + Ydb::Type ydbType; + if (typeInfo.GetTypeId() == NKikimr::NScheme::NTypeIds::Pg) { + auto* typeDesc = typeInfo.GetTypeDesc(); + auto* pg = ydbType.mutable_pg_type(); + pg->set_type_name(NKikimr::NPg::PgTypeNameFromTypeDesc(typeDesc)); + pg->set_oid(NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc)); + } else { + auto& item = notNull + ? ydbType + : *ydbType.mutable_optional_type()->mutable_item(); + item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); + } + return ydbType; +} + +TExprNode::TPtr BuildSchemaFromMetadata(TPositionHandle pos, TExprContext& ctx, const TMap& columns) { + TVector> typedColumns; + typedColumns.reserve(columns.size()); + for (const auto& [n, c] : columns) { + NYdb::TTypeParser parser(NYdb::TType(CreateYdbType(c.TypeInfo, c.NotNull))); + auto type = NFq::MakeType(parser, ctx); + typedColumns.emplace_back(n, type); + } + + const TString ysonSchema = NYql::NCommon::WriteTypeToYson(NFq::MakeStructType(typedColumns, ctx), NYson::EYsonFormat::Text); + TExprNode::TListType items; + auto schema = ctx.NewAtom(pos, ysonSchema); + auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema }); + auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema }); + auto userSchema = ctx.NewAtom(pos, "userschema"sv); + return ctx.NewList(pos, {userSchema, type, order}); +} + +} // namespace NYql diff --git a/ydb/core/kqp/provider/read_attributes_utils.h b/ydb/core/kqp/provider/read_attributes_utils.h new file mode 100644 index 000000000000..08faba074ef2 --- /dev/null +++ b/ydb/core/kqp/provider/read_attributes_utils.h @@ -0,0 +1,29 @@ +#include +#include + +namespace NYql { + +class IAstAttributesVisitor { +public: + virtual ~IAstAttributesVisitor() = default; + + virtual void VisitRead(TExprNode& read, TString cluster, TString tablePath) = 0; + virtual void ExitRead() = 0; + + virtual void VisitAttribute(TString key, TString value) = 0; + + virtual void VisitNonAttribute(TExprNode::TPtr node) = 0; +}; + +void TraverseReadAttributes(IAstAttributesVisitor& visitor, TExprNode& node, TExprContext& ctx); + +THashMap, THashMap> GatherReadAttributes(TExprNode& node, TExprContext& ctx); + +void ReplaceReadAttributes(TExprNode& node, + THashMap attributesBeforeFilter, + TStringBuf cluster, TStringBuf tablePath, + TKikimrTableMetadataPtr metadata, + TExprContext& ctx); + +TExprNode::TPtr BuildSchemaFromMetadata(TPositionHandle pos, TExprContext& ctx, const TMap& columns); +} // namespace NYql diff --git a/ydb/core/kqp/provider/read_attributes_utils_ut.cpp b/ydb/core/kqp/provider/read_attributes_utils_ut.cpp new file mode 100644 index 000000000000..74e1b3c94bc7 --- /dev/null +++ b/ydb/core/kqp/provider/read_attributes_utils_ut.cpp @@ -0,0 +1,164 @@ +#include "read_attributes_utils.h" + +#include +#include + +#include + +namespace { +using namespace NYql; + +struct TAstHelper { + TExprNode::TPtr CreateReadNode(TStringBuf cluster, TStringBuf path, TExprNode::TListType attributes, TExprNode::TPtr world = nullptr) { + if (!world) { + world = Ctx.NewWorld(Pos); + } + return Ctx.NewCallable(Pos, ReadName, { + std::move(world), + Ctx.NewCallable(Pos, "DataSource", { + Ctx.NewAtom(Pos, "kikimr"), + Ctx.NewAtom(Pos, cluster), + }), + Ctx.NewCallable(Pos, "Key", { + Ctx.NewList(Pos, { + Ctx.NewAtom(Pos, "table"), + Ctx.NewCallable(Pos, "String", { + Ctx.NewAtom(Pos, path) + }), + }), + }), + Ctx.NewCallable(Pos, "Void", {}), + Ctx.NewList(Pos, std::move(attributes)), + }); + } + + TExprNode::TPtr NewAttribute(TStringBuf key, TStringBuf value) { + return NewList({ + NewAtom(key), + NewAtom(value), + }); + } + + TExprNode::TPtr NewAtom(TStringBuf value) { + return Ctx.NewAtom(Pos, value); + } + + TExprNode::TPtr NewCallable(TStringBuf name, TExprNode::TListType arguments) { + return Ctx.NewCallable(Pos, name, std::move(arguments)); + } + + TExprNode::TPtr NewList(TExprNode::TListType children) { + return Ctx.NewList(Pos, std::move(children)); + } + + TExprContext Ctx; + TPosition Pos; +}; + +void CheckAttributes(const THashMap, THashMap>& allAttributes, + TStringBuf cluster, TStringBuf path, THashMap expected) { + auto attributes = allAttributes.FindPtr(std::make_pair(cluster, path)); + UNIT_ASSERT(attributes); + ASSERT_THAT(*attributes, testing::ContainerEq(expected)); +} +} + +Y_UNIT_TEST_SUITE(ReadAttributesUtils) { + using namespace NYql; + + Y_UNIT_TEST(AttributesGatheringEmpry) { + TAstHelper helper; + auto node = helper.CreateReadNode("cluster", "path/to/file", {}); + auto allAttributes = GatherReadAttributes(*node, helper.Ctx); + UNIT_ASSERT_EQUAL(allAttributes.size(), 1); + + CheckAttributes(allAttributes, "cluster", "path/to/file", {}); + } + + Y_UNIT_TEST(AttributesGatheringFilter) { + TAstHelper helper; + auto node = helper.CreateReadNode("cluster", "path/to/file", { + helper.NewList({ + helper.NewAtom("should ignore"), + helper.NewAtom("because has"), + helper.NewAtom("three atoms"), + }), + helper.NewAttribute("good", "attribute"), + helper.NewList({ + helper.NewAtom("second is not atom, thus should be ignored"), + helper.NewCallable("callable", {}), + }), + helper.NewAttribute("also good", "attribute"), + helper.NewAtom("no children - not an attribute"), + helper.NewCallable("callable instead of list", { + helper.NewAtom("should"), + helper.NewAtom("ignore"), + }) + }); + auto allAttributes = GatherReadAttributes(*node, helper.Ctx); + UNIT_ASSERT_EQUAL(allAttributes.size(), 1); + + CheckAttributes(allAttributes, "cluster", "path/to/file", {{"good", "attribute"}, {"also good", "attribute"}}); + } + + Y_UNIT_TEST(AttributesGatheringRecursive) { + TAstHelper helper; + auto node = helper.CreateReadNode("cluster", "path/1", { + helper.NewAttribute("key11", "val11"), + }, helper.NewCallable("Left!", { + helper.CreateReadNode("cluster", "path/2", { + helper.NewAttribute("key21", "val21"), + helper.NewAttribute("key22", "val22"), + helper.NewAttribute("key23", "val23"), + }), + })); + auto allAttributes = GatherReadAttributes(*node, helper.Ctx); + UNIT_ASSERT_EQUAL(allAttributes.size(), 2); + + CheckAttributes(allAttributes, "cluster", "path/1", {{"key11", "val11"}}); + CheckAttributes(allAttributes, "cluster", "path/2", {{"key21", "val21"}, {"key22", "val22"}, {"key23", "val23"}}); + } + + Y_UNIT_TEST(ReplaceAttributesEmpty) { + TAstHelper helper; + auto node = helper.CreateReadNode("cluster", "path", { + helper.NewAttribute("key1", "val1"), + helper.NewAttribute("key2", "val2"), + }); + auto metadata = MakeIntrusive(); + // empty attributes in metadata should not change anything + ReplaceReadAttributes(*node, {{"key", "val"}}, "cluster", "path", metadata, helper.Ctx); + + auto allAttributes = GatherReadAttributes(*node, helper.Ctx); + CheckAttributes(allAttributes, "cluster", "path", {{"key1", "val1"}, {"key2", "val2"}}); + } + + Y_UNIT_TEST(ReplaceAttributesFilter) { + TAstHelper helper; + auto node = helper.CreateReadNode("cluster", "path/to/file", { + helper.NewList({ + helper.NewAtom("should ignore"), + helper.NewAtom("because has"), + helper.NewAtom("three atoms"), + }), + helper.NewAttribute("good", "attribute"), + helper.NewList({ + helper.NewAtom("second is not atom, thus should be ignored"), + helper.NewCallable("callable", {}), + }), + helper.NewAttribute("also good", "attribute"), + helper.NewAtom("no children - not an attribute"), + helper.NewCallable("callable instead of list", { + helper.NewAtom("should"), + helper.NewAtom("ignore"), + }) + }); + auto metadata = MakeIntrusive(); + // NB! changes to attributes from metadata, will result into replacing AST metadata + metadata->Attributes = {{"good", "still good"}, {"new key", "new value"}}; + ReplaceReadAttributes(*node, {{"good", "attribute"}, {"also good", "attribute"}}, "cluster", "path/to/file", metadata, helper.Ctx); + + auto allAttributes = GatherReadAttributes(*node, helper.Ctx); + CheckAttributes(allAttributes, "cluster", "path/to/file", metadata->Attributes); + } +} \ No newline at end of file diff --git a/ydb/core/kqp/provider/ut/ya.make b/ydb/core/kqp/provider/ut/ya.make index 7991978375bc..0b71054c6a86 100644 --- a/ydb/core/kqp/provider/ut/ya.make +++ b/ydb/core/kqp/provider/ut/ya.make @@ -3,12 +3,14 @@ UNITTEST_FOR(ydb/core/kqp/provider) SRCS( yql_kikimr_gateway_ut.cpp yql_kikimr_provider_ut.cpp + read_attributes_utils_ut.cpp ) PEERDIR( ydb/core/client/minikql_result_lib ydb/core/kqp/ut/common ydb/library/yql/sql/pg_dummy + library/cpp/testing/gmock_in_unittest ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/kqp/provider/ya.make b/ydb/core/kqp/provider/ya.make index 53820ae3a96a..d0908f343e3f 100644 --- a/ydb/core/kqp/provider/ya.make +++ b/ydb/core/kqp/provider/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + read_attributes_utils.cpp rewrite_io_utils.cpp yql_kikimr_datasink.cpp yql_kikimr_datasource.cpp diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 62c4b6c5394f..6b06f0d4a61b 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -1,3 +1,4 @@ +#include "read_attributes_utils.h" #include "rewrite_io_utils.h" #include "yql_kikimr_provider_impl.h" @@ -19,40 +20,6 @@ namespace NYql { -static Ydb::Type CreateYdbType(const NKikimr::NScheme::TTypeInfo& typeInfo, bool notNull) { - Ydb::Type ydbType; - if (typeInfo.GetTypeId() == NKikimr::NScheme::NTypeIds::Pg) { - auto* typeDesc = typeInfo.GetTypeDesc(); - auto* pg = ydbType.mutable_pg_type(); - pg->set_type_name(NKikimr::NPg::PgTypeNameFromTypeDesc(typeDesc)); - pg->set_oid(NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc)); - } else { - auto& item = notNull - ? ydbType - : *ydbType.mutable_optional_type()->mutable_item(); - item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); - } - return ydbType; -} - -TExprNode::TPtr BuildSchemaFromMetadata(TPositionHandle pos, TExprContext& ctx, const TMap& columns) { - TVector> typedColumns; - typedColumns.reserve(columns.size()); - for (const auto& [n, c] : columns) { - NYdb::TTypeParser parser(NYdb::TType(CreateYdbType(c.TypeInfo, c.NotNull))); - auto type = NFq::MakeType(parser, ctx); - typedColumns.emplace_back(n, type); - } - - const TString ysonSchema = NYql::NCommon::WriteTypeToYson(NFq::MakeStructType(typedColumns, ctx), NYson::EYsonFormat::Text); - TExprNode::TListType items; - auto schema = ctx.NewAtom(pos, ysonSchema); - auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema }); - auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema }); - auto userSchema = ctx.NewAtom(pos, "userschema"sv); - return ctx.NewList(pos, {userSchema, type, order}); -} - TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ctx, const TMap& columns, const NKikimr::NExternalSource::IExternalSource::TPtr& source, const TString& content) { TExprNode::TListType items; items.emplace_back(BuildSchemaFromMetadata(pos, ctx, columns)); @@ -126,179 +93,6 @@ namespace { using namespace NKikimr; using namespace NNodes; -class IAstAttributesVisitor { -public: - virtual ~IAstAttributesVisitor() = default; - - virtual void VisitRead(TExprNode& read, TString cluster, TString tablePath) = 0; - virtual void ExitRead() = 0; - - virtual void VisitAttribute(TString key, TString value) = 0; - - virtual void VisitNonAttribute(TExprNode::TPtr node) = 0; -}; - -class TGatheringAttributesVisitor : public IAstAttributesVisitor { - void VisitRead(TExprNode&, TString cluster, TString tablePath) override { - CurrentSource = &*Result.try_emplace(std::make_pair(cluster, tablePath)).first; - }; - - void ExitRead() override { - CurrentSource = nullptr; - }; - - void VisitAttribute(TString key, TString value) override { - Y_ABORT_UNLESS(CurrentSource, "cannot write %s: %s", key.c_str(), value.c_str()); - CurrentSource->second.try_emplace(key, value); - }; - - void VisitNonAttribute(TExprNode::TPtr) override {} - -public: - THashMap, THashMap> Result; - -private: - decltype(Result)::pointer CurrentSource = nullptr; -}; - -class TAttributesReplacingVisitor : public IAstAttributesVisitor { -public: - TAttributesReplacingVisitor(THashMap attributesBeforeFilter, - TStringBuf cluster, TStringBuf tablePath, - TKikimrTableMetadataPtr metadata, - TExprContext& ctx) - : GatheredAttributes{std::move(attributesBeforeFilter)} - , Cluster{cluster}, TablePath{tablePath} - , Ctx{ctx} - , Metadata{std::move(metadata)} - {} - - void VisitRead(TExprNode& read, TString cluster, TString tablePath) override { - if (cluster == Cluster && tablePath == TablePath) { - Read = &read; - } - }; - - void ExitRead() override { - if (!Read) { - return; - } - if (!ReplacedUserchema && Metadata) { - Children.push_back(BuildSchemaFromMetadata(Read->Pos(), Ctx, Metadata->Columns)); - } - Read->Child(4)->ChangeChildrenInplace(std::move(Children)); - Read = nullptr; - }; - - void VisitAttribute(TString key, TString value) override { - if (!Read) { - return; - } - const bool gotNewAttributes = Metadata && !Metadata->Attributes.empty(); - - if (gotNewAttributes && GatheredAttributes.contains(key) && !Metadata->Attributes.contains(key)) { - return; - } - auto pos = Read->Pos(); - auto attribute = Ctx.NewList(pos, { - Ctx.NewAtom(pos, key), - Ctx.NewAtom(pos, value) - }); - Children.push_back(std::move(attribute)); - }; - - void VisitNonAttribute(TExprNode::TPtr node) override { - if (!Read) { - return; - } - if (!Metadata || Metadata->Columns.empty()) { - Children.push_back(std::move(node)); - return; - } - - auto nodeChildren = node->Children(); - if (!nodeChildren.empty() && nodeChildren[0]->IsAtom()) { - TCoAtom attrName{nodeChildren[0]}; - if (attrName.StringValue().equal("userschema")) { - node = BuildSchemaFromMetadata(Read->Pos(), Ctx, Metadata->Columns); - ReplacedUserchema = true; - } - } - Children.push_back(std::move(node)); - } - -private: - THashMap GatheredAttributes; - TStringBuf Cluster; - TStringBuf TablePath; - - TExprContext& Ctx; - TKikimrTableMetadataPtr Metadata; - TExprNode* Read = nullptr; - std::vector Children; - bool ReplacedUserchema = false; -}; - -std::optional> GetAsTextAttribute(const TExprNode& child) { - if (!child.IsList() || child.ChildrenSize() != 2) { - return std::nullopt; - } - if (!(child.Child(0)->IsAtom() && child.Child(1)->IsAtom())) { - return std::nullopt; - } - - TCoAtom attrKey{child.Child(0)}; - TCoAtom attrVal{child.Child(1)}; - - return std::make_optional(std::make_pair(attrKey.StringValue(), attrVal.StringValue())); -} - -void TraverseReadAttributes(IAstAttributesVisitor& visitor, TExprNode& node, TExprContext& ctx); - -void ExtractReadAttributes(IAstAttributesVisitor& visitor, TExprNode& read, TExprContext& ctx) { - TraverseReadAttributes(visitor, *read.Child(0), ctx); - - TKiDataSource source(read.ChildPtr(1)); - TKikimrKey key{ctx}; - if (!key.Extract(*read.Child(2))) { - return; - } - auto cluster = source.Cluster().StringValue(); - auto tablePath = key.GetTablePath(); - - if (read.ChildrenSize() <= 4) { - return; - } - auto& astAttrs = *read.Child(4); - visitor.VisitRead(read, cluster, tablePath); - for (const auto& child : astAttrs.Children()) { - if (auto asAttribute = GetAsTextAttribute(*child)) { - visitor.VisitAttribute(asAttribute->first, asAttribute->second); - } else { - visitor.VisitNonAttribute(child); - } - } - visitor.ExitRead(); -} - -void TraverseReadAttributes(IAstAttributesVisitor& visitor, TExprNode& node, TExprContext& ctx) { - if (node.IsCallable(ReadName)) { - return ExtractReadAttributes(visitor, node, ctx); - } - if (node.IsCallable()) { - if (node.ChildrenSize() == 0) { - return; - } - return TraverseReadAttributes(visitor, *node.Child(0), ctx); - } -} - -THashMap, THashMap> GatherReadAttributes(TExprNode& node, TExprContext& ctx) { - TGatheringAttributesVisitor visitor; - TraverseReadAttributes(visitor, node, ctx); - return visitor.Result; -} - class TKiSourceIntentDeterminationTransformer: public TKiSourceVisitorTransformer { public: TKiSourceIntentDeterminationTransformer(TIntrusivePtr sessionCtx) @@ -402,18 +196,23 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { size_t tablesCount = SessionCtx->Tables().GetTables().size(); TVector> futures; futures.reserve(tablesCount); - auto readAttributes = GatherReadAttributes(*input, ctx); + std::optional, THashMap>> readAttributes; for (auto& it : SessionCtx->Tables().GetTables()) { const TString& clusterName = it.first.first; const TString& tableName = it.first.second; TKikimrTableDescription& table = SessionCtx->Tables().GetTable(clusterName, tableName); - auto readAttrs = readAttributes.FindPtr(std::make_pair(clusterName, tableName)); if (table.Metadata || table.GetTableType() != ETableType::Table) { continue; } + const THashMap* readAttrs = nullptr; + if (!table.Metadata && table.GetTableType() == ETableType::ExternalTable) { + readAttributes = GatherReadAttributes(*input, ctx); + readAttrs = readAttributes->FindPtr(std::make_pair(clusterName, tableName)); + } + auto emplaceResult = LoadResults.emplace(std::make_pair(clusterName, tableName), std::make_shared()); @@ -545,8 +344,7 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { if (tableDesc->Metadata->Kind == EKikimrTableKind::External) { auto currentAttributes = gatheredAttributes.FindPtr(std::make_pair(cluster, tablePath)); if (currentAttributes && !currentAttributes->empty()) { - TAttributesReplacingVisitor replacer{*currentAttributes, cluster, tablePath, tableDesc->Metadata, ctx}; - TraverseReadAttributes(replacer, *input, ctx); + ReplaceReadAttributes(*input, *currentAttributes, cluster, tablePath, tableDesc->Metadata, ctx); } }