Skip to content

Commit

Permalink
Merge 45edfdf into 2ed4009
Browse files Browse the repository at this point in the history
  • Loading branch information
EgorkaZ authored May 24, 2024
2 parents 2ed4009 + 45edfdf commit b85c2ce
Show file tree
Hide file tree
Showing 11 changed files with 418 additions and 19 deletions.
8 changes: 8 additions & 0 deletions ydb/core/external_sources/external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ struct TExternalDataSource : public IExternalSource {
ValidateHostname(HostnamePatterns, proto.GetLocation());
}

// This source has no dynamic metadata to add: the metadata it receives is
// wrapped into a future via NThreading::MakeFuture and returned as is.
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
    auto passthrough = NThreading::MakeFuture(std::move(meta));
    return passthrough;
}

// Always reports false: LoadDynamicMetadata of this source only passes the
// metadata through.
virtual bool CanLoadDynamicMetadata() const override {
    constexpr bool hasDynamicLoader = false;
    return hasDynamicLoader;
}

private:
const TString Name;
const TVector<TString> AuthMethods;
Expand Down
78 changes: 78 additions & 0 deletions ydb/core/external_sources/external_source.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,83 @@
#pragma once

#include <library/cpp/threading/future/core/future.h>
#include <util/generic/map.h>
#include <util/generic/string.h>

#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/yql/public/issue/yql_issue.h>

#include <utility>

namespace NKikimr::NExternalSource {

// Distinct exception type for the external-source code; it adds no members
// of its own on top of yexception.
struct TExternalSourceException : yexception {
};

namespace NAuth {

// Auth alternative tagged "NONE"; it carries no data members.
struct TNone {
    // Textual name of this auth method.
    static constexpr std::string_view Method{"NONE"};
};

struct TAws {
static constexpr std::string_view Method = "AWS";

TAws(const TString& accessKey, const TString& secretAccessKey, const TString& region)
: AccessKey{accessKey}
, SecretAccessKey{secretAccessKey}
, Region{region}
{}

TString AccessKey;
TString SecretAccessKey;
TString Region;
};

// Credentials for the "SERVICE_ACCOUNT" auth method: the service account id
// together with its signature.
struct TServiceAccount {
    // Textual name of this auth method.
    static constexpr std::string_view Method{"SERVICE_ACCOUNT"};

    TString ServiceAccountId;
    TString ServiceAccountIdSignature;

    // Both strings are sink parameters: taken by value and moved into the members.
    TServiceAccount(TString serviceAccountId, TString serviceAccountIdSignature)
        : ServiceAccountId(std::move(serviceAccountId))
        , ServiceAccountIdSignature(std::move(serviceAccountIdSignature))
    {
    }
};

// Closed set of auth methods; a TAuth value holds exactly one of them.
// TNone is the first alternative, so a default-constructed TAuth holds TNone.
using TAuth = std::variant<TNone, TServiceAccount, TAws>;

// Declaration only; the definition is not part of this header.
// NOTE(review): presumably returns the Method constant of the alternative held by `auth` - confirm against the definition.
std::string_view GetMethod(const TAuth& auth);

// Builds a TAuth whose active alternative is TNone.
inline TAuth MakeNone() {
    TAuth auth{std::in_place_type<TNone>};
    return auth;
}

// Builds a TAuth whose active alternative is TServiceAccount, constructed in
// place from the given id and signature.
inline TAuth MakeServiceAccount(const TString& serviceAccountId, const TString& serviceAccountIdSignature) {
    TAuth auth{std::in_place_type<TServiceAccount>, serviceAccountId, serviceAccountIdSignature};
    return auth;
}

// Builds a TAuth whose active alternative is TAws, constructed in place from
// the given access key, secret access key and region.
inline TAuth MakeAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) {
    TAuth auth{std::in_place_type<TAws>, accessKey, secretAccessKey, region};
    return auth;
}
}

// Shorthand so the auth variant can be spelled TAuth directly in this namespace, without the NAuth:: qualifier.
using TAuth = NAuth::TAuth;

// Metadata of an external source entity. IExternalSource::LoadDynamicMetadata
// both accepts and returns a shared pointer to this structure.
struct TMetadata {
// Defaults to false.
// NOTE(review): presumably set to true by a LoadDynamicMetadata implementation that modified this object - confirm.
bool Changed = false;
TString TableLocation;
TString DataSourceLocation;
TString DataSourcePath;
TString Type;

// String attributes as a name -> value map.
THashMap<TString, TString> Attributes;

// Credentials; exactly one of the NAuth alternatives.
TAuth Auth;

// Schema as a NKikimrExternalSources::TSchema protobuf message.
NKikimrExternalSources::TSchema Schema;
};

