Skip to content

Commit

Permalink
YQ-3152 fix error failed to execute callable ResWrite! (#6752)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA committed Jul 22, 2024
1 parent 9577402 commit 8449008
Show file tree
Hide file tree
Showing 16 changed files with 91 additions and 60 deletions.
21 changes: 13 additions & 8 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/providers/s3/proto/credentials.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>

Expand Down Expand Up @@ -284,12 +286,13 @@ struct TObjectStorageExternalSource : public IExternalSource {
return NThreading::MakeFuture(std::move(meta));
}

NYql::TS3Credentials::TAuthInfo authInfo{};
NYql::TStructuredTokenBuilder structuredTokenBuilder;
if (std::holds_alternative<NAuth::TAws>(meta->Auth)) {
auto& awsAuth = std::get<NAuth::TAws>(meta->Auth);
authInfo.AwsAccessKey = awsAuth.AccessKey;
authInfo.AwsAccessSecret = awsAuth.SecretAccessKey;
authInfo.AwsRegion = awsAuth.Region;
NYql::NS3::TAwsParams params;
params.SetAwsAccessKey(awsAuth.AccessKey);
params.SetAwsRegion(awsAuth.Region);
structuredTokenBuilder.SetBasicAuth(params.SerializeAsString(), awsAuth.SecretAccessKey);
} else if (std::holds_alternative<NAuth::TServiceAccount>(meta->Auth)) {
if (!CredentialsFactory) {
try {
Expand All @@ -299,15 +302,17 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
}
auto& saAuth = std::get<NAuth::TServiceAccount>(meta->Auth);
NYql::GetAuthInfo(CredentialsFactory, "");
authInfo.Token = CredentialsFactory->Create(saAuth.ServiceAccountId, saAuth.ServiceAccountIdSignature)->CreateProvider()->GetAuthInfo();
structuredTokenBuilder.SetServiceAccountIdAuth(saAuth.ServiceAccountId, saAuth.ServiceAccountIdSignature);
} else {
structuredTokenBuilder.SetNoAuth();
}

const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson());
auto httpGateway = NYql::IHTTPGateway::Make();
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
.Url = meta->DataSourceLocation,
.AuthInfo = authInfo,
.Credentials = credentials,
.Pattern = meta->TableLocation,
}, Nothing(), false);
auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
Expand All @@ -332,7 +337,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
meta->DataSourceLocation,
httpGateway,
NYql::IHTTPGateway::TRetryPolicy::GetNoRetryPolicy(),
std::move(authInfo)
credentials
));

meta->Attributes.erase("withinfer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ArrowInferenceTest : public testing::Test {
BaseUrl,
Gateway,
NYql::IHTTPGateway::TRetryPolicy::GetNoRetryPolicy(),
NYql::TS3Credentials::TAuthInfo{}), 1);
NYql::TS3Credentials{}), 1);
}

NActors::TActorId RegisterInferencinator(TStringBuf formatStr) {
Expand Down
17 changes: 9 additions & 8 deletions ydb/core/external_sources/object_storage/s3_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ class S3Fetcher : public NActors::TActorBootstrapped<S3Fetcher> {
TString url,
NYql::IHTTPGateway::TPtr gateway,
NYql::IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
NYql::TS3Credentials::TAuthInfo authInfo)
const NYql::TS3Credentials& credentials)
: Url_{std::move(url)}
, Gateway_{std::move(gateway)}
, RetryPolicy_{std::move(retryPolicy)}
, AuthInfo_{std::move(authInfo)}
, Credentials_(credentials)
{}

