Skip to content

Commit

Permalink
Merge 9a04bce into 340580b
Browse files Browse the repository at this point in the history
  • Loading branch information
dorooleg authored Jun 17, 2024
2 parents 340580b + 9a04bce commit deaf465
Show file tree
Hide file tree
Showing 70 changed files with 4,316 additions and 1,647 deletions.
6 changes: 6 additions & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ struct TKikimrEvents : TEvents {
ES_REPLICATION_SERVICE,
ES_CHANGE_EXCHANGE,
ES_S3_FILE_QUEUE,
ES_S3_PROVIDER,
ES_NEBIUS_ACCESS_SERVICE,
ES_BACKUP_SERVICE,
ES_TX_BACKGROUND,
ES_SS_BG_TASKS,
ES_LIMITER
};
};

Expand Down
8 changes: 8 additions & 0 deletions ydb/core/external_sources/external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ struct TExternalDataSource : public IExternalSource {
ValidateHostname(HostnamePatterns, proto.GetLocation());
}

// This source has no dynamic metadata to load: the metadata it is given is
// handed straight back inside a future that already holds the value.
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
    auto unchanged = NThreading::MakeFuture(std::move(meta));
    return unchanged;
}

// Always false: LoadDynamicMetadata of this source returns its argument unchanged.
virtual bool CanLoadDynamicMetadata() const override {
return false;
}

private:
const TString Name;
const TVector<TString> AuthMethods;
Expand Down
78 changes: 78 additions & 0 deletions ydb/core/external_sources/external_source.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,83 @@
#pragma once

#include <library/cpp/threading/future/core/future.h>
#include <util/generic/map.h>
#include <util/generic/string.h>

#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/yql/public/issue/yql_issue.h>

namespace NKikimr::NExternalSource {

// Exception type thrown by external source code (for example when dynamic
// metadata loading reports a failure).
struct TExternalSourceException : public yexception {};

namespace NAuth {

// Authentication alternative that carries no credentials.
struct TNone {
    // Name of this authentication method.
    static constexpr std::string_view Method{"NONE"};
};

struct TAws {
static constexpr std::string_view Method = "AWS";

TAws(const TString& accessKey, const TString& secretAccessKey, const TString& region)
: AccessKey{accessKey}
, SecretAccessKey{secretAccessKey}
, Region{region}
{}

TString AccessKey;
TString SecretAccessKey;
TString Region;
};

// Authentication alternative identifying a service account by its id and the
// signature of that id.
struct TServiceAccount {
    // Name of this authentication method.
    static constexpr std::string_view Method{"SERVICE_ACCOUNT"};

    // Both arguments are sink parameters: they are moved into the members.
    TServiceAccount(TString serviceAccountId, TString serviceAccountIdSignature)
        : ServiceAccountId(std::move(serviceAccountId))
        , ServiceAccountIdSignature(std::move(serviceAccountIdSignature))
    {
    }

    TString ServiceAccountId;
    TString ServiceAccountIdSignature;
};

using TAuth = std::variant<TNone, TServiceAccount, TAws>;

std::string_view GetMethod(const TAuth& auth);

// Builds a TAuth holding the TNone alternative.
inline TAuth MakeNone() {
    return TAuth{std::in_place_type<TNone>};
}

// Builds a TAuth holding a TServiceAccount constructed in place from the given
// id and id signature.
inline TAuth MakeServiceAccount(const TString& serviceAccountId, const TString& serviceAccountIdSignature) {
    return TAuth{
        std::in_place_type<TServiceAccount>,
        serviceAccountId,
        serviceAccountIdSignature
    };
}

// Builds a TAuth holding a TAws constructed in place from the given access key,
// secret access key and region.
inline TAuth MakeAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) {
    return TAuth{
        std::in_place_type<TAws>,
        accessKey,
        secretAccessKey,
        region
    };
}
}

using TAuth = NAuth::TAuth;

// Description of an external table that is passed to
// IExternalSource::LoadDynamicMetadata and may be updated by it.
struct TMetadata {
// Set to true by the object storage source after it replaces Schema with an inferred one.
bool Changed = false;
// Used by the object storage source as the listing pattern when looking for files.
TString TableLocation;
// Used by the object storage source as the URL for listing and fetching.
TString DataSourceLocation;
// NOTE(review): not read by the code visible here; presumably the path of the data source object - confirm.
TString DataSourcePath;
// NOTE(review): not read by the code visible here; presumably the external source type name - confirm.
TString Type;

// Free-form attributes; the object storage source reads "format" and "withinfer".
THashMap<TString, TString> Attributes;

// Credentials used to access the data source.
TAuth Auth;

// Table schema; its columns are overwritten when a schema is inferred.
NKikimrExternalSources::TSchema Schema;
};