struct IExternalSource : public TThrRefBase {
using TPtr = TIntrusivePtr<IExternalSource>;

Expand Down Expand Up @@ -54,6 +121,17 @@ struct IExternalSource : public TThrRefBase {
If an error occurs, an exception is thrown.
*/
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const = 0;

/*
Retrieves additional metadata from runtime data and enriches the provided metadata with it.
`meta` is passed as a shared pointer by value; the result is delivered through the returned future.
NOTE(review): whether the returned pointer may alias `meta` is up to the implementation - confirm per source.
*/
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) = 0;

/*
Tells whether this source provides a real implementation of LoadDynamicMetadata.
NOTE(review): callers are presumably expected to check this before calling LoadDynamicMetadata - confirm.
*/
virtual bool CanLoadDynamicMetadata() const = 0;
};

}
4 changes: 2 additions & 2 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,12 @@ struct TExternalSourceFactory : public IExternalSourceFactory {

}

IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit) {
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) {
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
{
ToString(NYql::EDatabaseType::ObjectStorage),
CreateObjectStorageExternalSource(hostnamePatternsRegEx, pathsLimit)
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit)
},
{
ToString(NYql::EDatabaseType::ClickHouse),
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/external_sources/external_source_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,6 @@ struct IExternalSourceFactory : public TThrRefBase {
virtual IExternalSource::TPtr GetOrCreate(const TString& type) const = 0;
};

IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit = 50000);
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, NActors::TActorSystem* actorSystem = nullptr, size_t pathsLimit = 50000);

}
26 changes: 23 additions & 3 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
#include "object_storage.h"
#include "validation_functions.h"

#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
Expand All @@ -20,9 +23,10 @@ namespace NKikimr::NExternalSource {
namespace {

struct TObjectStorageExternalSource : public IExternalSource {
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit)
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit)
: HostnamePatterns(hostnamePatterns)
, PathsLimit(pathsLimit)
, ActorSystem(actorSystem)
{}

virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
Expand Down Expand Up @@ -255,6 +259,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

// NYql::NCommon::TOperationResult extended with a shared pointer to TMetadata.
struct TMetadataResult : public NYql::NCommon::TOperationResult {
    std::shared_ptr<TMetadata> Metadata;
};

// Placeholder implementation: the incoming metadata is wrapped into a future
// via NThreading::MakeFuture and returned without modification.
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
    // TODO: implement. ActorSystem is not used yet and is only marked as such.
    Y_UNUSED(ActorSystem);
    auto passthrough = NThreading::MakeFuture(std::move(meta));
    return passthrough;
}

// Always reports false: LoadDynamicMetadata of this source is still a
// pass-through placeholder.
virtual bool CanLoadDynamicMetadata() const override {
    constexpr bool hasDynamicLoader = false;
    return hasDynamicLoader;
}

private:
static bool IsValidIntervalUnit(const TString& unit) {
static constexpr std::array<std::string_view, 7> IntervalUnits = {
Expand Down Expand Up @@ -474,12 +492,14 @@ struct TObjectStorageExternalSource : public IExternalSource {
private:
const std::vector<TRegExMatch> HostnamePatterns;
const size_t PathsLimit;
NActors::TActorSystem* ActorSystem = nullptr;
};

}

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, pathsLimit);

// Creates the object storage external source and returns it through the
// IExternalSource interface pointer. All three arguments are forwarded to the
// TObjectStorageExternalSource constructor.
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit) {
    IExternalSource::TPtr source = MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, actorSystem, pathsLimit);
    return source;
}

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

namespace NKikimr::NExternalSource {

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit);
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, NActors::TActorSystem* actorSystem, size_t pathsLimit);

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);

Expand Down
96 changes: 91 additions & 5 deletions ydb/core/kqp/gateway/kqp_metadata_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
#include "actors/kqp_ic_gateway_actors.h"

#include <ydb/core/base/path.h>
#include <ydb/core/external_sources/external_source_factory.h>
#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
#include <ydb/core/kqp/gateway/utils/scheme_helpers.h>
#include <ydb/core/statistics/events.h>
#include <ydb/core/statistics/stat_service.h>

#include <ydb/library/actors/core/hfunc.h>
#include <ydb/library/actors/core/log.h>
#include <ydb/library/yql/utils/signals/utils.h>