void Bootstrap() {
Expand Down Expand Up @@ -60,12 +60,13 @@ class S3Fetcher : public NActors::TActorBootstrapped<S3Fetcher> {

void StartDownload(std::shared_ptr<TEvRequestS3Range>&& request, NActors::TActorSystem* actorSystem) {
auto length = request->End - request->Start;
const auto& authInfo = Credentials_.GetAuthInfo();
auto headers = NYql::IHTTPGateway::MakeYcHeaders(
request->RequestId.AsGuidString(),
AuthInfo_.GetToken(),
authInfo.GetToken(),
{},
AuthInfo_.GetAwsUserPwd(),
AuthInfo_.GetAwsSigV4()
authInfo.GetAwsUserPwd(),
authInfo.GetAwsSigV4()
);

Gateway_->Download(
Expand All @@ -79,15 +80,15 @@ class S3Fetcher : public NActors::TActorBootstrapped<S3Fetcher> {
TString Url_;
NYql::IHTTPGateway::TPtr Gateway_;
NYql::IHTTPGateway::TRetryPolicy::TPtr RetryPolicy_;
NYql::TS3Credentials::TAuthInfo AuthInfo_;
const NYql::TS3Credentials Credentials_;
};

NActors::IActor* CreateS3FetcherActor(
TString url,
NYql::IHTTPGateway::TPtr gateway,
NYql::IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
NYql::TS3Credentials::TAuthInfo authInfo) {
const NYql::TS3Credentials& credentials) {

return new S3Fetcher(std::move(url), std::move(gateway), std::move(retryPolicy), std::move(authInfo));
return new S3Fetcher(std::move(url), std::move(gateway), std::move(retryPolicy), credentials);
}
} // namespace NKikimr::NExternalSource::NObjectStorage
2 changes: 1 addition & 1 deletion ydb/core/external_sources/object_storage/s3_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ NActors::IActor* CreateS3FetcherActor(
TString url,
NYql::IHTTPGateway::TPtr gateway,
NYql::IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
NYql::TS3Credentials::TAuthInfo authInfo);
const NYql::TS3Credentials& credentials);
} // namespace NKikimr::NExternalSource::NObjectStorage
21 changes: 11 additions & 10 deletions ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
IHTTPGateway::TPtr gateway,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NYql::NS3Details::TPathList&& paths,
Expand All @@ -69,7 +69,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
, RetryPolicy(retryPolicy)
, ActorSystem(NActors::TActivationContext::ActorSystem())
, Url(url)
, AuthInfo(authInfo)
, Credentials(credentials)
, Pattern(pattern)
, PatternVariant(patternVariant)
, Paths(std::move(paths))
Expand Down Expand Up @@ -113,7 +113,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
Gateway,
RetryPolicy,
Url,
AuthInfo,
Credentials,
Pattern,
PatternVariant,
NYql::NS3Lister::ES3PatternType::Wildcard));
Expand Down Expand Up @@ -164,10 +164,11 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
auto url = Url + object.GetPath();
auto id = object.GetPathIndex();
const TString requestId = CreateGuidAsString();
const auto& authInfo = Credentials.GetAuthInfo();
LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]");
Gateway->Download(
UrlEscapeRet(url, true),
IHTTPGateway::MakeYcHeaders(requestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()),
IHTTPGateway::MakeYcHeaders(requestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()),
0U,
std::min(object.GetSize(), SizeLimit),
std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id, object.GetPath()),
Expand Down Expand Up @@ -456,7 +457,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
NActors::TActorSystem* const ActorSystem;

const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TS3Credentials Credentials;
const TString Pattern;
const NYql::NS3Lister::ES3PatternVariant PatternVariant;
NYql::NS3Details::TPathList Paths;
Expand Down Expand Up @@ -503,7 +504,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
IHTTPGateway::TPtr gateway,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NYql::NS3Details::TPathList&& paths,
Expand All @@ -527,14 +528,14 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
statsLevel,
txId,
std::move(gateway),
holderFactory,
url,
authInfo,
holderFactory,
url,
credentials,
pattern,
patternVariant,
std::move(paths),
addPathIndex,
computeActorId,
computeActorId,
sizeLimit,
retryPolicy,
readActorFactoryCfg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
IHTTPGateway::TPtr gateway,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NYql::NS3Details::TPathList&& paths,
Expand Down
17 changes: 9 additions & 8 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
IHTTPGateway::TPtr gateway,
const THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
ES3PatternVariant patternVariant,
TPathList&& paths,
Expand Down Expand Up @@ -1230,7 +1230,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
, ComputeActorId(computeActorId)
, RetryPolicy(retryPolicy)
, Url(url)
, AuthInfo(authInfo)
, Credentials(credentials)
, Pattern(pattern)
, PatternVariant(patternVariant)
, Paths(std::move(paths))
Expand Down Expand Up @@ -1321,7 +1321,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
Gateway,
RetryPolicy,
Url,
AuthInfo,
Credentials,
Pattern,
PatternVariant,
ES3PatternType::Wildcard));
Expand Down Expand Up @@ -1385,10 +1385,11 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
<< pathIndex);

TActorId actorId;
const auto& authInfo = Credentials.GetAuthInfo();
auto stuff = std::make_shared<TRetryStuff>(
Gateway,
Url + object.GetPath(),
IHTTPGateway::MakeYcHeaders(requestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()),
IHTTPGateway::MakeYcHeaders(requestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()),
object.GetSize(),
TxId,
requestId,
Expand Down Expand Up @@ -1786,7 +1787,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;

const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TS3Credentials Credentials;
const TString Pattern;
const ES3PatternVariant PatternVariant;
TPathList Paths;
Expand Down Expand Up @@ -2000,7 +2001,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
ReadPathsList(params, taskParams, readRanges, paths);

const auto token = secureParams.Value(params.GetToken(), TString{});
const auto authInfo = GetAuthInfo(credentialsFactory, token);
const TS3Credentials credentials(credentialsFactory, token);

const auto& settings = params.GetSettings();
TString pathPattern = "*";
Expand Down Expand Up @@ -2178,7 +2179,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
sizeLimit = FromString<ui64>(it->second);
}