struct IExternalSource : public TThrRefBase {
using TPtr = TIntrusivePtr<IExternalSource>;

Expand Down Expand Up @@ -54,6 +121,17 @@ struct IExternalSource : public TThrRefBase {
If an error occurs, an exception is thrown.
*/
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const = 0;

/*
Retrieve additional metadata from runtime data and enrich the provided metadata with it.
The result is delivered through the returned future; an implementation that has
nothing to load may complete it immediately with `meta` unchanged.
*/
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) = 0;

/*
Tells whether this source provides a real implementation of LoadDynamicMetadata,
i.e. whether calling it can do more than return the metadata it was given.
*/
virtual bool CanLoadDynamicMetadata() const = 0;
};

}
8 changes: 6 additions & 2 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ struct TExternalSourceFactory : public IExternalSourceFactory {

}

IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit) {
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns,
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer) {
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
{
ToString(NYql::EDatabaseType::ObjectStorage),
CreateObjectStorageExternalSource(hostnamePatternsRegEx, pathsLimit)
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer)
},
{
ToString(NYql::EDatabaseType::ClickHouse),
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/external_sources/external_source_factory.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "external_source.h"
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>

namespace NKikimr::NExternalSource {

Expand All @@ -10,6 +11,10 @@ struct IExternalSourceFactory : public TThrRefBase {
virtual IExternalSource::TPtr GetOrCreate(const TString& type) const = 0;
};

IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit = 50000);
// Creates the factory of external sources.
//   hostnamePatterns   - patterns compiled into regular expressions and given to the sources.
//   actorSystem        - forwarded to the object storage source, which registers its
//                        schema-inference actors in it; may be left null when inference is not used.
//   pathsLimit         - forwarded to the object storage source.
//   credentialsFactory - forwarded to the object storage source, which uses it for
//                        service account authentication.
//   enableInfer        - forwarded to the object storage source, which reports it from
//                        CanLoadDynamicMetadata().
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns,
NActors::TActorSystem* actorSystem = nullptr,
size_t pathsLimit = 50000,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory = nullptr,
bool enableInfer = false);

}
134 changes: 131 additions & 3 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
#include "external_source.h"
#include "object_storage.h"
#include "validation_functions.h"
#include "object_storage/s3_fetcher.h"