namespace NKikimr::NKqp {
Expand Down Expand Up @@ -63,7 +65,7 @@ NavigateEntryResult CreateNavigateEntry(const std::pair<TIndexId, TString>& pair
entry.Operation = NSchemeCache::TSchemeCacheNavigate::EOp::OpList;
entry.SyncVersion = true;
entry.ShowPrivatePath = settings.WithPrivateTables_;
return {entry, pair.second, std::nullopt};
return {std::move(entry), pair.second, std::nullopt};
}

std::optional<NavigateEntryResult> CreateNavigateExternalEntry(const TString& path, bool externalDataSource) {
Expand Down Expand Up @@ -279,6 +281,7 @@ TTableMetadataResult GetExternalDataSourceMetadataResult(const NSchemeCache::TSc
tableMeta->ExternalSource.DataSourceAuth = description.GetAuth();
tableMeta->ExternalSource.Properties = description.GetProperties();
tableMeta->ExternalSource.DataSourcePath = tableName;
tableMeta->ExternalSource.TableLocation = JoinPath(entry.Path);
return result;
}

Expand Down Expand Up @@ -470,6 +473,61 @@ NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> LoadExternalDataSo
return DescribeExternalDataSourceSecrets(authDescription, userToken ? userToken->GetUserSID() : "", actorSystem, maximalSecretsSnapshotWaitTime);
}

// Translates the auth description of an external data source into the
// NExternalSource::TAuth variant.
// Only the kNone, kServiceAccount and kAws identities are converted; every
// other identity, including an unset one, ends in Y_ABORT.
NExternalSource::TAuth MakeAuth(const NYql::TExternalSource& metadata) {
    namespace NSourceAuth = NExternalSource::NAuth;
    using TAuthProto = NKikimrSchemeOp::TAuth;

    const auto& authProto = metadata.DataSourceAuth;
    switch (authProto.identity_case()) {
        case TAuthProto::kNone:
            return NSourceAuth::MakeNone();
        case TAuthProto::kServiceAccount: {
            // The id comes from the description, the signature from the metadata itself.
            const auto& serviceAccountId = authProto.GetServiceAccount().GetId();
            return NSourceAuth::MakeServiceAccount(serviceAccountId, metadata.ServiceAccountIdSignature);
        }
        case TAuthProto::kAws: {
            // The keys come from the metadata itself, the region from the description.
            const auto& region = authProto.GetAws().GetAwsRegion();
            return NSourceAuth::MakeAws(metadata.AwsAccessKeyId, metadata.AwsSecretAccessKey, region);
        }
        case TAuthProto::kBasic:
        case TAuthProto::kMdbBasic:
        case TAuthProto::kToken:
        case TAuthProto::IDENTITY_NOT_SET:
            Y_ABORT("Unimplemented external source auth: %d", authProto.identity_case());
            break;
    }
    Y_UNREACHABLE();
}

// Builds a fresh NExternalSource::TMetadata from KQP table metadata.
// The locations, the data source path and the auth are taken from
// tableMetadata.ExternalSource; the attributes from tableMetadata itself.
// Changed, Type and Schema are not assigned here and keep their defaults.
std::shared_ptr<NExternalSource::TMetadata> ConvertToExternalSourceMetadata(const NYql::TKikimrTableMetadata& tableMetadata) {
    const auto& source = tableMetadata.ExternalSource;

    auto result = std::make_shared<NExternalSource::TMetadata>();
    NExternalSource::TMetadata& target = *result;
    target.TableLocation = source.TableLocation;
    target.DataSourceLocation = source.DataSourceLocation;
    target.DataSourcePath = source.DataSourcePath;
    target.Attributes = tableMetadata.Attributes;
    target.Auth = MakeAuth(source);
    return result;
}

// Propagates dynamic metadata obtained from IExternalSource into
// TKikimrTableMetadata, which is then returned as the result of LoadTableMetadata().
//
// For every schema column a TKikimrColumnMetadata is emplaced into
// tableMetadata.Columns under the column name; ids are assigned sequentially
// starting from 0, in schema order. A column is marked not-null unless its type
// is an optional type. Finally the attributes are replaced with the dynamic ones.
//
// Returns true on success. Returns false, leaving tableMetadata untouched, when
// some column type is neither a plain type_id nor an optional type. Such a
// schema used to abort the whole process; the caller already turns a false
// result into a failed TTableMetadataResult.
bool EnrichMetadata(NYql::TKikimrTableMetadata& tableMetadata, const NExternalSource::TMetadata& dynamicMetadata) {
    const auto& columns = dynamicMetadata.Schema.column();

    // Validate up front so that a rejected schema never leaves tableMetadata
    // half-populated.
    for (const auto& column : columns) {
        const auto& type = column.type();
        if (!type.has_type_id() && !type.has_optional_type()) {
            return false;
        }
    }

    ui32 id = 0;
    for (const auto& column : columns) {
        const auto& type = column.type();
        // For an optional type the primitive id is taken from the wrapped item.
        const Ydb::Type::PrimitiveTypeId typeId = type.has_type_id()
            ? type.type_id()
            : type.optional_type().item().type_id();
        const bool notNull = !type.has_optional_type();

        const auto typeInfoMod = NScheme::TypeInfoModFromProtoColumnType(typeId, nullptr);
        auto typeName = GetTypeName(typeInfoMod);

        tableMetadata.Columns.emplace(
            column.name(),
            NYql::TKikimrColumnMetadata(
                column.name(), id, typeName, notNull, typeInfoMod.TypeInfo, typeInfoMod.TypeMod
            )
        );
        ++id;
    }
    tableMetadata.Attributes = dynamicMetadata.Attributes;
    return true;
}

} // anonymous namespace