const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta,
Expand All @@ -2190,7 +2191,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (const auto it = settings.find("sizeLimit"); settings.cend() != it)
sizeLimit = FromString<ui64>(it->second);

return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ NActors::IActor* CreateS3FileQueueActor(
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
const TS3Credentials& credentials,
TString pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType);
Expand Down
12 changes: 6 additions & 6 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
const TS3Credentials& credentials,
TString pattern,
NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType)
Expand All @@ -189,7 +189,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
, Gateway(std::move(gateway))
, RetryPolicy(std::move(retryPolicy))
, Url(std::move(url))
, AuthInfo(std::move(authInfo))
, Credentials(credentials)
, Pattern(std::move(pattern))
, PatternVariant(patternVariant)
, PatternType(patternType) {
Expand Down Expand Up @@ -493,7 +493,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
RetryPolicy,
NS3Lister::TListingRequest{
Url,
AuthInfo,
Credentials,
PatternVariant == NS3Lister::ES3PatternVariant::PathPattern
? Pattern
: TStringBuilder{} << object.GetPath() << Pattern,
Expand Down Expand Up @@ -616,7 +616,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
const IHTTPGateway::TPtr Gateway;
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TS3Credentials Credentials;
const TString Pattern;
const NS3Lister::ES3PatternVariant PatternVariant;
const NS3Lister::ES3PatternType PatternType;
Expand All @@ -638,7 +638,7 @@ NActors::IActor* CreateS3FileQueueActor(
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
const TS3Credentials& credentials,
TString pattern,
NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType) {
Expand All @@ -655,7 +655,7 @@ NActors::IActor* CreateS3FileQueueActor(
gateway,
retryPolicy,
url,
authInfo,
credentials,
pattern,
patternVariant,
patternType
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_source_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ NActors::IActor* CreateS3FileQueueActor(
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
const TS3Credentials& credentials,
TString pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType);
Expand Down
14 changes: 13 additions & 1 deletion ydb/library/yql/providers/s3/credentials/credentials.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace NYql {

TS3Credentials::TS3Credentials(ISecuredServiceAccountCredentialsFactory::TPtr factory, const TString& structuredTokenJson, bool addBearerToToken)
: StructuredTokenJson(structuredTokenJson)
{
if (NYql::IsStructuredTokenJson(structuredTokenJson)) {
NYql::TStructuredTokenParser parser = NYql::CreateStructuredTokenParser(structuredTokenJson);
Expand All @@ -24,7 +25,7 @@ TS3Credentials::TS3Credentials(ISecuredServiceAccountCredentialsFactory::TPtr fa
}

auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(factory, structuredTokenJson, addBearerToToken);
CredentialsProvider = providerFactory->CreateProvider();
CredentialsProvider = providerFactory->CreateProvider(); // Heavy operation, BLOCKs thread until TA reply
}

TS3Credentials::TAuthInfo TS3Credentials::GetAuthInfo() const {
Expand All @@ -34,6 +35,17 @@ TS3Credentials::TAuthInfo TS3Credentials::GetAuthInfo() const {
return AuthInfo;
}

bool TS3Credentials::operator<(const TS3Credentials& other) const {
return StructuredTokenJson < other.StructuredTokenJson;
}

IOutputStream& operator<<(IOutputStream& stream, const TS3Credentials& credentials) {
const auto& authInfo = credentials.AuthInfo;
return stream << "TS3Credentials{.ServiceAccountAuth=" << static_cast<bool>(credentials.CredentialsProvider)
<< ",.AwsUserPwd=<some token with length" << authInfo.GetAwsUserPwd().length() << ">"
<< ",.AwsSigV4=<some sig with length" << authInfo.GetAwsSigV4().length() << ">}";
}

// string value after AWS prefix should be suitable for passing it to curl as CURLOPT_USERPWD, see details here:
// https://curl.se/libcurl/c/CURLOPT_AWS_SIGV4.html
// CURLOPT_USERPWD = "MY_ACCESS_KEY:MY_SECRET_KEY"
Expand Down
Loading

0 comments on commit 8449008

Please sign in to comment.