From 84490083f6fc5a9be38fd6b77153031602e6153f Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy <79596613+GrigoriyPA@users.noreply.github.com> Date: Mon, 22 Jul 2024 15:17:32 +0300 Subject: [PATCH] YQ-3152 fix error failed to execute callable ResWrite! (#6752) --- ydb/core/external_sources/object_storage.cpp | 21 ++++++++++------- .../inference/ut/arrow_inference_ut.cpp | 2 +- .../object_storage/s3_fetcher.cpp | 17 +++++++------- .../object_storage/s3_fetcher.h | 2 +- .../s3/actors/yql_s3_raw_read_actor.cpp | 21 +++++++++-------- .../s3/actors/yql_s3_raw_read_actor.h | 2 +- .../providers/s3/actors/yql_s3_read_actor.cpp | 17 +++++++------- .../providers/s3/actors/yql_s3_read_actor.h | 2 +- .../s3/actors/yql_s3_source_queue.cpp | 12 +++++----- .../providers/s3/actors/yql_s3_source_queue.h | 2 +- .../providers/s3/credentials/credentials.cpp | 14 ++++++++++- .../providers/s3/credentials/credentials.h | 5 ++++ .../s3/object_listers/yql_s3_list.cpp | 7 +++--- .../providers/s3/object_listers/yql_s3_list.h | 2 +- .../s3/provider/yql_s3_dq_integration.cpp | 2 +- .../s3/provider/yql_s3_io_discovery.cpp | 23 ++++++++++++------- 16 files changed, 91 insertions(+), 60 deletions(-) diff --git a/ydb/core/external_sources/object_storage.cpp b/ydb/core/external_sources/object_storage.cpp index 36a87a69b8cd..37e5eaa0fa1b 100644 --- a/ydb/core/external_sources/object_storage.cpp +++ b/ydb/core/external_sources/object_storage.cpp @@ -10,9 +10,11 @@ #include #include #include +#include #include #include #include +#include #include #include @@ -284,12 +286,13 @@ struct TObjectStorageExternalSource : public IExternalSource { return NThreading::MakeFuture(std::move(meta)); } - NYql::TS3Credentials::TAuthInfo authInfo{}; + NYql::TStructuredTokenBuilder structuredTokenBuilder; if (std::holds_alternative(meta->Auth)) { auto& awsAuth = std::get(meta->Auth); - authInfo.AwsAccessKey = awsAuth.AccessKey; - authInfo.AwsAccessSecret = awsAuth.SecretAccessKey; - authInfo.AwsRegion = awsAuth.Region; + NYql::NS3::TAwsParams params; + params.SetAwsAccessKey(awsAuth.AccessKey); + params.SetAwsRegion(awsAuth.Region); + structuredTokenBuilder.SetBasicAuth(params.SerializeAsString(), awsAuth.SecretAccessKey); } else if (std::holds_alternative(meta->Auth)) { if (!CredentialsFactory) { try { @@ -299,15 +302,17 @@ struct TObjectStorageExternalSource : public IExternalSource { } } auto& saAuth = std::get(meta->Auth); - NYql::GetAuthInfo(CredentialsFactory, ""); - authInfo.Token = CredentialsFactory->Create(saAuth.ServiceAccountId, saAuth.ServiceAccountIdSignature)->CreateProvider()->GetAuthInfo(); + structuredTokenBuilder.SetServiceAccountIdAuth(saAuth.ServiceAccountId, saAuth.ServiceAccountIdSignature); + } else { + structuredTokenBuilder.SetNoAuth(); } + const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson()); auto httpGateway = NYql::IHTTPGateway::Make(); auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()}); auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{ .Url = meta->DataSourceLocation, - .AuthInfo = authInfo, + .Credentials = credentials, .Pattern = meta->TableLocation, }, Nothing(), false); auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture& listResFut) { @@ -332,7 +337,7 @@ struct TObjectStorageExternalSource : public IExternalSource { meta->DataSourceLocation, httpGateway, NYql::IHTTPGateway::TRetryPolicy::GetNoRetryPolicy(), - std::move(authInfo) + credentials )); meta->Attributes.erase("withinfer"); diff --git a/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp b/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp index 8edd7a424212..88a46386035f 100644 --- a/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp +++ b/ydb/core/external_sources/object_storage/inference/ut/arrow_inference_ut.cpp @@ -45,7 +45,7 @@ class ArrowInferenceTest : public testing::Test { BaseUrl, Gateway, NYql::IHTTPGateway::TRetryPolicy::GetNoRetryPolicy(), - NYql::TS3Credentials::TAuthInfo{}), 1); + NYql::TS3Credentials{}), 1); } NActors::TActorId RegisterInferencinator(TStringBuf formatStr) { diff --git a/ydb/core/external_sources/object_storage/s3_fetcher.cpp b/ydb/core/external_sources/object_storage/s3_fetcher.cpp index 1238147ee089..c9dc7ca45e32 100644 --- a/ydb/core/external_sources/object_storage/s3_fetcher.cpp +++ b/ydb/core/external_sources/object_storage/s3_fetcher.cpp @@ -10,11 +10,11 @@ class S3Fetcher : public NActors::TActorBootstrapped { TString url, NYql::IHTTPGateway::TPtr gateway, NYql::IHTTPGateway::TRetryPolicy::TPtr retryPolicy, - NYql::TS3Credentials::TAuthInfo authInfo) + const NYql::TS3Credentials& credentials) : Url_{std::move(url)} , Gateway_{std::move(gateway)} , RetryPolicy_{std::move(retryPolicy)} - , AuthInfo_{std::move(authInfo)} + , Credentials_(credentials) {} void Bootstrap() { @@ -60,12 +60,13 @@ class S3Fetcher : public NActors::TActorBootstrapped { void StartDownload(std::shared_ptr&& request, NActors::TActorSystem* actorSystem) { auto length = request->End - request->Start; + const auto& authInfo = Credentials_.GetAuthInfo(); auto headers = NYql::IHTTPGateway::MakeYcHeaders( request->RequestId.AsGuidString(), - AuthInfo_.GetToken(), + authInfo.GetToken(), {}, - AuthInfo_.GetAwsUserPwd(), - AuthInfo_.GetAwsSigV4() + authInfo.GetAwsUserPwd(), + authInfo.GetAwsSigV4() ); Gateway_->Download( @@ -79,15 +80,15 @@ class S3Fetcher : public NActors::TActorBootstrapped { TString Url_; NYql::IHTTPGateway::TPtr Gateway_; NYql::IHTTPGateway::TRetryPolicy::TPtr RetryPolicy_; - NYql::TS3Credentials::TAuthInfo AuthInfo_; + const NYql::TS3Credentials Credentials_; }; NActors::IActor* CreateS3FetcherActor( TString url, NYql::IHTTPGateway::TPtr gateway, NYql::IHTTPGateway::TRetryPolicy::TPtr retryPolicy, - NYql::TS3Credentials::TAuthInfo authInfo) { + const NYql::TS3Credentials& credentials) { - return new S3Fetcher(std::move(url), std::move(gateway), std::move(retryPolicy), std::move(authInfo)); + return new S3Fetcher(std::move(url), std::move(gateway), std::move(retryPolicy), credentials); } } // namespace NKikimr::NExternalSource::NObjectStorage diff --git a/ydb/core/external_sources/object_storage/s3_fetcher.h b/ydb/core/external_sources/object_storage/s3_fetcher.h index 51310ec3ca64..51b77210f5b2 100644 --- a/ydb/core/external_sources/object_storage/s3_fetcher.h +++ b/ydb/core/external_sources/object_storage/s3_fetcher.h @@ -13,5 +13,5 @@ NActors::IActor* CreateS3FetcherActor( TString url, NYql::IHTTPGateway::TPtr gateway, NYql::IHTTPGateway::TRetryPolicy::TPtr retryPolicy, - NYql::TS3Credentials::TAuthInfo authInfo); + const NYql::TS3Credentials& credentials); } // namespace NKikimr::NExternalSource::NObjectStorage diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp index 749c86b9db44..d5bfdd479e2f 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp @@ -42,7 +42,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID IHTTPGateway::TPtr gateway, const NKikimr::NMiniKQL::THolderFactory& holderFactory, const TString& url, - const TS3Credentials::TAuthInfo& authInfo, + const TS3Credentials& credentials, const TString& pattern, NYql::NS3Lister::ES3PatternVariant patternVariant, NYql::NS3Details::TPathList&& paths, @@ -69,7 +69,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID , RetryPolicy(retryPolicy) , ActorSystem(NActors::TActivationContext::ActorSystem()) , Url(url) - , AuthInfo(authInfo) + , Credentials(credentials) , Pattern(pattern) , PatternVariant(patternVariant) , Paths(std::move(paths)) @@ -113,7 +113,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID Gateway, RetryPolicy, Url, - AuthInfo, + Credentials, Pattern, PatternVariant, NYql::NS3Lister::ES3PatternType::Wildcard)); @@ -164,10 +164,11 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID auto url = Url + object.GetPath(); auto id = object.GetPathIndex(); const TString requestId = CreateGuidAsString(); + const auto& authInfo = Credentials.GetAuthInfo(); LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]"); Gateway->Download( UrlEscapeRet(url, true), - IHTTPGateway::MakeYcHeaders(requestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(requestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), 0U, std::min(object.GetSize(), SizeLimit), std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id, object.GetPath()), @@ -456,7 +457,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped, public ID NActors::TActorSystem* const ActorSystem; const TString Url; - const TS3Credentials::TAuthInfo AuthInfo; + const TS3Credentials Credentials; const TString Pattern; const NYql::NS3Lister::ES3PatternVariant PatternVariant; NYql::NS3Details::TPathList Paths; @@ -503,7 +504,7 @@ std::pair CreateRawRead IHTTPGateway::TPtr gateway, const NKikimr::NMiniKQL::THolderFactory& holderFactory, const TString& url, - const TS3Credentials::TAuthInfo& authInfo, + const TS3Credentials& credentials, const TString& pattern, NYql::NS3Lister::ES3PatternVariant patternVariant, NYql::NS3Details::TPathList&& paths, @@ -527,14 +528,14 @@ std::pair CreateRawRead statsLevel, txId, std::move(gateway), - holderFactory, - url, - authInfo, + holderFactory, + url, + credentials, pattern, patternVariant, std::move(paths), addPathIndex, - computeActorId, + computeActorId, sizeLimit, retryPolicy, readActorFactoryCfg, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h index 3c76d44bf7d8..102ea19c94c3 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.h @@ -19,7 +19,7 @@ std::pair CreateRawRead IHTTPGateway::TPtr gateway, const NKikimr::NMiniKQL::THolderFactory& holderFactory, const TString& url, - const TS3Credentials::TAuthInfo& authInfo, + const TS3Credentials& credentials, const TString& pattern, NYql::NS3Lister::ES3PatternVariant patternVariant, NYql::NS3Details::TPathList&& paths, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp index 358448c2d451..81ec94b2d51a 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp @@ -1201,7 +1201,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public IHTTPGateway::TPtr gateway, const THolderFactory& holderFactory, const TString& url, - const TS3Credentials::TAuthInfo& authInfo, + const TS3Credentials& credentials, const TString& pattern, ES3PatternVariant patternVariant, TPathList&& paths, @@ -1230,7 +1230,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public , ComputeActorId(computeActorId) , RetryPolicy(retryPolicy) , Url(url) - , AuthInfo(authInfo) + , Credentials(credentials) , Pattern(pattern) , PatternVariant(patternVariant) , Paths(std::move(paths)) @@ -1321,7 +1321,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public Gateway, RetryPolicy, Url, - AuthInfo, + Credentials, Pattern, PatternVariant, ES3PatternType::Wildcard)); @@ -1385,10 +1385,11 @@ class TS3StreamReadActor : public TActorBootstrapped, public << pathIndex); TActorId actorId; + const auto& authInfo = Credentials.GetAuthInfo(); auto stuff = std::make_shared( Gateway, Url + object.GetPath(), - IHTTPGateway::MakeYcHeaders(requestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()), + IHTTPGateway::MakeYcHeaders(requestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()), object.GetSize(), TxId, requestId, @@ -1786,7 +1787,7 @@ class TS3StreamReadActor : public TActorBootstrapped, public const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; const TString Url; - const TS3Credentials::TAuthInfo AuthInfo; + const TS3Credentials Credentials; const TString Pattern; const ES3PatternVariant PatternVariant; TPathList Paths; @@ -2000,7 +2001,7 @@ std::pair CreateS3ReadActor( ReadPathsList(params, taskParams, readRanges, paths); const auto token = secureParams.Value(params.GetToken(), TString{}); - const auto authInfo = GetAuthInfo(credentialsFactory, token); + const TS3Credentials credentials(credentialsFactory, token); const auto& settings = params.GetSettings(); TString pathPattern = "*"; @@ -2178,7 +2179,7 @@ std::pair CreateS3ReadActor( sizeLimit = FromString(it->second); } - const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant, + const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy, cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager, params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta, @@ -2190,7 +2191,7 @@ std::pair CreateS3ReadActor( if (const auto it = settings.find("sizeLimit"); settings.cend() != it) sizeLimit = FromString(it->second); - return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant, + return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant, std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy, cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint, params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h index 69de502f94e4..5de66acf6f1f 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h @@ -26,7 +26,7 @@ NActors::IActor* CreateS3FileQueueActor( IHTTPGateway::TPtr gateway, IHTTPGateway::TRetryPolicy::TPtr retryPolicy, TString url, - TS3Credentials::TAuthInfo authInfo, + const TS3Credentials& credentials, TString pattern, NYql::NS3Lister::ES3PatternVariant patternVariant, NS3Lister::ES3PatternType patternType); diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp index eb6bebed5624..060afbb4aea5 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp @@ -173,7 +173,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped IHTTPGateway::TPtr gateway, IHTTPGateway::TRetryPolicy::TPtr retryPolicy, TString url, - TS3Credentials::TAuthInfo authInfo, + const TS3Credentials& credentials, TString pattern, NS3Lister::ES3PatternVariant patternVariant, NS3Lister::ES3PatternType patternType) @@ -189,7 +189,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped , Gateway(std::move(gateway)) , RetryPolicy(std::move(retryPolicy)) , Url(std::move(url)) - , AuthInfo(std::move(authInfo)) + , Credentials(credentials) , Pattern(std::move(pattern)) , PatternVariant(patternVariant) , PatternType(patternType) { @@ -493,7 +493,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped RetryPolicy, NS3Lister::TListingRequest{ Url, - AuthInfo, + Credentials, PatternVariant == NS3Lister::ES3PatternVariant::PathPattern ? Pattern : TStringBuilder{} << object.GetPath() << Pattern, @@ -616,7 +616,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped const IHTTPGateway::TPtr Gateway; const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy; const TString Url; - const TS3Credentials::TAuthInfo AuthInfo; + const TS3Credentials Credentials; const TString Pattern; const NS3Lister::ES3PatternVariant PatternVariant; const NS3Lister::ES3PatternType PatternType; @@ -638,7 +638,7 @@ NActors::IActor* CreateS3FileQueueActor( IHTTPGateway::TPtr gateway, IHTTPGateway::TRetryPolicy::TPtr retryPolicy, TString url, - TS3Credentials::TAuthInfo authInfo, + const TS3Credentials& credentials, TString pattern, NS3Lister::ES3PatternVariant patternVariant, NS3Lister::ES3PatternType patternType) { @@ -655,7 +655,7 @@ NActors::IActor* CreateS3FileQueueActor( gateway, retryPolicy, url, - authInfo, + credentials, pattern, patternVariant, patternType diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h index 12a90ffa3faa..86fd9aa1d385 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h @@ -24,7 +24,7 @@ NActors::IActor* CreateS3FileQueueActor( IHTTPGateway::TPtr gateway, IHTTPGateway::TRetryPolicy::TPtr retryPolicy, TString url, - TS3Credentials::TAuthInfo authInfo, + const TS3Credentials& credentials, TString pattern, NYql::NS3Lister::ES3PatternVariant patternVariant, NS3Lister::ES3PatternType patternType); diff --git a/ydb/library/yql/providers/s3/credentials/credentials.cpp b/ydb/library/yql/providers/s3/credentials/credentials.cpp index eedd583778de..5c0ecf85b1d4 100644 --- a/ydb/library/yql/providers/s3/credentials/credentials.cpp +++ b/ydb/library/yql/providers/s3/credentials/credentials.cpp @@ -6,6 +6,7 @@ namespace NYql { TS3Credentials::TS3Credentials(ISecuredServiceAccountCredentialsFactory::TPtr factory, const TString& structuredTokenJson, bool addBearerToToken) + : StructuredTokenJson(structuredTokenJson) { if (NYql::IsStructuredTokenJson(structuredTokenJson)) { NYql::TStructuredTokenParser parser = NYql::CreateStructuredTokenParser(structuredTokenJson); @@ -24,7 +25,7 @@ TS3Credentials::TS3Credentials(ISecuredServiceAccountCredentialsFactory::TPtr fa } auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(factory, structuredTokenJson, addBearerToToken); - CredentialsProvider = providerFactory->CreateProvider(); + CredentialsProvider = providerFactory->CreateProvider(); // Heavy operation, BLOCKs thread until TA reply } TS3Credentials::TAuthInfo TS3Credentials::GetAuthInfo() const { @@ -34,6 +35,17 @@ TS3Credentials::TAuthInfo TS3Credentials::GetAuthInfo() const { return AuthInfo; } +bool TS3Credentials::operator<(const TS3Credentials& other) const { + return StructuredTokenJson < other.StructuredTokenJson; +} + +IOutputStream& operator<<(IOutputStream& stream, const TS3Credentials& credentials) { + const auto& authInfo = credentials.AuthInfo; + return stream << "TS3Credentials{.ServiceAccountAuth=" << static_cast(credentials.CredentialsProvider) + << ",.AwsUserPwd=" + << ",.AwsSigV4=}"; +} + // string value after AWS prefix should be suitable for passing it to curl as CURLOPT_USERPWD, see details here: // https://curl.se/libcurl/c/CURLOPT_AWS_SIGV4.html // CURLOPT_USERPWD = "MY_ACCESS_KEY:MY_SECRET_KEY" diff --git a/ydb/library/yql/providers/s3/credentials/credentials.h b/ydb/library/yql/providers/s3/credentials/credentials.h index 3d9b41ea75a4..4c299c9d015f 100644 --- a/ydb/library/yql/providers/s3/credentials/credentials.h +++ b/ydb/library/yql/providers/s3/credentials/credentials.h @@ -19,11 +19,16 @@ struct TS3Credentials { TString AwsRegion; }; + TS3Credentials() = default; TS3Credentials(ISecuredServiceAccountCredentialsFactory::TPtr factory, const TString& structuredTokenJson, bool addBearerToToken = false); TAuthInfo GetAuthInfo() const; + bool operator<(const TS3Credentials& other) const; + friend IOutputStream& operator<<(IOutputStream& stream, const TS3Credentials& credentials); + private: + TString StructuredTokenJson; NYdb::TCredentialsProviderPtr CredentialsProvider; TS3Credentials::TAuthInfo AuthInfo; }; diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp index 15392deac1f1..32f2df4629b0 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.cpp @@ -29,9 +29,7 @@ IOutputStream& operator<<(IOutputStream& stream, const TListingRequest& request) << ",.Prefix=" << request.Prefix << ",.Pattern=" << request.Pattern << ",.PatternType=" << request.PatternType - << ",.AwsUserPwd=" - << ",.AwsSigV4=" << request.AuthInfo.GetAwsSigV4().length() - << ",.Token=}"; + << ",.Credentials=" << request.Credentials << "}"; } namespace { @@ -287,7 +285,8 @@ class TS3Lister : public IS3Lister { ~TS3Lister() override = default; private: static void SubmitRequestIntoGateway(TListingContext& ctx) { - IHTTPGateway::THeaders headers = IHTTPGateway::MakeYcHeaders(ctx.RequestId, ctx.ListingRequest.AuthInfo.GetToken(), {}, ctx.ListingRequest.AuthInfo.GetAwsUserPwd(), ctx.ListingRequest.AuthInfo.GetAwsSigV4()); + const auto& authInfo = ctx.ListingRequest.Credentials.GetAuthInfo(); + IHTTPGateway::THeaders headers = IHTTPGateway::MakeYcHeaders(ctx.RequestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()); // We have to sort the cgi parameters for the correct aws signature // This requirement will be fixed in the curl library diff --git a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h index 24d70e4f9cae..93fafae19057 100644 --- a/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h +++ b/ydb/library/yql/providers/s3/object_listers/yql_s3_list.h @@ -150,7 +150,7 @@ using TListResult = std::variant; struct TListingRequest { TString Url; - TS3Credentials::TAuthInfo AuthInfo; + TS3Credentials Credentials; TString Pattern; ES3PatternType PatternType = ES3PatternType::Wildcard; TString Prefix; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index c9f3d40a42cb..b90ada844c3b 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -542,7 +542,7 @@ class TS3DqIntegration: public TDqIntegrationBase { State_->Gateway, State_->GatewayRetryPolicy, connect.Url, - GetAuthInfo(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster)), + TS3Credentials(State_->CredentialsFactory, State_->Configuration->Tokens.at(cluster)), pathPattern, pathPatternVariant, NS3Lister::ES3PatternType::Wildcard diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp index 155810c3485b..2c1ee3313622 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_io_discovery.cpp @@ -63,10 +63,8 @@ struct TListRequest { }; bool operator<(const TListRequest& a, const TListRequest& b) { - const auto& lhs = a.S3Request.AuthInfo; - const auto& rhs = b.S3Request.AuthInfo; - return std::tie(lhs.Token, lhs.AwsAccessKey, lhs.AwsAccessSecret, lhs.AwsRegion, a.S3Request.Url, a.S3Request.Pattern) < - std::tie(rhs.Token, rhs.AwsAccessKey, rhs.AwsAccessSecret, rhs.AwsRegion, b.S3Request.Url, b.S3Request.Pattern); + return std::tie(a.S3Request.Credentials, a.S3Request.Url, a.S3Request.Pattern) < + std::tie(b.S3Request.Credentials, b.S3Request.Url, b.S3Request.Pattern); } using TPendingRequests = TMap>; @@ -588,7 +586,7 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { const auto& connect = State_->Configuration->Clusters.at(dataSource.Cluster().StringValue()); const auto& token = State_->Configuration->Tokens.at(dataSource.Cluster().StringValue()); - const auto authInfo = GetAuthInfo(State_->CredentialsFactory, token); + const auto& credentials = GetOrCreateCredentials(token); const TString url = connect.Url; auto s3ParseSettings = source.Input().Maybe().Cast(); TString filePattern; @@ -620,7 +618,7 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { auto req = TListRequest{.S3Request{ .Url = url, - .AuthInfo = authInfo, + .Credentials = credentials, .Pattern = NS3::NormalizePath( TStringBuilder() << dir.Path << "/" << effectiveFilePattern), .PatternType = NS3Lister::ES3PatternType::Wildcard, @@ -744,7 +742,7 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { const auto& connect = State_->Configuration->Clusters.at(read.DataSource().Cluster().StringValue()); const auto& token = State_->Configuration->Tokens.at(read.DataSource().Cluster().StringValue()); - const auto authInfo = GetAuthInfo(State_->CredentialsFactory, token); + const auto& credentials = GetOrCreateCredentials(token); const TString url = connect.Url; TGeneratedColumnsConfig config; @@ -772,7 +770,7 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { State_->Configuration->UseConcurrentDirectoryLister.Get().GetOrElse( State_->Configuration->AllowConcurrentListings); auto req = TListRequest{ - .S3Request{.Url = url, .AuthInfo = authInfo}, + .S3Request{.Url = url, .Credentials = credentials}, .FilePattern = effectiveFilePattern, .Options{ .IsConcurrentListing = isConcurrentListingEnabled, @@ -880,6 +878,14 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { return true; } + TS3Credentials GetOrCreateCredentials(const TString& token) { + auto it = S3Credentials_.find(token); + if (it != S3Credentials_.end()) { + return it->second; + } + return S3Credentials_.insert({token, TS3Credentials(State_->CredentialsFactory, token)}).first->second; + } + const TS3State::TPtr State_; const NS3Lister::IS3ListerFactory::TPtr ListerFactory_; const IS3ListingStrategy::TPtr ListingStrategy_; @@ -887,6 +893,7 @@ class TS3IODiscoveryTransformer : public TGraphTransformerBase { TPendingRequests PendingRequests_; TNodeMap> RequestsByNode_; TNodeMap GenColumnsByNode_; + std::unordered_map S3Credentials_; NThreading::TFuture AllFuture_; };