diff --git a/ydb/core/external_sources/external_data_source.cpp b/ydb/core/external_sources/external_data_source.cpp index e525eac6df85..e11c7cce2446 100644 --- a/ydb/core/external_sources/external_data_source.cpp +++ b/ydb/core/external_sources/external_data_source.cpp @@ -52,6 +52,14 @@ struct TExternalDataSource : public IExternalSource { ValidateHostname(HostnamePatterns, proto.GetLocation()); } + virtual NThreading::TFuture> LoadDynamicMetadata(std::shared_ptr meta) override { + return NThreading::MakeFuture(std::move(meta)); + } + + virtual bool CanLoadDynamicMetadata() const override { + return false; + } + private: const TString Name; const TVector AuthMethods; diff --git a/ydb/core/external_sources/external_source.h b/ydb/core/external_sources/external_source.h index 78121c7b0fc1..92c565c3c350 100644 --- a/ydb/core/external_sources/external_source.h +++ b/ydb/core/external_sources/external_source.h @@ -1,9 +1,11 @@ #pragma once +#include #include #include #include +#include #include namespace NKikimr::NExternalSource { @@ -11,6 +13,71 @@ namespace NKikimr::NExternalSource { struct TExternalSourceException: public yexception { }; +namespace NAuth { + +struct TNone { + static constexpr std::string_view Method = "NONE"; +}; + +struct TAws { + static constexpr std::string_view Method = "AWS"; + + TAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) + : AccessKey{accessKey} + , SecretAccessKey{secretAccessKey} + , Region{region} + {} + + TString AccessKey; + TString SecretAccessKey; + TString Region; +}; + +struct TServiceAccount { + static constexpr std::string_view Method = "SERVICE_ACCOUNT"; + + TServiceAccount(TString serviceAccountId, TString serviceAccountIdSignature) + : ServiceAccountId{std::move(serviceAccountId)} + , ServiceAccountIdSignature{std::move(serviceAccountIdSignature)} + {} + + TString ServiceAccountId; + TString ServiceAccountIdSignature; +}; + +using TAuth = std::variant; + +std::string_view GetMethod(const TAuth& auth); + +inline TAuth MakeNone() { + return TAuth{std::in_place_type_t{}}; +} + +inline TAuth MakeServiceAccount(const TString& serviceAccountId, const TString& serviceAccountIdSignature) { + return TAuth{std::in_place_type_t{}, serviceAccountId, serviceAccountIdSignature}; +} + +inline TAuth MakeAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) { + return TAuth{std::in_place_type_t{}, accessKey, secretAccessKey, region}; +} +} + +using TAuth = NAuth::TAuth; + +struct TMetadata { + bool Changed = false; + TString TableLocation; + TString DataSourceLocation; + TString DataSourcePath; + TString Type; + + THashMap Attributes; + + TAuth Auth; + + NKikimrExternalSources::TSchema Schema; +}; + struct IExternalSource : public TThrRefBase { using TPtr = TIntrusivePtr; @@ -54,6 +121,17 @@ struct IExternalSource : public TThrRefBase { If an error occurs, an exception is thrown. */ virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const = 0; + + /* + Retrieve additional metadata from runtime data, enrich provided metadata + */ + virtual NThreading::TFuture> LoadDynamicMetadata(std::shared_ptr meta) = 0; + + /* + A method that should tell whether there is an implementation + of the previous method. + */ + virtual bool CanLoadDynamicMetadata() const = 0; }; } diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp index 200dd47dabd1..fd8a0f863955 100644 --- a/ydb/core/external_sources/external_source_factory.cpp +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -32,12 +32,12 @@ struct TExternalSourceFactory : public IExternalSourceFactory { } -IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector& hostnamePatterns, size_t pathsLimit) { +IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) { std::vector hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end()); return MakeIntrusive(TMap{ { ToString(NYql::EDatabaseType::ObjectStorage), - CreateObjectStorageExternalSource(hostnamePatternsRegEx, pathsLimit) + CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit) }, { ToString(NYql::EDatabaseType::ClickHouse), diff --git a/ydb/core/external_sources/external_source_factory.h b/ydb/core/external_sources/external_source_factory.h index 9a7b3133d9e7..9fe78a60d562 100644 --- a/ydb/core/external_sources/external_source_factory.h +++ b/ydb/core/external_sources/external_source_factory.h @@ -10,6 +10,6 @@ struct IExternalSourceFactory : public TThrRefBase { virtual IExternalSource::TPtr GetOrCreate(const TString& type) const = 0; }; -IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector& hostnamePatterns, size_t pathsLimit = 50000); +IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem = nullptr, size_t pathsLimit = 50000); } diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index cb3cebe24aa7..62eee02e59db 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -2,9 +2,12 @@ #include "object_storage.h" #include "validation_functions.h" +#include #include #include +#include #include +#include #include #include #include @@ -20,9 +23,10 @@ namespace NKikimr::NExternalSource { namespace { struct TObjectStorageExternalSource : public IExternalSource { - explicit TObjectStorageExternalSource(const std::vector& hostnamePatterns, size_t pathsLimit) + explicit TObjectStorageExternalSource(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) : HostnamePatterns(hostnamePatterns) , PathsLimit(pathsLimit) + , ActorSystem(actorSystem) {} virtual TString Pack(const NKikimrExternalSources::TSchema& schema, @@ -255,6 +259,20 @@ struct TObjectStorageExternalSource : public IExternalSource { return issues; } + struct TMetadataResult : NYql::NCommon::TOperationResult { + std::shared_ptr Metadata; + }; + + virtual NThreading::TFuture> LoadDynamicMetadata(std::shared_ptr meta) override { + Y_UNUSED(ActorSystem); + // TODO: implement + return NThreading::MakeFuture(std::move(meta)); + } + + virtual bool CanLoadDynamicMetadata() const override { + return false; + } + private: static bool IsValidIntervalUnit(const TString& unit) { static constexpr std::array IntervalUnits = { @@ -474,12 +492,14 @@ struct TObjectStorageExternalSource : public IExternalSource { private: const std::vector HostnamePatterns; const size_t PathsLimit; + NActors::TActorSystem* ActorSystem = nullptr; }; } -IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector& hostnamePatterns, size_t pathsLimit) { - return MakeIntrusive(hostnamePatterns, pathsLimit); + +IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) { + return MakeIntrusive(hostnamePatterns, actorSystem, pathsLimit); } NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) { diff --git a/ydb/core/external_sources/object_storage.h b/ydb/core/external_sources/object_storage.h index e357be02d994..78765fc68cdf 100644 --- a/ydb/core/external_sources/object_storage.h +++ b/ydb/core/external_sources/object_storage.h @@ -8,7 +8,7 @@ namespace NKikimr::NExternalSource { -IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector& hostnamePatterns, size_t pathsLimit); +IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit); NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit); diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index 32ec2f48ef84..c1848433aae7 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -2,6 +2,7 @@ #include "actors/kqp_ic_gateway_actors.h" #include +#include #include #include #include @@ -9,6 +10,7 @@ #include #include +#include namespace NKikimr::NKqp { @@ -63,7 +65,7 @@ NavigateEntryResult CreateNavigateEntry(const std::pair& pair entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpList; entry.SyncVersion = true; entry.ShowPrivatePath = settings.WithPrivateTables_; - return {entry, pair.second, std::nullopt}; + return {std::move(entry), pair.second, std::nullopt}; } std::optional CreateNavigateExternalEntry(const TString& path, bool externalDataSource) { @@ -279,6 +281,7 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc tableMeta->ExternalSource.DataSourceAuth = description.GetAuth(); tableMeta->ExternalSource.Properties = description.GetProperties(); tableMeta->ExternalSource.DataSourcePath = tableName; + tableMeta->ExternalSource.TableLocation = JoinPath(entry.Path); return result; } @@ -470,6 +473,61 @@ NThreading::TFuture LoadExternalDataSo return DescribeExternalDataSourceSecrets(authDescription, userToken ? userToken->GetUserSID() : "", actorSystem, maximalSecretsSnapshotWaitTime); } +NExternalSource::TAuth MakeAuth(const NYql::TExternalSource& metadata) { + switch (metadata.DataSourceAuth.identity_case()) { + case NKikimrSchemeOp::TAuth::kNone: + return NExternalSource::NAuth::MakeNone(); + case NKikimrSchemeOp::TAuth::kServiceAccount: + return NExternalSource::NAuth::MakeServiceAccount(metadata.DataSourceAuth.GetServiceAccount().GetId(), metadata.ServiceAccountIdSignature); + case NKikimrSchemeOp::TAuth::kAws: + return NExternalSource::NAuth::MakeAws(metadata.AwsAccessKeyId, metadata.AwsSecretAccessKey, metadata.DataSourceAuth.GetAws().GetAwsRegion()); + case NKikimrSchemeOp::TAuth::kBasic: + case NKikimrSchemeOp::TAuth::kMdbBasic: + case NKikimrSchemeOp::TAuth::kToken: + case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: + Y_ABORT("Unimplemented external source auth: %d", metadata.DataSourceAuth.identity_case()); + break; + } + Y_UNREACHABLE(); +} + +std::shared_ptr ConvertToExternalSourceMetadata(const NYql::TKikimrTableMetadata& tableMetadata) { + auto metadata = std::make_shared(); + metadata->TableLocation = tableMetadata.ExternalSource.TableLocation; + metadata->DataSourceLocation = tableMetadata.ExternalSource.DataSourceLocation; + metadata->DataSourcePath = tableMetadata.ExternalSource.DataSourcePath; + metadata->Attributes = tableMetadata.Attributes; + metadata->Auth = MakeAuth(tableMetadata.ExternalSource); + return metadata; +} + +// dynamic metadata from IExternalSource here is propagated into TKikimrTableMetadata, which will be returned as a result of LoadTableMetadata() +bool EnrichMetadata(NYql::TKikimrTableMetadata& tableMetadata, const NExternalSource::TMetadata& dynamicMetadata) { + ui32 id = 0; + for (const auto& column : dynamicMetadata.Schema.column()) { + Ydb::Type::PrimitiveTypeId typeId {}; + if (column.type().has_type_id()) { + typeId = column.type().type_id(); + } else if (column.type().has_optional_type()) { + typeId = column.type().optional_type().item().type_id(); + } else { + Y_ABORT_UNLESS(false); + } + const auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(typeId, nullptr); + auto typeName = GetTypeName(typeInfoMod); + + tableMetadata.Columns.emplace( + column.name(), + NYql::TKikimrColumnMetadata( + column.name(), id, typeName, !column.type().has_optional_type(), typeInfoMod.TypeInfo, typeInfoMod.TypeMod + ) + ); + ++id; + } + tableMetadata.Attributes = dynamicMetadata.Attributes; + return true; +} + } // anonymous namespace @@ -680,9 +738,11 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta // In this syntax, information about path_in_external_system is already known and we only need information about external_data_source. // To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard const bool resolveEntityInsideDataSource = (cluster != Cluster); + TMaybe externalPath; TPath entityName = id; if constexpr (std::is_same_v) { if (resolveEntityInsideDataSource) { + externalPath = entityName; entityName = cluster; } } else { @@ -720,7 +780,7 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta ActorSystem, schemeCacheId, ev.Release(), - [userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName] + [userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName, externalPath] (TPromise promise, TResponse&& response) mutable { try { @@ -759,16 +819,41 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta switch (entry.Kind) { case EKind::KindExternalDataSource: { + if (externalPath) { + entry.Path = SplitPath(*externalPath); + } auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table); if (!externalDataSourceMetadata.Success() || !settings.RequestAuthInfo_) { promise.SetValue(externalDataSourceMetadata); return; } LoadExternalDataSourceSecretValues(entry, userToken, MaximalSecretsSnapshotWaitTime, ActorSystem) - .Subscribe([promise, externalDataSourceMetadata](const TFuture& result) mutable + .Subscribe([promise, externalDataSourceMetadata, settings](const TFuture& result) mutable { UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue()); - promise.SetValue(externalDataSourceMetadata); + NExternalSource::IExternalSource::TPtr externalSource; + if (settings.ExternalSourceFactory) { + externalSource = settings.ExternalSourceFactory->GetOrCreate(externalDataSourceMetadata.Metadata->ExternalSource.Type); + } + + if (externalSource && externalSource->CanLoadDynamicMetadata()) { + auto externalSourceMeta = ConvertToExternalSourceMetadata(*externalDataSourceMetadata.Metadata); + externalSourceMeta->Attributes = settings.ReadAttributes; // attributes, collected from AST + externalSource->LoadDynamicMetadata(std::move(externalSourceMeta)) + .Subscribe([promise = std::move(promise), externalDataSourceMetadata](const TFuture>& result) mutable { + TTableMetadataResult wrapper; + if (result.HasValue() && (!result.GetValue()->Changed || EnrichMetadata(*externalDataSourceMetadata.Metadata, *result.GetValue()))) { + wrapper.SetSuccess(); + wrapper.Metadata = externalDataSourceMetadata.Metadata; + } else { + // TODO: forward exception from result + wrapper.SetException(yexception() << "LoadDynamicMetadata failed"); + } + promise.SetValue(wrapper); + }); + } else { + promise.SetValue(externalDataSourceMetadata); + } }); break; } @@ -785,7 +870,8 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta .Apply([promise, externalTableMetadata](const TFuture& result) mutable { auto externalDataSourceMetadata = result.GetValue(); - promise.SetValue(EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata)); + auto newMetadata = EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata); + promise.SetValue(std::move(newMetadata)); }); break; } diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index bc13af0cccc6..86a45c002743 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1064,7 +1064,7 @@ class TKqpHost : public IKqpHost { SessionCtx->SetTempTables(std::move(tempTablesState)); if (FederatedQuerySetup) { - ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory({}, FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit()); + ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory({}, nullptr, FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit()); } } diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 2d40820b75d4..62c4b6c5394f 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -35,7 +35,7 @@ static Ydb::Type CreateYdbType(const NKikimr::NScheme::TTypeInfo& typeInfo, bool return ydbType; } -TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ctx, const TMap& columns, const NKikimr::NExternalSource::IExternalSource::TPtr& source, const TString& content) { +TExprNode::TPtr BuildSchemaFromMetadata(TPositionHandle pos, TExprContext& ctx, const TMap& columns) { TVector> typedColumns; typedColumns.reserve(columns.size()); for (const auto& [n, c] : columns) { @@ -50,7 +50,12 @@ TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ct auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema }); auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema }); auto userSchema = ctx.NewAtom(pos, "userschema"sv); - items.emplace_back(ctx.NewList(pos, {userSchema, type, order})); + return ctx.NewList(pos, {userSchema, type, order}); +} + +TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ctx, const TMap& columns, const NKikimr::NExternalSource::IExternalSource::TPtr& source, const TString& content) { + TExprNode::TListType items; + items.emplace_back(BuildSchemaFromMetadata(pos, ctx, columns)); for (const auto& [key, values]: source->GetParameters(content)) { TExprNode::TListType children = {ctx.NewAtom(pos, NormalizeName(key))}; @@ -121,6 +126,179 @@ namespace { using namespace NKikimr; using namespace NNodes; +class IAstAttributesVisitor { +public: + virtual ~IAstAttributesVisitor() = default; + + virtual void VisitRead(TExprNode& read, TString cluster, TString tablePath) = 0; + virtual void ExitRead() = 0; + + virtual void VisitAttribute(TString key, TString value) = 0; + + virtual void VisitNonAttribute(TExprNode::TPtr node) = 0; +}; + +class TGatheringAttributesVisitor : public IAstAttributesVisitor { + void VisitRead(TExprNode&, TString cluster, TString tablePath) override { + CurrentSource = &*Result.try_emplace(std::make_pair(cluster, tablePath)).first; + }; + + void ExitRead() override { + CurrentSource = nullptr; + }; + + void VisitAttribute(TString key, TString value) override { + Y_ABORT_UNLESS(CurrentSource, "cannot write %s: %s", key.c_str(), value.c_str()); + CurrentSource->second.try_emplace(key, value); + }; + + void VisitNonAttribute(TExprNode::TPtr) override {} + +public: + THashMap, THashMap> Result; + +private: + decltype(Result)::pointer CurrentSource = nullptr; +}; + +class TAttributesReplacingVisitor : public IAstAttributesVisitor { +public: + TAttributesReplacingVisitor(THashMap attributesBeforeFilter, + TStringBuf cluster, TStringBuf tablePath, + TKikimrTableMetadataPtr metadata, + TExprContext& ctx) + : GatheredAttributes{std::move(attributesBeforeFilter)} + , Cluster{cluster}, TablePath{tablePath} + , Ctx{ctx} + , Metadata{std::move(metadata)} + {} + + void VisitRead(TExprNode& read, TString cluster, TString tablePath) override { + if (cluster == Cluster && tablePath == TablePath) { + Read = &read; + } + }; + + void ExitRead() override { + if (!Read) { + return; + } + if (!ReplacedUserchema && Metadata) { + Children.push_back(BuildSchemaFromMetadata(Read->Pos(), Ctx, Metadata->Columns)); + } + Read->Child(4)->ChangeChildrenInplace(std::move(Children)); + Read = nullptr; + }; + + void VisitAttribute(TString key, TString value) override { + if (!Read) { + return; + } + const bool gotNewAttributes = Metadata && !Metadata->Attributes.empty(); + + if (gotNewAttributes && GatheredAttributes.contains(key) && !Metadata->Attributes.contains(key)) { + return; + } + auto pos = Read->Pos(); + auto attribute = Ctx.NewList(pos, { + Ctx.NewAtom(pos, key), + Ctx.NewAtom(pos, value) + }); + Children.push_back(std::move(attribute)); + }; + + void VisitNonAttribute(TExprNode::TPtr node) override { + if (!Read) { + return; + } + if (!Metadata || Metadata->Columns.empty()) { + Children.push_back(std::move(node)); + return; + } + + auto nodeChildren = node->Children(); + if (!nodeChildren.empty() && nodeChildren[0]->IsAtom()) { + TCoAtom attrName{nodeChildren[0]}; + if (attrName.StringValue().equal("userschema")) { + node = BuildSchemaFromMetadata(Read->Pos(), Ctx, Metadata->Columns); + ReplacedUserchema = true; + } + } + Children.push_back(std::move(node)); + } + +private: + THashMap GatheredAttributes; + TStringBuf Cluster; + TStringBuf TablePath; + + TExprContext& Ctx; + TKikimrTableMetadataPtr Metadata; + TExprNode* Read = nullptr; + std::vector Children; + bool ReplacedUserchema = false; +}; + +std::optional> GetAsTextAttribute(const TExprNode& child) { + if (!child.IsList() || child.ChildrenSize() != 2) { + return std::nullopt; + } + if (!(child.Child(0)->IsAtom() && child.Child(1)->IsAtom())) { + return std::nullopt; + } + + TCoAtom attrKey{child.Child(0)}; + TCoAtom attrVal{child.Child(1)}; + + return std::make_optional(std::make_pair(attrKey.StringValue(), attrVal.StringValue())); +} + +void TraverseReadAttributes(IAstAttributesVisitor& visitor, TExprNode& node, TExprContext& ctx); + +void ExtractReadAttributes(IAstAttributesVisitor& visitor, TExprNode& read, TExprContext& ctx) { + TraverseReadAttributes(visitor, *read.Child(0), ctx); + + TKiDataSource source(read.ChildPtr(1)); + TKikimrKey key{ctx}; + if (!key.Extract(*read.Child(2))) { + return; + } + auto cluster = source.Cluster().StringValue(); + auto tablePath = key.GetTablePath(); + + if (read.ChildrenSize() <= 4) { + return; + } + auto& astAttrs = *read.Child(4); + visitor.VisitRead(read, cluster, tablePath); + for (const auto& child : astAttrs.Children()) { + if (auto asAttribute = GetAsTextAttribute(*child)) { + visitor.VisitAttribute(asAttribute->first, asAttribute->second); + } else { + visitor.VisitNonAttribute(child); + } + } + visitor.ExitRead(); +} + +void TraverseReadAttributes(IAstAttributesVisitor& visitor, TExprNode& node, TExprContext& ctx) { + if (node.IsCallable(ReadName)) { + return ExtractReadAttributes(visitor, node, ctx); + } + if (node.IsCallable()) { + if (node.ChildrenSize() == 0) { + return; + } + return TraverseReadAttributes(visitor, *node.Child(0), ctx); + } +} + +THashMap, THashMap> GatherReadAttributes(TExprNode& node, TExprContext& ctx) { + TGatheringAttributesVisitor visitor; + TraverseReadAttributes(visitor, node, ctx); + return visitor.Result; +} + class TKiSourceIntentDeterminationTransformer: public TKiSourceVisitorTransformer { public: TKiSourceIntentDeterminationTransformer(TIntrusivePtr sessionCtx) @@ -224,11 +402,13 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { size_t tablesCount = SessionCtx->Tables().GetTables().size(); TVector> futures; futures.reserve(tablesCount); + auto readAttributes = GatherReadAttributes(*input, ctx); for (auto& it : SessionCtx->Tables().GetTables()) { const TString& clusterName = it.first.first; const TString& tableName = it.first.second; TKikimrTableDescription& table = SessionCtx->Tables().GetTable(clusterName, tableName); + auto readAttrs = readAttributes.FindPtr(std::make_pair(clusterName, tableName)); if (table.Metadata || table.GetTableType() != ETableType::Table) { continue; @@ -247,6 +427,8 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { .WithPrivateTables(IsInternalCall) .WithExternalDatasources(SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) .WithAuthInfo(table.GetNeedAuthInfo()) + .WithExternalSourceFactory(ExternalSourceFactory) + .WithReadAttributes(readAttrs ? std::move(*readAttrs) : THashMap{}) ); futures.push_back(future.Apply([result, queryType] @@ -327,18 +509,21 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { output = input; YQL_ENSURE(AsyncFuture.HasValue()); + auto gatheredAttributes = GatherReadAttributes(*input, ctx); for (auto& it : LoadResults) { const auto& table = it.first; IKikimrGateway::TTableMetadataResult& res = *it.second; if (res.Success()) { res.ReportIssues(ctx.IssueManager); - TKikimrTableDescription* tableDesc; + TString cluster = it.first.first; + TString tablePath; if (res.Metadata->Temporary) { - tableDesc = &SessionCtx->Tables().GetTable(it.first.first, *res.Metadata->QueryName); + tablePath = *res.Metadata->QueryName; } else { - tableDesc = &SessionCtx->Tables().GetTable(it.first.first, it.first.second); + tablePath = it.first.second; } + TKikimrTableDescription* tableDesc = &SessionCtx->Tables().GetTable(cluster, tablePath); YQL_ENSURE(res.Metadata); tableDesc->Metadata = res.Metadata; @@ -357,6 +542,14 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { return TStatus::Error; } + if (tableDesc->Metadata->Kind == EKikimrTableKind::External) { + auto currentAttributes = gatheredAttributes.FindPtr(std::make_pair(cluster, tablePath)); + if (currentAttributes && !currentAttributes->empty()) { + TAttributesReplacingVisitor replacer{*currentAttributes, cluster, tablePath, tableDesc->Metadata, ctx}; + TraverseReadAttributes(replacer, *input, ctx); + } + } + if (!AddCluster(table, res, input, ctx)) { return TStatus::Error; } @@ -384,6 +577,7 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { return TStatus::Error; } } + output = input; LoadResults.clear(); return TStatus::Ok; @@ -750,7 +944,6 @@ class TKikimrDataSource : public TDataProviderBase { .Add(ctx.NewCallable(node->Pos(), "MrTableConcat", {newKey})) .Add(ctx.NewCallable(node->Pos(), "Void", {})) .Add(BuildExternalTableSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, source, tableDesc.Metadata->ExternalSource.TableContent)) - .Build() .Done().Ptr(); auto retChildren = node->ChildrenList(); diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index b7a2f08113df..a5c666a23ab1 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -881,6 +882,18 @@ class IKikimrGateway : public TThrRefBase { return *this; } + TLoadTableMetadataSettings& WithExternalSourceFactory(NKikimr::NExternalSource::IExternalSourceFactory::TPtr factory) { + ExternalSourceFactory = std::move(factory); + return *this; + } + + TLoadTableMetadataSettings& WithReadAttributes(THashMap options) { + ReadAttributes = std::move(options); + return *this; + } + + NKikimr::NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory; + THashMap ReadAttributes; bool RequestStats_ = false; bool WithPrivateTables_ = false; bool WithExternalDatasources_ = false; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index a9d8f20bfeeb..d4d22f7a63e2 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6890,6 +6890,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi const auto& hostnamePatterns = appConfig.GetQueryServiceConfig().GetHostnamePatterns(); ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory( std::vector(hostnamePatterns.begin(), hostnamePatterns.end()), + nullptr, appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit() ); }