#include <ydb/core/external_sources/object_storage/inference/arrow_fetcher.h>
#include <ydb/core/external_sources/object_storage/inference/arrow_inferencinator.h>
#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
Expand All @@ -20,9 +27,16 @@ namespace NKikimr::NExternalSource {
namespace {

struct TObjectStorageExternalSource : public IExternalSource {
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit)
// Stores the configuration of the object storage source. `actorSystem` and
// `credentialsFactory` may be null; they are only needed by LoadDynamicMetadata.
explicit TObjectStorageExternalSource(
    const std::vector<TRegExMatch>& hostnamePatterns,
    NActors::TActorSystem* actorSystem,
    size_t pathsLimit,
    std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
    bool enableInfer)
    : HostnamePatterns(hostnamePatterns)
    , PathsLimit{pathsLimit}
    , ActorSystem{actorSystem}
    , CredentialsFactory{std::move(credentialsFactory)}
    , EnableInfer{enableInfer}
{
}

virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
Expand Down Expand Up @@ -255,6 +269,112 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

// Operation result of schema inference: the status and issues come from the
// base class, the enriched metadata is attached on success.
struct TMetadataResult : public NYql::NCommon::TOperationResult {
    std::shared_ptr<TMetadata> Metadata;
};

/*
    Infers the table schema from a file in object storage and stores it in `meta`.

    Inference is attempted only when `meta->Attributes` contains both "format" and
    "withinfer" and the format is one that the arrow inference supports; otherwise
    `meta` is returned unchanged in a ready future.

    On the inference path the "withinfer" attribute is removed, the storage is listed
    using `meta->TableLocation` as the pattern, the first non-empty object found is
    handed to the inference actors, and on success `meta->Schema` columns are replaced
    and `meta->Changed` is set.

    Errors are reported through the returned future:
      - yexception when the actor system or (for service account auth) the credentials
        factory is missing, when listing fails or finds no usable file;
      - TExternalSourceException when the inference itself reports a failure.
*/
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
    using TMetadataPtr = std::shared_ptr<TMetadata>;

    auto format = meta->Attributes.FindPtr("format");
    if (!format || !meta->Attributes.contains("withinfer")) {
        return NThreading::MakeFuture(std::move(meta));
    }

    if (!NObjectStorage::NInference::IsArrowInferredFormat(*format)) {
        return NThreading::MakeFuture(std::move(meta));
    }

    // The actor system is optional at construction time, but every step below
    // registers actors in it. Fail the request instead of dereferencing null.
    if (!ActorSystem) {
        return NThreading::MakeErrorFuture<TMetadataPtr>(
            std::make_exception_ptr(yexception() << "trying to infer schema without an actor system, internal error"));
    }

    NYql::TS3Credentials::TAuthInfo authInfo{};
    if (const auto* awsAuth = std::get_if<NAuth::TAws>(&meta->Auth)) {
        authInfo.AwsAccessKey = awsAuth->AccessKey;
        authInfo.AwsAccessSecret = awsAuth->SecretAccessKey;
        authInfo.AwsRegion = awsAuth->Region;
    } else if (const auto* saAuth = std::get_if<NAuth::TServiceAccount>(&meta->Auth)) {
        if (!CredentialsFactory) {
            return NThreading::MakeErrorFuture<TMetadataPtr>(
                std::make_exception_ptr(yexception() << "trying to authenticate with service account credentials, internal error"));
        }
        // NOTE(review): the result of this call is discarded; it is kept as is because
        // its side effects cannot be verified from this file - confirm and remove if unneeded.
        NYql::GetAuthInfo(CredentialsFactory, "");
        authInfo.Token = CredentialsFactory->Create(saAuth->ServiceAccountId, saAuth->ServiceAccountIdSignature)->CreateProvider()->GetAuthInfo();
    }

    // Read everything that depends on `format` before the attribute map is modified below.
    auto fileFormat = NObjectStorage::NInference::ConvertFileFormat(*format);

    auto httpGateway = NYql::IHTTPGateway::Make();
    auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, NYql::NS3Lister::TListingRequest{
        .Url = meta->DataSourceLocation,
        .AuthInfo = authInfo,
        .Pattern = meta->TableLocation,
    }, Nothing(), false);

    // Picks the first non-empty object from the listing; any failure is thrown and
    // therefore ends up in the resulting future.
    auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
        auto& listRes = listResFut.GetValue();
        if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
            auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
            throw yexception() << error.Issues.ToString();
        }
        auto& entries = std::get<NYql::NS3Lister::TListEntries>(listRes);
        if (entries.Objects.empty()) {
            throw yexception() << "couldn't find files at " << path;
        }
        for (const auto& entry : entries.Objects) {
            if (entry.Size > 0) {
                return entry.Path;
            }
        }
        throw yexception() << "couldn't find any files for type inference, please check that the right path is provided";
    });

    // NOTE(review): the three actors below are registered before the listing result is
    // known; nothing visible here stops them when the listing fails - confirm their lifetime.
    auto s3FetcherId = ActorSystem->Register(NObjectStorage::CreateS3FetcherActor(
        meta->DataSourceLocation,
        httpGateway,
        NYql::IHTTPGateway::TRetryPolicy::GetNoRetryPolicy(),
        std::move(authInfo)
    ));

    // The inference actor receives the attributes without the "withinfer" marker.
    meta->Attributes.erase("withinfer");

    auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat));
    auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));

    return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
        auto promise = NThreading::NewPromise<TMetadataResult>();
        // Copies the inferred columns into the metadata and completes the promise.
        auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
            meta->Changed = true;
            meta->Schema.clear_column();
            for (const auto& column : response.Fields) {
                auto& destColumn = *meta->Schema.add_column();
                destColumn = column;
            }
            TMetadataResult result;
            result.SetSuccess();
            result.Metadata = meta;
            metaPromise.SetValue(std::move(result));
        };
        // GetValue() rethrows a listing failure, which then propagates through the chain.
        actorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
            arrowInferencinatorId,
            new NObjectStorage::TEvInferFileSchema(TString{pathFut.GetValue()}),
            promise,
            std::move(schemaToMetadata)
        ));

        return promise.GetFuture();
    }).Apply([](const NThreading::TFuture<TMetadataResult>& result) {
        auto& value = result.GetValue();
        if (value.Success()) {
            return value.Metadata;
        }
        ythrow TExternalSourceException{} << value.Issues().ToOneLineString();
    });
}

// Reports the `enableInfer` flag this source was constructed with.
virtual bool CanLoadDynamicMetadata() const override {
return EnableInfer;
}

private:
static bool IsValidIntervalUnit(const TString& unit) {
static constexpr std::array<std::string_view, 7> IntervalUnits = {
Expand Down Expand Up @@ -474,12 +594,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
private:
const std::vector<TRegExMatch> HostnamePatterns;
const size_t PathsLimit;
NActors::TActorSystem* ActorSystem = nullptr;
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> CredentialsFactory;
const bool EnableInfer = false;
};

}

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, pathsLimit);

// Creates the object storage external source; every argument is passed on to
// its constructor.
IExternalSource::TPtr CreateObjectStorageExternalSource(
    const std::vector<TRegExMatch>& hostnamePatterns,
    NActors::TActorSystem* actorSystem,
    size_t pathsLimit,
    std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
    bool enableInfer)
{
    IExternalSource::TPtr source = MakeIntrusive<TObjectStorageExternalSource>(
        hostnamePatterns,
        actorSystem,
        pathsLimit,
        std::move(credentialsFactory),
        enableInfer);
    return source;
}

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

#include <library/cpp/regex/pcre/regexp.h>

#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/public/api/protos/draft/fq.pb.h>

namespace NKikimr::NExternalSource {

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit);
// Creates the external source for object storage.
//   hostnamePatterns   - compiled hostname patterns kept by the source.
//   actorSystem        - actor system in which the source registers its schema-inference
//                        actors; needed only for dynamic metadata loading.
//   pathsLimit         - paths limit kept by the source.
//   credentialsFactory - used to obtain a token for service account authentication.
//   enableInfer        - value the source reports from CanLoadDynamicMetadata().
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns,
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer);

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);

Expand Down
Loading

0 comments on commit deaf465

Please sign in to comment.