Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merge from main: type inferring, s3 async decode, plan fields (compression, binding, path) #5619

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ydb/core/base/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,12 @@ struct TKikimrEvents : TEvents {
ES_REPLICATION_SERVICE,
ES_CHANGE_EXCHANGE,
ES_S3_FILE_QUEUE,
ES_S3_PROVIDER,
ES_NEBIUS_ACCESS_SERVICE,
ES_BACKUP_SERVICE,
ES_TX_BACKGROUND,
ES_SS_BG_TASKS,
ES_LIMITER
};
};

Expand Down
8 changes: 8 additions & 0 deletions ydb/core/external_sources/external_data_source.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,14 @@ struct TExternalDataSource : public IExternalSource {
ValidateHostname(HostnamePatterns, proto.GetLocation());
}

// This source does not load any dynamic metadata: the metadata passed in is
// wrapped into a future and handed back without modification.
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
return NThreading::MakeFuture(std::move(meta));
}

// Always false: LoadDynamicMetadata of this source only echoes its argument.
virtual bool CanLoadDynamicMetadata() const override {
return false;
}

private:
const TString Name;
const TVector<TString> AuthMethods;
Expand Down
78 changes: 78 additions & 0 deletions ydb/core/external_sources/external_source.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,83 @@
#pragma once

#include <library/cpp/threading/future/core/future.h>
#include <util/generic/map.h>
#include <util/generic/string.h>

#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/library/actors/core/actorsystem.h>
#include <ydb/library/yql/public/issue/yql_issue.h>

namespace NKikimr::NExternalSource {

// Exception type used by external source implementations to report failures
// (for example, an unsuccessful dynamic metadata load).
struct TExternalSourceException: public yexception {
};

namespace NAuth {

struct TNone {
static constexpr std::string_view Method = "NONE";
};

struct TAws {
static constexpr std::string_view Method = "AWS";

TAws(const TString& accessKey, const TString& secretAccessKey, const TString& region)
: AccessKey{accessKey}
, SecretAccessKey{secretAccessKey}
, Region{region}
{}

TString AccessKey;
TString SecretAccessKey;
TString Region;
};

struct TServiceAccount {
static constexpr std::string_view Method = "SERVICE_ACCOUNT";

TServiceAccount(TString serviceAccountId, TString serviceAccountIdSignature)
: ServiceAccountId{std::move(serviceAccountId)}
, ServiceAccountIdSignature{std::move(serviceAccountIdSignature)}
{}

TString ServiceAccountId;
TString ServiceAccountIdSignature;
};

using TAuth = std::variant<TNone, TServiceAccount, TAws>;

std::string_view GetMethod(const TAuth& auth);

inline TAuth MakeNone() {
return TAuth{std::in_place_type_t<TNone>{}};
}

inline TAuth MakeServiceAccount(const TString& serviceAccountId, const TString& serviceAccountIdSignature) {
return TAuth{std::in_place_type_t<TServiceAccount>{}, serviceAccountId, serviceAccountIdSignature};
}

inline TAuth MakeAws(const TString& accessKey, const TString& secretAccessKey, const TString& region) {
return TAuth{std::in_place_type_t<TAws>{}, accessKey, secretAccessKey, region};
}
}

using TAuth = NAuth::TAuth;

// Metadata describing an external table; passed to and returned from
// IExternalSource::LoadDynamicMetadata.
struct TMetadata {
// Set to true by a LoadDynamicMetadata implementation that modified the
// metadata (the object storage source sets it after replacing Schema).
bool Changed = false;
// Used by the object storage source as the listing pattern for data files.
TString TableLocation;
// Used by the object storage source as the base URL for listing and fetching.
TString DataSourceLocation;
// NOTE(review): not referenced by the code visible here - meaning to be
// confirmed against callers.
TString DataSourcePath;
// NOTE(review): presumably the external source type name - confirm.
TString Type;

// Free-form string attributes; the object storage source reads "format"
// and "withinfer" from here.
THashMap<TString, TString> Attributes;

// Credentials to use when accessing the source.
TAuth Auth;

// Table schema; its columns are replaced when a schema is inferred.
NKikimrExternalSources::TSchema Schema;
};

struct IExternalSource : public TThrRefBase {
using TPtr = TIntrusivePtr<IExternalSource>;

Expand Down Expand Up @@ -54,6 +121,17 @@ struct IExternalSource : public TThrRefBase {
If an error occurs, an exception is thrown.
*/
virtual void ValidateExternalDataSource(const TString& externalDataSourceDescription) const = 0;

/*
Retrieve additional metadata from runtime data and enrich the provided
metadata with it. The (possibly updated) metadata is delivered through
the returned future.
*/
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) = 0;

/*
Tells whether this source has a real implementation of
LoadDynamicMetadata, i.e. whether calling it can enrich the metadata.
*/
virtual bool CanLoadDynamicMetadata() const = 0;
};

}
8 changes: 6 additions & 2 deletions ydb/core/external_sources/external_source_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@ struct TExternalSourceFactory : public IExternalSourceFactory {

}

IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit) {
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns,
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer) {
std::vector<TRegExMatch> hostnamePatternsRegEx(hostnamePatterns.begin(), hostnamePatterns.end());
return MakeIntrusive<TExternalSourceFactory>(TMap<TString, IExternalSource::TPtr>{
{
ToString(NYql::EDatabaseType::ObjectStorage),
CreateObjectStorageExternalSource(hostnamePatternsRegEx, pathsLimit)
CreateObjectStorageExternalSource(hostnamePatternsRegEx, actorSystem, pathsLimit, std::move(credentialsFactory), enableInfer)
},
{
ToString(NYql::EDatabaseType::ClickHouse),
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/external_sources/external_source_factory.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include "external_source.h"
#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>

namespace NKikimr::NExternalSource {

Expand All @@ -10,6 +11,10 @@ struct IExternalSourceFactory : public TThrRefBase {
virtual IExternalSource::TPtr GetOrCreate(const TString& type) const = 0;
};

IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns, size_t pathsLimit = 50000);
// Creates the factory of external sources.
//   hostnamePatterns   - patterns that are compiled into regular expressions
//   actorSystem        - forwarded to the object storage source; may be null
//   pathsLimit         - forwarded to the object storage source
//   credentialsFactory - forwarded to the object storage source; may be null
//   enableInfer        - forwarded to the object storage source
IExternalSourceFactory::TPtr CreateExternalSourceFactory(const std::vector<TString>& hostnamePatterns,
NActors::TActorSystem* actorSystem = nullptr,
size_t pathsLimit = 50000,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory = nullptr,
bool enableInfer = false);

}
134 changes: 131 additions & 3 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
@@ -1,10 +1,17 @@
#include "external_source.h"
#include "object_storage.h"
#include "validation_functions.h"
#include "object_storage/s3_fetcher.h"

#include <ydb/core/external_sources/object_storage/inference/arrow_fetcher.h>
#include <ydb/core/external_sources/object_storage/inference/arrow_inferencinator.h>
#include <ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h>
#include <ydb/core/protos/external_sources.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>
Expand All @@ -20,9 +27,16 @@ namespace NKikimr::NExternalSource {
namespace {

struct TObjectStorageExternalSource : public IExternalSource {
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit)
// Stores the configuration of the source. actorSystem and credentialsFactory
// are kept as given (either may be null); enableInfer is what
// CanLoadDynamicMetadata reports.
explicit TObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns,
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer)
: HostnamePatterns(hostnamePatterns)
, PathsLimit(pathsLimit)
, ActorSystem(actorSystem)
, CredentialsFactory(std::move(credentialsFactory))
, EnableInfer(enableInfer)
{}

virtual TString Pack(const NKikimrExternalSources::TSchema& schema,
Expand Down Expand Up @@ -255,6 +269,112 @@ struct TObjectStorageExternalSource : public IExternalSource {
return issues;
}

// Operation result (status and issues) that additionally carries the metadata
// produced by schema inference; used as the promise value in LoadDynamicMetadata.
struct TMetadataResult : NYql::NCommon::TOperationResult {
std::shared_ptr<TMetadata> Metadata;
};

/*
    Infers the table schema from the stored data when inference is requested.

    The metadata is returned unchanged unless it carries both a "format" and a
    "withinfer" attribute and the format is accepted by
    NObjectStorage::NInference::IsArrowInferredFormat. Otherwise the objects
    matching meta->TableLocation under meta->DataSourceLocation are listed, the
    first listed object with a non-zero size is passed to the inference actors,
    and the columns they report replace meta->Schema (meta->Changed is set to
    true). The "withinfer" attribute is removed from the metadata.

    Failures are reported through the returned future: missing credentials
    factory for service-account auth, missing actor system, listing error, no
    suitable file, or an unsuccessful inference result (TExternalSourceException).
*/
virtual NThreading::TFuture<std::shared_ptr<TMetadata>> LoadDynamicMetadata(std::shared_ptr<TMetadata> meta) override {
    auto format = meta->Attributes.FindPtr("format");
    if (!format || !meta->Attributes.contains("withinfer")) {
        return NThreading::MakeFuture(std::move(meta));
    }

    if (!NObjectStorage::NInference::IsArrowInferredFormat(*format)) {
        return NThreading::MakeFuture(std::move(meta));
    }

    NYql::TS3Credentials::TAuthInfo authInfo{};
    if (std::holds_alternative<NAuth::TAws>(meta->Auth)) {
        auto& awsAuth = std::get<NAuth::TAws>(meta->Auth);
        authInfo.AwsAccessKey = awsAuth.AccessKey;
        authInfo.AwsAccessSecret = awsAuth.SecretAccessKey;
        authInfo.AwsRegion = awsAuth.Region;
    } else if (std::holds_alternative<NAuth::TServiceAccount>(meta->Auth)) {
        if (!CredentialsFactory) {
            // Throw-and-catch is used to obtain an exception_ptr for the error future.
            try {
                throw yexception{} << "trying to authenticate with service account credentials, internal error";
            } catch (const yexception&) {
                return NThreading::MakeErrorFuture<std::shared_ptr<TMetadata>>(std::current_exception());
            }
        }
        auto& saAuth = std::get<NAuth::TServiceAccount>(meta->Auth);
        // NOTE(review): the result of this call is discarded; it is kept as-is
        // because its side effects are not visible here - confirm whether it is needed.
        NYql::GetAuthInfo(CredentialsFactory, "");
        authInfo.Token = CredentialsFactory->Create(saAuth.ServiceAccountId, saAuth.ServiceAccountIdSignature)->CreateProvider()->GetAuthInfo();
    }

    if (!ActorSystem) {
        // ActorSystem may be null (the member defaults to nullptr and the
        // constructor accepts any pointer). Inference needs it to register
        // actors, so report the problem instead of dereferencing null below.
        try {
            throw yexception{} << "trying to infer schema without actor system, internal error";
        } catch (const yexception&) {
            return NThreading::MakeErrorFuture<std::shared_ptr<TMetadata>>(std::current_exception());
        }
    }

    auto httpGateway = NYql::IHTTPGateway::Make();
    auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, NYql::NS3Lister::TListingRequest{
        .Url = meta->DataSourceLocation,
        .AuthInfo = authInfo,
        .Pattern = meta->TableLocation,
    }, Nothing(), false);
    // Pick the first non-empty object from the first listing batch; any
    // exception thrown here ends up in the resulting future.
    auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
        auto& listRes = listResFut.GetValue();
        if (std::holds_alternative<NYql::NS3Lister::TListError>(listRes)) {
            auto& error = std::get<NYql::NS3Lister::TListError>(listRes);
            throw yexception() << error.Issues.ToString();
        }
        auto& entries = std::get<NYql::NS3Lister::TListEntries>(listRes);
        if (entries.Objects.empty()) {
            throw yexception() << "couldn't find files at " << path;
        }
        for (const auto& entry : entries.Objects) {
            if (entry.Size > 0) {
                return entry.Path;
            }
        }
        throw yexception() << "couldn't find any files for type inference, please check that the right path is provided";
    });

    auto s3FetcherId = ActorSystem->Register(NObjectStorage::CreateS3FetcherActor(
        meta->DataSourceLocation,
        httpGateway,
        NYql::IHTTPGateway::TRetryPolicy::GetNoRetryPolicy(),
        std::move(authInfo)
    ));

    // The marker attribute is dropped before the attributes are handed to the
    // inference actor below.
    meta->Attributes.erase("withinfer");

    auto fileFormat = NObjectStorage::NInference::ConvertFileFormat(*format);
    auto arrowFetcherId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowFetchingActor(s3FetcherId, fileFormat));
    auto arrowInferencinatorId = ActorSystem->Register(NObjectStorage::NInference::CreateArrowInferencinator(arrowFetcherId, fileFormat, meta->Attributes));

    return afterListing.Apply([arrowInferencinatorId, meta, actorSystem = ActorSystem](const NThreading::TFuture<TString>& pathFut) {
        auto promise = NThreading::NewPromise<TMetadataResult>();
        // Copies the inferred columns into the metadata and fulfils the promise.
        auto schemaToMetadata = [meta](NThreading::TPromise<TMetadataResult> metaPromise, NObjectStorage::TEvInferredFileSchema&& response) {
            meta->Changed = true;
            meta->Schema.clear_column();
            for (const auto& column : response.Fields) {
                auto& destColumn = *meta->Schema.add_column();
                destColumn = column;
            }
            TMetadataResult result;
            result.SetSuccess();
            result.Metadata = meta;
            metaPromise.SetValue(std::move(result));
        };
        // pathFut.GetValue() rethrows a listing failure, which propagates into
        // the future returned from this continuation.
        actorSystem->Register(new NKqp::TActorRequestHandler<NObjectStorage::TEvInferFileSchema, NObjectStorage::TEvInferredFileSchema, TMetadataResult>(
            arrowInferencinatorId,
            new NObjectStorage::TEvInferFileSchema(TString{pathFut.GetValue()}),
            promise,
            std::move(schemaToMetadata)
        ));

        return promise.GetFuture();
    }).Apply([](const NThreading::TFuture<TMetadataResult>& result) {
        auto& value = result.GetValue();
        if (value.Success()) {
            return value.Metadata;
        }
        ythrow TExternalSourceException{} << value.Issues().ToOneLineString();
    });
}

// Reports the enableInfer flag this source was constructed with.
virtual bool CanLoadDynamicMetadata() const override {
return EnableInfer;
}

private:
static bool IsValidIntervalUnit(const TString& unit) {
static constexpr std::array<std::string_view, 7> IntervalUnits = {
Expand Down Expand Up @@ -474,12 +594,20 @@ struct TObjectStorageExternalSource : public IExternalSource {
private:
const std::vector<TRegExMatch> HostnamePatterns;
const size_t PathsLimit;
NActors::TActorSystem* ActorSystem = nullptr;
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> CredentialsFactory;
const bool EnableInfer = false;
};

}

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit) {
return MakeIntrusive<TObjectStorageExternalSource>(hostnamePatterns, pathsLimit);

// Builds the object storage external source and returns it through the
// generic IExternalSource pointer type. Every argument is passed on to the
// TObjectStorageExternalSource constructor.
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns,
                                                        NActors::TActorSystem* actorSystem,
                                                        size_t pathsLimit,
                                                        std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
                                                        bool enableInfer) {
    IExternalSource::TPtr source = MakeIntrusive<TObjectStorageExternalSource>(
        hostnamePatterns,
        actorSystem,
        pathsLimit,
        std::move(credentialsFactory),
        enableInfer);
    return source;
}

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit) {
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/external_sources/object_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@

#include <library/cpp/regex/pcre/regexp.h>

#include <ydb/library/yql/providers/common/token_accessor/client/factory.h>
#include <ydb/public/api/protos/draft/fq.pb.h>

namespace NKikimr::NExternalSource {

IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns, size_t pathsLimit);
IExternalSource::TPtr CreateObjectStorageExternalSource(const std::vector<TRegExMatch>& hostnamePatterns,
NActors::TActorSystem* actorSystem,
size_t pathsLimit,
std::shared_ptr<NYql::ISecuredServiceAccountCredentialsFactory> credentialsFactory,
bool enableInfer);

NYql::TIssues Validate(const FederatedQuery::Schema& schema, const FederatedQuery::ObjectStorageBinding::Subset& objectStorage, size_t pathsLimit);

Expand Down
Loading
Loading