diff --git a/ydb/core/external_sources/external_data_source.cpp b/ydb/core/external_sources/external_data_source.cpp index e525eac6df85..e11c7cce2446 100644 --- a/ydb/core/external_sources/external_data_source.cpp +++ b/ydb/core/external_sources/external_data_source.cpp @@ -52,6 +52,14 @@ struct TExternalDataSource : public IExternalSource { ValidateHostname(HostnamePatterns, proto.GetLocation()); } + virtual NThreading::TFuture> LoadDynamicMetadata(std::shared_ptr meta) override { + return NThreading::MakeFuture(std::move(meta)); + } + + virtual bool CanLoadDynamicMetadata() const override { + return false; + } + private: const TString Name; const TVector AuthMethods; diff --git a/ydb/core/external_sources/external_source.h b/ydb/core/external_sources/external_source.h index 78121c7b0fc1..92c565c3c350 100644 --- a/ydb/core/external_sources/external_source.h +++ b/ydb/core/external_sources/external_source.h @@ -1,9 +1,11 @@ #pragma once +#include #include #include #include +#include #include namespace NKikimr::NExternalSource { @@ -11,6 +13,71 @@ namespace NKikimr::NExternalSource { struct TExternalSourceException: public yexception { }; +namespace NAuth { + +struct TNone { + static constexpr std::string_view Method = "NONE"; +}; + +struct TAws { + static constexpr std::string_view Method = "AWS"; + + TAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) + : AccessKey{accessKey} + , SecretAccessKey{secretAccessKey} + , Region{region} + {} + + TString AccessKey; + TString SecretAccessKey; + TString Region; +}; + +struct TServiceAccount { + static constexpr std::string_view Method = "SERVICE_ACCOUNT"; + + TServiceAccount(TString serviceAccountId, TString serviceAccountIdSignature) + : ServiceAccountId{std::move(serviceAccountId)} + , ServiceAccountIdSignature{std::move(serviceAccountIdSignature)} + {} + + TString ServiceAccountId; + TString ServiceAccountIdSignature; +}; + +using TAuth = std::variant; + +std::string_view GetMethod(const TAuth& auth); + +inline TAuth MakeNone() { + return TAuth{std::in_place_type_t{}}; +} + +inline TAuth MakeServiceAccount(const TString& serviceAccountId, const TString& serviceAccountIdSignature) { + return TAuth{std::in_place_type_t{}, serviceAccountId, serviceAccountIdSignature}; +} + +inline TAuth MakeAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) { + return TAuth{std::in_place_type_t{}, accessKey, secretAccessKey, region}; +} +} + +using TAuth = NAuth::TAuth; + +struct TMetadata { + bool Changed = false; + TString TableLocation; + TString DataSourceLocation; + TString DataSourcePath; + TString Type; + + THashMap Attributes; + + TAuth Auth; + + NKikimrExternalSources::TSchema Schema; +}; + struct IExternalSource : public TThrRefBase { using TPtr = TIntrusivePtr; @@ -54,6 +121,17 @@ struct IExternalSource : public TThrRefBase { If an error occurs, an exception is thrown. */ virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const = 0; + + /* + Retrieve additional metadata from runtime data, enrich provided metadata + */ + virtual NThreading::TFuture> LoadDynamicMetadata(std::shared_ptr meta) = 0; + + /* + A method that should tell whether there is an implementation + of the previous method. + */ + virtual bool CanLoadDynamicMetadata() const = 0; }; } diff --git a/ydb/core/external_sources/external_source_factory.cpp b/ydb/core/external_sources/external_source_factory.cpp index 49279c20dcf9..f3e845218197 100644 --- a/ydb/core/external_sources/external_source_factory.cpp +++ b/ydb/core/external_sources/external_source_factory.cpp @@ -32,12 +32,12 @@ struct TExternalSourceFactory : public IExternalSourceFactory { } -IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector& hostnamePatterns, size_t pathsLimit) { +IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) { std::vector hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end()); return MakeIntrusive(TMap{ { ToString(NYql::EDatabaseType::ObjectStorage), - CreateObjectStorageExternalSource(hostnamePatternsRegEx, pathsLimit) + CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit) }, { ToString(NYql::EDatabaseType::ClickHouse), diff --git a/ydb/core/external_sources/external_source_factory.h b/ydb/core/external_sources/external_source_factory.h index 9a7b3133d9e7..9fe78a60d562 100644 --- a/ydb/core/external_sources/external_source_factory.h +++ b/ydb/core/external_sources/external_source_factory.h @@ -10,6 +10,6 @@ struct IExternalSourceFactory : public TThrRefBase { virtual IExternalSource::TPtr GetOrCreate(const TString& type) const = 0; }; -IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector& hostnamePatterns, size_t pathsLimit = 50000); +IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem = nullptr, size_t pathsLimit = 50000); } diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index cb3cebe24aa7..62eee02e59db 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -2,9 +2,12 @@ #include "object_storage.h" #include "validation_functions.h" +#include #include #include +#include #include +#include #include #include #include @@ -20,9 +23,10 @@ namespace NKikimr::NExternalSource { namespace { struct TObjectStorageExternalSource : public IExternalSource { - explicit TObjectStorageExternalSource(const std::vector& hostnamePatterns, size_t pathsLimit) + explicit TObjectStorageExternalSource(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) : HostnamePatterns(hostnamePatterns) , PathsLimit(pathsLimit) + , ActorSystem(actorSystem) {} virtual TString Pack(const NKikimrExternalSources::TSchema& schema, @@ -255,6 +259,20 @@ struct TObjectStorageExternalSource : public IExternalSource { return issues; } + struct TMetadataResult : NYql::NCommon::TOperationResult { + std::shared_ptr Metadata; + }; + + virtual NThreading::TFuture> LoadDynamicMetadata(std::shared_ptr meta) override { + Y_UNUSED(ActorSystem); + // TODO: implement + return NThreading::MakeFuture(std::move(meta)); + } + + virtual bool CanLoadDynamicMetadata() const override { + return false; + } + private: static bool IsValidIntervalUnit(const TString& unit) { static constexpr std::array IntervalUnits = { @@ -474,12 +492,14 @@ struct TObjectStorageExternalSource : public IExternalSource { private: const std::vector HostnamePatterns; const size_t PathsLimit; + NActors::TActorSystem* ActorSystem = nullptr; }; } -IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector& hostnamePatterns, size_t pathsLimit) { - return MakeIntrusive(hostnamePatterns, pathsLimit); + +IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) { + return MakeIntrusive(hostnamePatterns, actorSystem, pathsLimit); } NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) { diff --git a/ydb/core/external_sources/object_storage.h b/ydb/core/external_sources/object_storage.h index e357be02d994..78765fc68cdf 100644 --- a/ydb/core/external_sources/object_storage.h +++ b/ydb/core/external_sources/object_storage.h @@ -8,7 +8,7 @@ namespace NKikimr::NExternalSource { -IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector& hostnamePatterns, size_t pathsLimit); +IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit); NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit); diff --git a/ydb/core/external_sources/object_storage_ut.cpp b/ydb/core/external_sources/object_storage_ut.cpp index b93907e6b7ae..e5bc5c406221 100644 --- a/ydb/core/external_sources/object_storage_ut.cpp +++ b/ydb/core/external_sources/object_storage_ut.cpp @@ -8,14 +8,14 @@ namespace NKikimr { Y_UNIT_TEST_SUITE(ObjectStorageTest) { Y_UNIT_TEST(SuccessValidation) { - auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000); + auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000); NKikimrExternalSources::TSchema schema; NKikimrExternalSources::TGeneral general; UNIT_ASSERT_NO_EXCEPTION(source->Pack(schema, general)); } Y_UNIT_TEST(FailedCreate) { - auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000); + auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000); NKikimrExternalSources::TSchema schema; NKikimrExternalSources::TGeneral general; general.mutable_attributes()->insert({"a", "b"}); @@ -23,7 +23,7 @@ Y_UNIT_TEST_SUITE(ObjectStorageTest) { } Y_UNIT_TEST(FailedValidation) { - auto source = NExternalSource::CreateObjectStorageExternalSource({}, 1000); + auto source = NExternalSource::CreateObjectStorageExternalSource({}, nullptr, 1000); NKikimrExternalSources::TSchema schema; NKikimrExternalSources::TGeneral general; general.mutable_attributes()->insert({"projection.h", "b"}); diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index e2bc378b161d..452657fb80f1 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -106,6 +106,7 @@ PEERDIR( ydb/core/io_formats/ydb_dump ydb/core/kesus/tablet ydb/core/kqp/common + ydb/core/kqp/session_actor ydb/core/protos ydb/core/scheme ydb/core/sys_view diff --git a/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make b/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make index e7ed0ff336e9..9d6441c58912 100644 --- a/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make +++ b/ydb/core/kqp/gateway/behaviour/external_data_source/ya.make @@ -9,6 +9,7 @@ PEERDIR( ydb/services/metadata/initializer ydb/services/metadata/abstract ydb/core/kqp/gateway/actors + ydb/core/kqp/federated_query ydb/core/kqp/gateway/utils ydb/core/kqp/gateway/behaviour/tablestore/operations ) diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index a76b64f98bca..40e8974628a4 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -2,6 +2,7 @@ #include "actors/kqp_ic_gateway_actors.h" #include +#include #include #include #include @@ -9,6 +10,7 @@ #include #include +#include namespace NKikimr::NKqp { @@ -64,7 +66,7 @@ NavigateEntryResult CreateNavigateEntry(const std::pair& pair entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpList; entry.SyncVersion = true; entry.ShowPrivatePath = settings.WithPrivateTables_; - return {entry, pair.second, std::nullopt}; + return {std::move(entry), pair.second, std::nullopt}; } std::optional CreateNavigateExternalEntry(const TString& path, bool externalDataSource) { @@ -280,6 +282,7 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc tableMeta->ExternalSource.DataSourceAuth = description.GetAuth(); tableMeta->ExternalSource.Properties = description.GetProperties(); tableMeta->ExternalSource.DataSourcePath = tableName; + tableMeta->ExternalSource.TableLocation = JoinPath(entry.Path); return result; } @@ -465,6 +468,66 @@ NThreading::TFuture LoadExternalDataSo } // anonymous namespace +NExternalSource::TAuth MakeAuth(const NYql::TExternalSource& metadata) { + switch (metadata.DataSourceAuth.identity_case()) { + case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: + case NKikimrSchemeOp::TAuth::kNone: + return NExternalSource::NAuth::MakeNone(); + case NKikimrSchemeOp::TAuth::kServiceAccount: + return NExternalSource::NAuth::MakeServiceAccount(metadata.DataSourceAuth.GetServiceAccount().GetId(), metadata.ServiceAccountIdSignature); + case NKikimrSchemeOp::TAuth::kAws: + return NExternalSource::NAuth::MakeAws(metadata.AwsAccessKeyId, metadata.AwsSecretAccessKey, metadata.DataSourceAuth.GetAws().GetAwsRegion()); + case NKikimrSchemeOp::TAuth::kBasic: + case NKikimrSchemeOp::TAuth::kMdbBasic: + case NKikimrSchemeOp::TAuth::kToken: + Y_ABORT("Unimplemented external source auth: %d", metadata.DataSourceAuth.identity_case()); + break; + } + Y_UNREACHABLE(); +} + +std::shared_ptr ConvertToExternalSourceMetadata(const NYql::TKikimrTableMetadata& tableMetadata) { + auto metadata = std::make_shared(); + metadata->TableLocation = tableMetadata.ExternalSource.TableLocation; + metadata->DataSourceLocation = tableMetadata.ExternalSource.DataSourceLocation; + metadata->DataSourcePath = tableMetadata.ExternalSource.DataSourcePath; + metadata->Type = tableMetadata.ExternalSource.Type; + metadata->Attributes = tableMetadata.Attributes; + metadata->Auth = MakeAuth(tableMetadata.ExternalSource); + return metadata; +} + +// dynamic metadata from IExternalSource here is propagated into TKikimrTableMetadata, which will be returned as a result of LoadTableMetadata() +bool EnrichMetadata(NYql::TKikimrTableMetadata& tableMetadata, const NExternalSource::TMetadata& dynamicMetadata) { + ui32 id = 0; + for (const auto& column : dynamicMetadata.Schema.column()) { + Ydb::Type::PrimitiveTypeId typeId {}; + if (column.type().has_type_id()) { + typeId = column.type().type_id(); + } else if (column.type().has_optional_type()) { + typeId = column.type().optional_type().item().type_id(); + } else { + Y_ABORT_UNLESS(false); + } + const auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(typeId, nullptr); + auto typeName = GetTypeName(typeInfoMod); + + tableMetadata.Columns.emplace( + column.name(), + NYql::TKikimrColumnMetadata( + column.name(), id, typeName, !column.type().has_optional_type(), typeInfoMod.TypeInfo, typeInfoMod.TypeMod + ) + ); + ++id; + } + tableMetadata.Attributes = dynamicMetadata.Attributes; + tableMetadata.ExternalSource.TableLocation = dynamicMetadata.TableLocation; + tableMetadata.ExternalSource.DataSourceLocation = dynamicMetadata.DataSourceLocation; + tableMetadata.ExternalSource.DataSourcePath = dynamicMetadata.DataSourcePath; + tableMetadata.ExternalSource.Type = dynamicMetadata.Type; + return true; +} + TVector TKqpTableMetadataLoader::GetCollectedSchemeData() { TVector result(std::move(CollectedSchemeData)); @@ -673,9 +736,11 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta // In this syntax, information about path_in_external_system is already known and we only need information about external_data_source. // To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard const bool resolveEntityInsideDataSource = (cluster != Cluster); + TMaybe externalPath; TPath entityName = id; if constexpr (std::is_same_v) { if (resolveEntityInsideDataSource) { + externalPath = entityName; entityName = cluster; } } else { @@ -713,7 +778,7 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta ActorSystem, schemeCacheId, ev.Release(), - [userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName] + [userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName, externalPath] (TPromise promise, TResponse&& response) mutable { try { @@ -757,16 +822,45 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta switch (entry.Kind) { case EKind::KindExternalDataSource: { + if (externalPath) { + entry.Path = SplitPath(*externalPath); + } auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table); if (!externalDataSourceMetadata.Success() || !settings.RequestAuthInfo_) { promise.SetValue(externalDataSourceMetadata); return; } LoadExternalDataSourceSecretValues(entry, userToken, MaximalSecretsSnapshotWaitTime, ActorSystem) - .Subscribe([promise, externalDataSourceMetadata](const TFuture& result) mutable + .Subscribe([promise, externalDataSourceMetadata, settings](const TFuture& result) mutable { UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue()); - promise.SetValue(externalDataSourceMetadata); + NExternalSource::IExternalSource::TPtr externalSource; + if (settings.ExternalSourceFactory) { + externalSource = settings.ExternalSourceFactory->GetOrCreate(externalDataSourceMetadata.Metadata->ExternalSource.Type); + } + + if (externalSource && externalSource->CanLoadDynamicMetadata()) { + auto externalSourceMeta = ConvertToExternalSourceMetadata(*externalDataSourceMetadata.Metadata); + externalSourceMeta->Attributes = settings.ReadAttributes; // attributes, collected from AST + externalSource->LoadDynamicMetadata(std::move(externalSourceMeta)) + .Subscribe([promise = std::move(promise), externalDataSourceMetadata](const TFuture>& result) mutable { + TTableMetadataResult wrapper; + try { + auto& dynamicMetadata = result.GetValue(); + if (!dynamicMetadata->Changed || EnrichMetadata(*externalDataSourceMetadata.Metadata, *dynamicMetadata)) { + wrapper.SetSuccess(); + wrapper.Metadata = externalDataSourceMetadata.Metadata; + } else { + wrapper.SetException(yexception() << "couldn't enrich metadata with dynamically loaded part"); + } + } catch (const std::exception& exception) { + wrapper.SetException(yexception() << "couldn't load table metadata: " << exception.what()); + } + promise.SetValue(wrapper); + }); + } else { + promise.SetValue(externalDataSourceMetadata); + } }); break; } @@ -783,7 +877,8 @@ NThreading::TFuture TKqpTableMetadataLoader::LoadTableMeta .Apply([promise, externalTableMetadata](const TFuture& result) mutable { auto externalDataSourceMetadata = result.GetValue(); - promise.SetValue(EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata)); + auto newMetadata = EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata); + promise.SetValue(std::move(newMetadata)); }); break; } diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.h b/ydb/core/kqp/gateway/kqp_metadata_loader.h index 798e9cca88b8..6cb0ff907413 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.h +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.h @@ -11,6 +11,11 @@ namespace NKikimr::NKqp { +// only exposed to be unit-tested +NExternalSource::TAuth MakeAuth(const NYql::TExternalSource& metadata); +std::shared_ptr ConvertToExternalSourceMetadata(const NYql::TKikimrTableMetadata& tableMetadata); +bool EnrichMetadata(NYql::TKikimrTableMetadata& tableMetadata, const NExternalSource::TMetadata& dynamicMetadata); + class TKqpTableMetadataLoader : public NYql::IKikimrGateway::IKqpTableMetadataLoader { public: diff --git a/ydb/core/kqp/gateway/ut/metadata_conversion.cpp b/ydb/core/kqp/gateway/ut/metadata_conversion.cpp new file mode 100644 index 000000000000..f86239bbc6fd --- /dev/null +++ b/ydb/core/kqp/gateway/ut/metadata_conversion.cpp @@ -0,0 +1,78 @@ +#include + +#include +#include + +using namespace NKikimr; + +TEST(MetadataConversion, MakeAuthTest) { + NYql::TExternalSource externalSource; + auto auth = NKqp::MakeAuth(externalSource); + ASSERT_TRUE(std::holds_alternative(auth)); + + externalSource.DataSourceAuth.MutableNone(); + auth = NKqp::MakeAuth(externalSource); + ASSERT_TRUE(std::holds_alternative(auth)); + + externalSource.DataSourceAuth.ClearNone(); + auto& serviceAccount = *externalSource.DataSourceAuth.MutableServiceAccount(); + { + serviceAccount.SetId("sa-id"); + serviceAccount.SetSecretName("sa-name-of-secret"); + externalSource.ServiceAccountIdSignature = "sa-id-signature"; + } + auth = NKqp::MakeAuth(externalSource); + ASSERT_TRUE(std::holds_alternative(auth)); + { + auto& saAuth = std::get(auth); + ASSERT_EQ(saAuth.ServiceAccountId, "sa-id"); + ASSERT_EQ(saAuth.ServiceAccountIdSignature, "sa-id-signature"); + } + + externalSource.DataSourceAuth.ClearServiceAccount(); + auto& awsAccount = *externalSource.DataSourceAuth.MutableAws(); + { + awsAccount.SetAwsRegion("aws-test"); + awsAccount.SetAwsAccessKeyIdSecretName("aws-ak-secret-name"); + awsAccount.SetAwsSecretAccessKeySecretName("aws-sak-secret-name"); + externalSource.AwsAccessKeyId = "aws-ak"; + externalSource.AwsSecretAccessKey = "aws-sak"; + } + auth = NKqp::MakeAuth(externalSource); + ASSERT_TRUE(std::holds_alternative(auth)); + { + auto& awsAuth = std::get(auth); + ASSERT_EQ(awsAuth.Region, "aws-test"); + ASSERT_EQ(awsAuth.AccessKey, "aws-ak"); + ASSERT_EQ(awsAuth.SecretAccessKey, "aws-sak"); + } +} + +TEST(MetadataConversion, ConvertingExternalSourceMetadata) { + NYql::TExternalSource externalSource{ + .Type = "type", + .TableLocation = "table-loc", + .DataSourcePath = "ds-path", + .DataSourceLocation = "ds-loc", + }; + THashMap attributes{{"key1", "val1"}, {"key2", "val2"}}; + + std::shared_ptr externalMetadata; + { + NYql::TKikimrTableMetadata tableMetadata; + tableMetadata.ExternalSource = externalSource; + tableMetadata.Attributes = attributes; + externalMetadata = NKqp::ConvertToExternalSourceMetadata(tableMetadata); + } + ASSERT_TRUE(externalMetadata); + ASSERT_TRUE(std::holds_alternative(externalMetadata->Auth)); + + NYql::TKikimrTableMetadata tableMetadata; + ASSERT_TRUE(NKqp::EnrichMetadata(tableMetadata, *externalMetadata)); + + ASSERT_EQ(tableMetadata.ExternalSource.Type, externalSource.Type); + ASSERT_EQ(tableMetadata.ExternalSource.TableLocation, externalSource.TableLocation); + ASSERT_EQ(tableMetadata.ExternalSource.DataSourcePath, externalSource.DataSourcePath); + ASSERT_EQ(tableMetadata.ExternalSource.DataSourceLocation, externalSource.DataSourceLocation); + ASSERT_THAT(tableMetadata.Attributes, testing::ContainerEq(attributes)); +} diff --git a/ydb/core/kqp/gateway/ut/ya.make b/ydb/core/kqp/gateway/ut/ya.make new file mode 100644 index 000000000000..8490d2025802 --- /dev/null +++ b/ydb/core/kqp/gateway/ut/ya.make @@ -0,0 +1,17 @@ +GTEST() + +SRCS( + metadata_conversion.cpp +) + +PEERDIR( + ydb/core/kqp/gateway + ydb/library/yql/parser/pg_wrapper + ydb/library/yql/public/udf/service/stub + ydb/services/metadata +) + +YQL_LAST_ABI_VERSION() + +END() + diff --git a/ydb/core/kqp/gateway/ya.make b/ydb/core/kqp/gateway/ya.make index 5d669920e458..42a45f677775 100644 --- a/ydb/core/kqp/gateway/ya.make +++ b/ydb/core/kqp/gateway/ya.make @@ -32,3 +32,5 @@ RECURSE( local_rpc utils ) + +RECURSE_FOR_TESTS(ut) diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index ab0d1d99bb3c..a0450bc0fec4 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -985,7 +985,7 @@ class TKqpHost : public IKqpHost { SessionCtx->SetTempTables(std::move(tempTablesState)); if (FederatedQuerySetup) { - ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory({}, FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit()); + ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory({}, ActorSystem, FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit()); } } diff --git a/ydb/core/kqp/provider/read_attributes_utils.cpp b/ydb/core/kqp/provider/read_attributes_utils.cpp new file mode 100644 index 000000000000..c261c2c494e5 --- /dev/null +++ b/ydb/core/kqp/provider/read_attributes_utils.cpp @@ -0,0 +1,229 @@ +#include "read_attributes_utils.h" +#include +#include +#include +#include +#include + +namespace NYql { + +using namespace NNodes; + +class TGatheringAttributesVisitor : public IAstAttributesVisitor { + void VisitRead(TExprNode&, TString cluster, TString tablePath) override { + CurrentSource = &*Result.try_emplace(std::make_pair(cluster, tablePath)).first; + }; + + void ExitRead() override { + CurrentSource = nullptr; + }; + + void VisitAttribute(TString key, TString value) override { + Y_ABORT_UNLESS(CurrentSource, "cannot write %s: %s", key.c_str(), value.c_str()); + CurrentSource->second.try_emplace(key, value); + }; + + void VisitNonAttribute(TExprNode::TPtr) override {} + +public: + THashMap, THashMap> Result; + +private: + decltype(Result)::pointer CurrentSource = nullptr; +}; + +class TAttributesReplacingVisitor : public IAstAttributesVisitor { +public: + TAttributesReplacingVisitor(THashMap&& attributesBeforeFilter, + TStringBuf cluster, TStringBuf tablePath, + TKikimrTableMetadataPtr&& metadata, + TExprContext& ctx) + : GatheredAttributes{std::move(attributesBeforeFilter)} + , Cluster{cluster}, TablePath{tablePath} + , Ctx{ctx} + , Metadata{std::move(metadata)} + , NewAttributes{Metadata->Attributes} + {} + + void VisitRead(TExprNode& read, TString cluster, TString tablePath) override { + if (cluster == Cluster && tablePath == TablePath) { + Read = &read; + } + }; + + void ExitRead() override { + if (!Read) { + return; + } + if (!ReplacedUserchema && Metadata && !Metadata->Columns.empty()) { + Children.push_back(BuildSchemaFromMetadata(Read->Pos(), Ctx, Metadata->Columns)); + } + for (const auto& [key, value] : NewAttributes) { + Children.push_back(Ctx.NewList(Read->Pos(), { + Ctx.NewAtom(Read->Pos(), key), + Ctx.NewAtom(Read->Pos(), value), + })); + } + Read->Child(4)->ChangeChildrenInplace(std::move(Children)); + Read = nullptr; + }; + + void VisitAttribute(TString key, TString value) override { + if (!Read) { + return; + } + const bool gotNewAttributes = Metadata && !Metadata->Attributes.empty(); + + if (gotNewAttributes && GatheredAttributes.contains(key) && !Metadata->Attributes.contains(key)) { + return; + } + auto mbNewValue = Metadata->Attributes.FindPtr(key); + NewAttributes.erase(key); + + auto pos = Read->Pos(); + auto attribute = Ctx.NewList(pos, { + Ctx.NewAtom(pos, key), + Ctx.NewAtom(pos, mbNewValue ? *mbNewValue : value) + }); + Children.push_back(std::move(attribute)); + }; + + void VisitNonAttribute(TExprNode::TPtr node) override { + if (!Read) { + return; + } + if (!Metadata || Metadata->Columns.empty()) { + Children.push_back(std::move(node)); + return; + } + + auto nodeChildren = node->Children(); + if (!nodeChildren.empty() && nodeChildren[0]->IsAtom()) { + TCoAtom attrName{nodeChildren[0]}; + if (attrName.StringValue().equal("userschema")) { + node = BuildSchemaFromMetadata(Read->Pos(), Ctx, Metadata->Columns); + ReplacedUserchema = true; + } + } + Children.push_back(std::move(node)); + } + +private: + THashMap GatheredAttributes; + TStringBuf Cluster; + TStringBuf TablePath; + + TExprContext& Ctx; + TKikimrTableMetadataPtr Metadata; + TExprNode* Read = nullptr; + std::vector Children; + THashMap NewAttributes; + bool ReplacedUserchema = false; +}; + +namespace { + +std::optional> GetAsTextAttribute(const TExprNode& child) { + if (!child.IsList() || child.ChildrenSize() != 2) { + return std::nullopt; + } + if (!(child.Child(0)->IsAtom() && child.Child(1)->IsAtom())) { + return std::nullopt; + } + + TCoAtom attrKey{child.Child(0)}; + TCoAtom attrVal{child.Child(1)}; + + return std::make_optional(std::make_pair(attrKey.StringValue(), attrVal.StringValue())); +} + +} // namespace anonymous + +void ExtractReadAttributes(IAstAttributesVisitor& visitor, TExprNode& read, TExprContext& ctx) { + TraverseReadAttributes(visitor, *read.Child(0), ctx); + + TKiDataSource source(read.ChildPtr(1)); + TKikimrKey key{ctx}; + if (!key.Extract(*read.Child(2))) { + return; + } + auto cluster = source.Cluster().StringValue(); + auto tablePath = key.GetTablePath(); + + if (read.ChildrenSize() <= 4) { + return; + } + auto& astAttrs = *read.Child(4); + visitor.VisitRead(read, cluster, tablePath); + for (const auto& child : astAttrs.Children()) { + if (auto asAttribute = GetAsTextAttribute(*child)) { + visitor.VisitAttribute(asAttribute->first, asAttribute->second); + } else { + visitor.VisitNonAttribute(child); + } + } + visitor.ExitRead(); +} + +void TraverseReadAttributes(IAstAttributesVisitor& visitor, TExprNode& node, TExprContext& ctx) { + if (node.IsCallable(ReadName)) { + return ExtractReadAttributes(visitor, node, ctx); + } + if (node.IsCallable()) { + if (node.ChildrenSize() == 0) { + return; + } + return TraverseReadAttributes(visitor, *node.Child(0), ctx); + } +} + +THashMap, THashMap> GatherReadAttributes(TExprNode& node, TExprContext& ctx) { + TGatheringAttributesVisitor visitor; + TraverseReadAttributes(visitor, node, ctx); + return visitor.Result; +} + +void ReplaceReadAttributes(TExprNode& node, + THashMap attributesBeforeFilter, + TStringBuf cluster, TStringBuf tablePath, + TKikimrTableMetadataPtr metadata, + TExprContext& ctx) { + TAttributesReplacingVisitor visitor{std::move(attributesBeforeFilter), cluster, tablePath, std::move(metadata), ctx}; + TraverseReadAttributes(visitor, node, ctx); +} + +static Ydb::Type CreateYdbType(const NKikimr::NScheme::TTypeInfo& typeInfo, bool notNull) { + Ydb::Type ydbType; + if (typeInfo.GetTypeId() == NKikimr::NScheme::NTypeIds::Pg) { + auto* typeDesc = typeInfo.GetTypeDesc(); + auto* pg = ydbType.mutable_pg_type(); + pg->set_type_name(NKikimr::NPg::PgTypeNameFromTypeDesc(typeDesc)); + pg->set_oid(NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc)); + } else { + auto& item = notNull + ? ydbType + : *ydbType.mutable_optional_type()->mutable_item(); + item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); + } + return ydbType; +} + +TExprNode::TPtr BuildSchemaFromMetadata(TPositionHandle pos, TExprContext& ctx, const TMap& columns) { + TVector> typedColumns; + typedColumns.reserve(columns.size()); + for (const auto& [n, c] : columns) { + NYdb::TTypeParser parser(NYdb::TType(CreateYdbType(c.TypeInfo, c.NotNull))); + auto type = NFq::MakeType(parser, ctx); + typedColumns.emplace_back(n, type); + } + + const TString ysonSchema = NYql::NCommon::WriteTypeToYson(NFq::MakeStructType(typedColumns, ctx), NYson::EYsonFormat::Text); + TExprNode::TListType items; + auto schema = ctx.NewAtom(pos, ysonSchema); + auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema }); + auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema }); + auto userSchema = ctx.NewAtom(pos, "userschema"sv); + return ctx.NewList(pos, {userSchema, type, order}); +} + +} // namespace NYql diff --git a/ydb/core/kqp/provider/read_attributes_utils.h b/ydb/core/kqp/provider/read_attributes_utils.h new file mode 100644 index 000000000000..08faba074ef2 --- /dev/null +++ b/ydb/core/kqp/provider/read_attributes_utils.h @@ -0,0 +1,29 @@ +#include +#include + +namespace NYql { + +class IAstAttributesVisitor { +public: + virtual ~IAstAttributesVisitor() = default; + + virtual void VisitRead(TExprNode& read, TString cluster, TString tablePath) = 0; + virtual void ExitRead() = 0; + + virtual void VisitAttribute(TString key, TString value) = 0; + + virtual void VisitNonAttribute(TExprNode::TPtr node) = 0; +}; + +void TraverseReadAttributes(IAstAttributesVisitor& visitor, TExprNode& node, TExprContext& ctx); + +THashMap, THashMap> GatherReadAttributes(TExprNode& node, TExprContext& ctx); + +void ReplaceReadAttributes(TExprNode& node, + THashMap attributesBeforeFilter, + TStringBuf cluster, TStringBuf tablePath, + TKikimrTableMetadataPtr metadata, + TExprContext& ctx); + +TExprNode::TPtr BuildSchemaFromMetadata(TPositionHandle pos, TExprContext& ctx, const TMap& columns); +} // namespace NYql diff --git a/ydb/core/kqp/provider/read_attributes_utils_ut.cpp b/ydb/core/kqp/provider/read_attributes_utils_ut.cpp new file mode 100644 index 000000000000..74e1b3c94bc7 --- /dev/null +++ b/ydb/core/kqp/provider/read_attributes_utils_ut.cpp @@ -0,0 +1,164 @@ +#include "read_attributes_utils.h" + +#include +#include + +#include + +namespace { +using namespace NYql; + +struct TAstHelper { + TExprNode::TPtr CreateReadNode(TStringBuf cluster, TStringBuf path, TExprNode::TListType attributes, TExprNode::TPtr world = nullptr) { + if (!world) { + world = Ctx.NewWorld(Pos); + } + return Ctx.NewCallable(Pos, ReadName, { + std::move(world), + Ctx.NewCallable(Pos, "DataSource", { + Ctx.NewAtom(Pos, "kikimr"), + Ctx.NewAtom(Pos, cluster), + }), + Ctx.NewCallable(Pos, "Key", { + Ctx.NewList(Pos, { + Ctx.NewAtom(Pos, "table"), + Ctx.NewCallable(Pos, "String", { + Ctx.NewAtom(Pos, path) + }), + }), + }), + Ctx.NewCallable(Pos, "Void", {}), + Ctx.NewList(Pos, std::move(attributes)), + }); + } + + TExprNode::TPtr NewAttribute(TStringBuf key, TStringBuf value) { + return NewList({ + NewAtom(key), + NewAtom(value), + }); + } + + TExprNode::TPtr NewAtom(TStringBuf value) { + return Ctx.NewAtom(Pos, value); + } + + TExprNode::TPtr NewCallable(TStringBuf name, TExprNode::TListType arguments) { + return Ctx.NewCallable(Pos, name, std::move(arguments)); + } + + TExprNode::TPtr NewList(TExprNode::TListType children) { + return Ctx.NewList(Pos, std::move(children)); + } + + TExprContext Ctx; + TPosition Pos; +}; + +void CheckAttributes(const THashMap, THashMap>& allAttributes, + TStringBuf cluster, TStringBuf path, THashMap expected) { + auto attributes = allAttributes.FindPtr(std::make_pair(cluster, path)); + UNIT_ASSERT(attributes); + ASSERT_THAT(*attributes, testing::ContainerEq(expected)); +} +} + +Y_UNIT_TEST_SUITE(ReadAttributesUtils) { + using namespace NYql; + + Y_UNIT_TEST(AttributesGatheringEmpry) { + TAstHelper helper; + auto node = helper.CreateReadNode("cluster", "path/to/file", {}); + auto allAttributes = GatherReadAttributes(*node, helper.Ctx); + UNIT_ASSERT_EQUAL(allAttributes.size(), 1); + + CheckAttributes(allAttributes, "cluster", "path/to/file", {}); + } + + Y_UNIT_TEST(AttributesGatheringFilter) { + TAstHelper helper; + auto node = helper.CreateReadNode("cluster", "path/to/file", { + helper.NewList({ + helper.NewAtom("should ignore"), + helper.NewAtom("because has"), + helper.NewAtom("three atoms"), + }), + helper.NewAttribute("good", "attribute"), + helper.NewList({ + helper.NewAtom("second is not atom, thus should be ignored"), + helper.NewCallable("callable", {}), + }), + helper.NewAttribute("also good", "attribute"), + helper.NewAtom("no children - not an attribute"), + helper.NewCallable("callable instead of list", { + helper.NewAtom("should"), + helper.NewAtom("ignore"), + }) + }); + auto allAttributes = GatherReadAttributes(*node, helper.Ctx); + UNIT_ASSERT_EQUAL(allAttributes.size(), 1); + + CheckAttributes(allAttributes, "cluster", "path/to/file", {{"good", "attribute"}, {"also good", "attribute"}}); + } + + Y_UNIT_TEST(AttributesGatheringRecursive) { + TAstHelper helper; + auto node = helper.CreateReadNode("cluster", "path/1", { + helper.NewAttribute("key11", "val11"), + }, helper.NewCallable("Left!", { + helper.CreateReadNode("cluster", "path/2", { + helper.NewAttribute("key21", "val21"), + helper.NewAttribute("key22", "val22"), + helper.NewAttribute("key23", "val23"), + }), + })); + auto allAttributes = GatherReadAttributes(*node, helper.Ctx); + UNIT_ASSERT_EQUAL(allAttributes.size(), 2); + + CheckAttributes(allAttributes, "cluster", "path/1", {{"key11", "val11"}}); + CheckAttributes(allAttributes, "cluster", "path/2", {{"key21", "val21"}, {"key22", "val22"}, {"key23", "val23"}}); + } + + Y_UNIT_TEST(ReplaceAttributesEmpty) { + TAstHelper helper; + auto node = helper.CreateReadNode("cluster", "path", { + helper.NewAttribute("key1", "val1"), + helper.NewAttribute("key2", "val2"), + }); + auto metadata = MakeIntrusive(); + // empty attributes in metadata should not change anything + ReplaceReadAttributes(*node, {{"key", "val"}}, "cluster", "path", metadata, helper.Ctx); + + auto allAttributes = GatherReadAttributes(*node, helper.Ctx); + CheckAttributes(allAttributes, "cluster", "path", {{"key1", "val1"}, {"key2", "val2"}}); + } + + Y_UNIT_TEST(ReplaceAttributesFilter) { + TAstHelper helper; + auto node = helper.CreateReadNode("cluster", "path/to/file", { + helper.NewList({ + helper.NewAtom("should ignore"), + helper.NewAtom("because has"), + helper.NewAtom("three atoms"), + }), + helper.NewAttribute("good", "attribute"), + helper.NewList({ + helper.NewAtom("second is not atom, thus should be ignored"), + helper.NewCallable("callable", {}), + }), + helper.NewAttribute("also good", "attribute"), + helper.NewAtom("no children - not an attribute"), + helper.NewCallable("callable instead of list", { + helper.NewAtom("should"), + helper.NewAtom("ignore"), + }) + }); + auto metadata = MakeIntrusive(); + // NB! changes to attributes from metadata, will result into replacing AST metadata + metadata->Attributes = {{"good", "still good"}, {"new key", "new value"}}; + ReplaceReadAttributes(*node, {{"good", "attribute"}, {"also good", "attribute"}}, "cluster", "path/to/file", metadata, helper.Ctx); + + auto allAttributes = GatherReadAttributes(*node, helper.Ctx); + CheckAttributes(allAttributes, "cluster", "path/to/file", metadata->Attributes); + } +} \ No newline at end of file diff --git a/ydb/core/kqp/provider/ut/ya.make b/ydb/core/kqp/provider/ut/ya.make index 7991978375bc..0b71054c6a86 100644 --- a/ydb/core/kqp/provider/ut/ya.make +++ b/ydb/core/kqp/provider/ut/ya.make @@ -3,12 +3,14 @@ UNITTEST_FOR(ydb/core/kqp/provider) SRCS( yql_kikimr_gateway_ut.cpp yql_kikimr_provider_ut.cpp + read_attributes_utils_ut.cpp ) PEERDIR( ydb/core/client/minikql_result_lib ydb/core/kqp/ut/common ydb/library/yql/sql/pg_dummy + library/cpp/testing/gmock_in_unittest ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/kqp/provider/ya.make b/ydb/core/kqp/provider/ya.make index 95ced6214c37..0fbf87c97eb7 100644 --- a/ydb/core/kqp/provider/ya.make +++ b/ydb/core/kqp/provider/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + read_attributes_utils.cpp rewrite_io_utils.cpp yql_kikimr_datasink.cpp yql_kikimr_datasource.cpp diff --git a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp index 3ba7d447d957..2b3215bf44ea 100644 --- a/ydb/core/kqp/provider/yql_kikimr_datasource.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_datasource.cpp @@ -1,3 +1,4 @@ +#include "read_attributes_utils.h" #include "rewrite_io_utils.h" #include "yql_kikimr_provider_impl.h" @@ -19,38 +20,9 @@ namespace NYql { -static Ydb::Type CreateYdbType(const NKikimr::NScheme::TTypeInfo& typeInfo, bool notNull) { - Ydb::Type ydbType; - if (typeInfo.GetTypeId() == NKikimr::NScheme::NTypeIds::Pg) { - auto* typeDesc = typeInfo.GetTypeDesc(); - auto* pg = ydbType.mutable_pg_type(); - pg->set_type_name(NKikimr::NPg::PgTypeNameFromTypeDesc(typeDesc)); - pg->set_oid(NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc)); - } else { - auto& item = notNull - ? ydbType - : *ydbType.mutable_optional_type()->mutable_item(); - item.set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); - } - return ydbType; -} - TExprNode::TPtr BuildExternalTableSettings(TPositionHandle pos, TExprContext& ctx, const TMap& columns, const NKikimr::NExternalSource::IExternalSource::TPtr& source, const TString& content) { - TVector> typedColumns; - typedColumns.reserve(columns.size()); - for (const auto& [n, c] : columns) { - NYdb::TTypeParser parser(NYdb::TType(CreateYdbType(c.TypeInfo, c.NotNull))); - auto type = NFq::MakeType(parser, ctx); - typedColumns.emplace_back(n, type); - } - - const TString ysonSchema = NYql::NCommon::WriteTypeToYson(NFq::MakeStructType(typedColumns, ctx), NYson::EYsonFormat::Text); TExprNode::TListType items; - auto schema = ctx.NewAtom(pos, ysonSchema); - auto type = ctx.NewCallable(pos, "SqlTypeFromYson"sv, { schema }); - auto order = ctx.NewCallable(pos, "SqlColumnOrderFromYson"sv, { schema }); - auto userSchema = ctx.NewAtom(pos, "userschema"sv); - items.emplace_back(ctx.NewList(pos, {userSchema, type, order})); + items.emplace_back(BuildSchemaFromMetadata(pos, ctx, columns)); for (const auto& [key, values]: source->GetParameters(content)) { TExprNode::TListType children = {ctx.NewAtom(pos, NormalizeName(key))}; @@ -216,6 +188,7 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { size_t tablesCount = SessionCtx->Tables().GetTables().size(); TVector> futures; futures.reserve(tablesCount); + std::optional, THashMap>> readAttributes; for (auto& it : SessionCtx->Tables().GetTables()) { const TString& clusterName = it.first.first; @@ -226,6 +199,12 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { continue; } + const THashMap* readAttrs = nullptr; + if (!table.Metadata && table.GetTableType() == ETableType::ExternalTable) { + readAttributes = GatherReadAttributes(*input, ctx); + readAttrs = readAttributes->FindPtr(std::make_pair(clusterName, tableName)); + } + auto emplaceResult = LoadResults.emplace(std::make_pair(clusterName, tableName), std::make_shared()); @@ -239,6 +218,8 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { .WithPrivateTables(IsInternalCall) .WithExternalDatasources(SessionCtx->Config().FeatureFlags.GetEnableExternalDataSources()) .WithAuthInfo(table.GetNeedAuthInfo()) + .WithExternalSourceFactory(ExternalSourceFactory) + .WithReadAttributes(readAttrs ? std::move(*readAttrs) : THashMap{}) ); futures.push_back(future.Apply([result, queryType] @@ -319,18 +300,21 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { output = input; YQL_ENSURE(AsyncFuture.HasValue()); + auto gatheredAttributes = GatherReadAttributes(*input, ctx); for (auto& it : LoadResults) { const auto& table = it.first; IKikimrGateway::TTableMetadataResult& res = *it.second; if (res.Success()) { res.ReportIssues(ctx.IssueManager); - TKikimrTableDescription* tableDesc; + TString cluster = it.first.first; + TString tablePath; if (res.Metadata->Temporary) { - tableDesc = &SessionCtx->Tables().GetTable(it.first.first, *res.Metadata->QueryName); + tablePath = *res.Metadata->QueryName; } else { - tableDesc = &SessionCtx->Tables().GetTable(it.first.first, it.first.second); + tablePath = it.first.second; } + TKikimrTableDescription* tableDesc = &SessionCtx->Tables().GetTable(cluster, tablePath); YQL_ENSURE(res.Metadata); tableDesc->Metadata = res.Metadata; @@ -349,6 +333,13 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { return TStatus::Error; } + if (tableDesc->Metadata->Kind == EKikimrTableKind::External) { + auto currentAttributes = gatheredAttributes.FindPtr(std::make_pair(cluster, tablePath)); + if (currentAttributes && !currentAttributes->empty()) { + ReplaceReadAttributes(*input, *currentAttributes, cluster, tablePath, tableDesc->Metadata, ctx); + } + } + if (!AddCluster(table, res, input, ctx)) { return TStatus::Error; } @@ -376,6 +367,7 @@ class TKiSourceLoadTableMetadataTransformer : public TGraphTransformerBase { return TStatus::Error; } } + output = input; LoadResults.clear(); return TStatus::Ok; @@ -740,7 +732,6 @@ class TKikimrDataSource : public TDataProviderBase { .Add(ctx.NewCallable(node->Pos(), "MrTableConcat", {newKey})) .Add(ctx.NewCallable(node->Pos(), "Void", {})) .Add(BuildExternalTableSettings(node->Pos(), ctx, tableDesc.Metadata->Columns, source, tableDesc.Metadata->ExternalSource.TableContent)) - .Build() .Done().Ptr(); auto retChildren = node->ChildrenList(); diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.h b/ydb/core/kqp/provider/yql_kikimr_gateway.h index 30b55a1058bf..19b889acbed7 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.h +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.h @@ -15,6 +15,7 @@ #include #include +#include #include #include #include @@ -770,6 +771,18 @@ class IKikimrGateway : public TThrRefBase { return *this; } + TLoadTableMetadataSettings& WithExternalSourceFactory(NKikimr::NExternalSource::IExternalSourceFactory::TPtr factory) { + ExternalSourceFactory = std::move(factory); + return *this; + } + + TLoadTableMetadataSettings& WithReadAttributes(THashMap options) { + ReadAttributes = std::move(options); + return *this; + } + + NKikimr::NExternalSource::IExternalSourceFactory::TPtr ExternalSourceFactory; + THashMap ReadAttributes; bool RequestStats_ = false; bool WithPrivateTables_ = false; bool WithExternalDatasources_ = false; diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index b614027b8b5f..a5f9aeecf8a7 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -6848,6 +6848,7 @@ void TSchemeShard::ApplyConsoleConfigs(const NKikimrConfig::TAppConfig& appConfi const auto& hostnamePatterns = appConfig.GetQueryServiceConfig().GetHostnamePatterns(); ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory( std::vector(hostnamePatterns.begin(), hostnamePatterns.end()), + nullptr, appConfig.GetQueryServiceConfig().GetS3().GetGeneratorPathsLimit() ); }