Expand Down Expand Up @@ -680,9 +738,11 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
// In this syntax, information about path_in_external_system is already known and we only need information about external_data_source.
// To do this, we go to the DefaultCluster and get information about external_data_source from scheme shard
const bool resolveEntityInsideDataSource = (cluster != Cluster);
TMaybe<TString> externalPath;
TPath entityName = id;
if constexpr (std::is_same_v<TPath, TString>) {
if (resolveEntityInsideDataSource) {
externalPath = entityName;
entityName = cluster;
}
} else {
Expand Down Expand Up @@ -720,7 +780,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
ActorSystem,
schemeCacheId,
ev.Release(),
[userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName]
[userToken, database, cluster, mainCluster = Cluster, table, settings, expectedSchemaVersion, this, queryName, externalPath]
(TPromise<TResult> promise, TResponse&& response) mutable
{
try {
Expand Down Expand Up @@ -759,16 +819,41 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta

switch (entry.Kind) {
case EKind::KindExternalDataSource: {
if (externalPath) {
entry.Path = SplitPath(*externalPath);
}
auto externalDataSourceMetadata = GetLoadTableMetadataResult(entry, cluster, mainCluster, table);
if (!externalDataSourceMetadata.Success() || !settings.RequestAuthInfo_) {
promise.SetValue(externalDataSourceMetadata);
return;
}
LoadExternalDataSourceSecretValues(entry, userToken, MaximalSecretsSnapshotWaitTime, ActorSystem)
.Subscribe([promise, externalDataSourceMetadata](const TFuture<TEvDescribeSecretsResponse::TDescription>& result) mutable
.Subscribe([promise, externalDataSourceMetadata, settings](const TFuture<TEvDescribeSecretsResponse::TDescription>& result) mutable
{
UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue());
promise.SetValue(externalDataSourceMetadata);
NExternalSource::IExternalSource::TPtr externalSource;
if (settings.ExternalSourceFactory) {
externalSource = settings.ExternalSourceFactory->GetOrCreate(externalDataSourceMetadata.Metadata->ExternalSource.Type);
}

if (externalSource && externalSource->CanLoadDynamicMetadata()) {
auto externalSourceMeta = ConvertToExternalSourceMetadata(*externalDataSourceMetadata.Metadata);
externalSourceMeta->Attributes = settings.ReadAttributes; // attributes, collected from AST
externalSource->LoadDynamicMetadata(std::move(externalSourceMeta))
.Subscribe([promise = std::move(promise), externalDataSourceMetadata](const TFuture<std::shared_ptr<NExternalSource::TMetadata>>& result) mutable {
TTableMetadataResult wrapper;
if (result.HasValue() && (!result.GetValue()->Changed || EnrichMetadata(*externalDataSourceMetadata.Metadata, *result.GetValue()))) {
wrapper.SetSuccess();
wrapper.Metadata = externalDataSourceMetadata.Metadata;
} else {
// TODO: forward exception from result
wrapper.SetException(yexception() << "LoadDynamicMetadata failed");
}
promise.SetValue(wrapper);
});
} else {
promise.SetValue(externalDataSourceMetadata);
}
});
break;
}
Expand All @@ -785,7 +870,8 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
.Apply([promise, externalTableMetadata](const TFuture<TTableMetadataResult>& result) mutable
{
auto externalDataSourceMetadata = result.GetValue();
promise.SetValue(EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata));
auto newMetadata = EnrichExternalTable(externalTableMetadata, externalDataSourceMetadata);
promise.SetValue(std::move(newMetadata));
});
break;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/host/kqp_host.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1064,7 +1064,7 @@ class TKqpHost : public IKqpHost {
SessionCtx->SetTempTables(std::move(tempTablesState));

if (FederatedQuerySetup) {
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory({}, FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit());
ExternalSourceFactory = NExternalSource::CreateExternalSourceFactory({}, nullptr, FederatedQuerySetup->S3GatewayConfig.GetGeneratorPathsLimit());
}
}

Expand Down
Loading

0 comments on commit b85c2ce

Please sign in to comment.