Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3152 fix error failed to execute callable ResWrite! #6940

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions ydb/core/external_sources/object_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/library/yql/providers/common/http_gateway/yql_http_gateway.h>
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
#include <ydb/library/yql/providers/s3/credentials/credentials.h>
#include <ydb/library/yql/providers/s3/object_listers/yql_s3_list.h>
#include <ydb/library/yql/providers/s3/path_generator/yql_s3_path_generator.h>
#include <ydb/library/yql/providers/s3/proto/credentials.pb.h>
#include <ydb/public/api/protos/ydb_status_codes.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_value/value.h>

Expand Down Expand Up @@ -284,12 +286,13 @@ struct TObjectStorageExternalSource : public IExternalSource {
return NThreading::MakeFuture(std::move(meta));
}

NYql::TS3Credentials::TAuthInfo authInfo{};
NYql::TStructuredTokenBuilder structuredTokenBuilder;
if (std::holds_alternative<NAuth::TAws>(meta->Auth)) {
auto& awsAuth = std::get<NAuth::TAws>(meta->Auth);
authInfo.AwsAccessKey = awsAuth.AccessKey;
authInfo.AwsAccessSecret = awsAuth.SecretAccessKey;
authInfo.AwsRegion = awsAuth.Region;
NYql::NS3::TAwsParams params;
params.SetAwsAccessKey(awsAuth.AccessKey);
params.SetAwsRegion(awsAuth.Region);
structuredTokenBuilder.SetBasicAuth(params.SerializeAsString(), awsAuth.SecretAccessKey);
} else if (std::holds_alternative<NAuth::TServiceAccount>(meta->Auth)) {
if (!CredentialsFactory) {
try {
Expand All @@ -299,15 +302,17 @@ struct TObjectStorageExternalSource : public IExternalSource {
}
}
auto& saAuth = std::get<NAuth::TServiceAccount>(meta->Auth);
NYql::GetAuthInfo(CredentialsFactory, "");
authInfo.Token = CredentialsFactory->Create(saAuth.ServiceAccountId, saAuth.ServiceAccountIdSignature)->CreateProvider()->GetAuthInfo();
structuredTokenBuilder.SetServiceAccountIdAuth(saAuth.ServiceAccountId, saAuth.ServiceAccountIdSignature);
} else {
structuredTokenBuilder.SetNoAuth();
}

const NYql::TS3Credentials credentials(CredentialsFactory, structuredTokenBuilder.ToJson());
auto httpGateway = NYql::IHTTPGateway::Make();
auto httpRetryPolicy = NYql::GetHTTPDefaultRetryPolicy(NYql::THttpRetryPolicyOptions{.RetriedCurlCodes = NYql::FqRetriedCurlCodes()});
auto s3Lister = NYql::NS3Lister::MakeS3Lister(httpGateway, httpRetryPolicy, NYql::NS3Lister::TListingRequest{
.Url = meta->DataSourceLocation,
.AuthInfo = authInfo,
.Credentials = credentials,
.Pattern = meta->TableLocation,
}, Nothing(), false);
auto afterListing = s3Lister->Next().Apply([path = meta->TableLocation](const NThreading::TFuture<NYql::NS3Lister::TListResult>& listResFut) {
Expand All @@ -332,7 +337,7 @@ struct TObjectStorageExternalSource : public IExternalSource {
meta->DataSourceLocation,
httpGateway,
NYql::IHTTPGateway::TRetryPolicy::GetNoRetryPolicy(),
std::move(authInfo)
credentials
));

meta->Attributes.erase("withinfer");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ArrowInferenceTest : public testing::Test {
BaseUrl,
Gateway,
NYql::IHTTPGateway::TRetryPolicy::GetNoRetryPolicy(),
NYql::TS3Credentials::TAuthInfo{}), 1);
NYql::TS3Credentials{}), 1);
}

NActors::TActorId RegisterInferencinator(TStringBuf formatStr) {
Expand Down
17 changes: 9 additions & 8 deletions ydb/core/external_sources/object_storage/s3_fetcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ class S3Fetcher : public NActors::TActorBootstrapped<S3Fetcher> {
TString url,
NYql::IHTTPGateway::TPtr gateway,
NYql::IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
NYql::TS3Credentials::TAuthInfo authInfo)
const NYql::TS3Credentials& credentials)
: Url_{std::move(url)}
, Gateway_{std::move(gateway)}
, RetryPolicy_{std::move(retryPolicy)}
, AuthInfo_{std::move(authInfo)}
, Credentials_(credentials)
{}

void Bootstrap() {
Expand Down Expand Up @@ -60,12 +60,13 @@ class S3Fetcher : public NActors::TActorBootstrapped<S3Fetcher> {

void StartDownload(std::shared_ptr<TEvRequestS3Range>&& request, NActors::TActorSystem* actorSystem) {
auto length = request->End - request->Start;
const auto& authInfo = Credentials_.GetAuthInfo();
auto headers = NYql::IHTTPGateway::MakeYcHeaders(
request->RequestId.AsGuidString(),
AuthInfo_.GetToken(),
authInfo.GetToken(),
{},
AuthInfo_.GetAwsUserPwd(),
AuthInfo_.GetAwsSigV4()
authInfo.GetAwsUserPwd(),
authInfo.GetAwsSigV4()
);

Gateway_->Download(
Expand All @@ -79,15 +80,15 @@ class S3Fetcher : public NActors::TActorBootstrapped<S3Fetcher> {
TString Url_;
NYql::IHTTPGateway::TPtr Gateway_;
NYql::IHTTPGateway::TRetryPolicy::TPtr RetryPolicy_;
NYql::TS3Credentials::TAuthInfo AuthInfo_;
const NYql::TS3Credentials Credentials_;
};

NActors::IActor* CreateS3FetcherActor(
TString url,
NYql::IHTTPGateway::TPtr gateway,
NYql::IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
NYql::TS3Credentials::TAuthInfo authInfo) {
const NYql::TS3Credentials& credentials) {

return new S3Fetcher(std::move(url), std::move(gateway), std::move(retryPolicy), std::move(authInfo));
return new S3Fetcher(std::move(url), std::move(gateway), std::move(retryPolicy), credentials);
}
} // namespace NKikimr::NExternalSource::NObjectStorage
2 changes: 1 addition & 1 deletion ydb/core/external_sources/object_storage/s3_fetcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ NActors::IActor* CreateS3FetcherActor(
TString url,
NYql::IHTTPGateway::TPtr gateway,
NYql::IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
NYql::TS3Credentials::TAuthInfo authInfo);
const NYql::TS3Credentials& credentials);
} // namespace NKikimr::NExternalSource::NObjectStorage
21 changes: 11 additions & 10 deletions ydb/library/yql/providers/s3/actors/yql_s3_raw_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
IHTTPGateway::TPtr gateway,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NYql::NS3Details::TPathList&& paths,
Expand All @@ -69,7 +69,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
, RetryPolicy(retryPolicy)
, ActorSystem(NActors::TActivationContext::ActorSystem())
, Url(url)
, AuthInfo(authInfo)
, Credentials(credentials)
, Pattern(pattern)
, PatternVariant(patternVariant)
, Paths(std::move(paths))
Expand Down Expand Up @@ -113,7 +113,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
Gateway,
RetryPolicy,
Url,
AuthInfo,
Credentials,
Pattern,
PatternVariant,
NYql::NS3Lister::ES3PatternType::Wildcard));
Expand Down Expand Up @@ -164,10 +164,11 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
auto url = Url + object.GetPath();
auto id = object.GetPathIndex();
const TString requestId = CreateGuidAsString();
const auto& authInfo = Credentials.GetAuthInfo();
LOG_D("TS3ReadActor", "Download: " << url << ", ID: " << id << ", request id: [" << requestId << "]");
Gateway->Download(
UrlEscapeRet(url, true),
IHTTPGateway::MakeYcHeaders(requestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()),
IHTTPGateway::MakeYcHeaders(requestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()),
0U,
std::min(object.GetSize(), SizeLimit),
std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), requestId, std::placeholders::_1, id, object.GetPath()),
Expand Down Expand Up @@ -456,7 +457,7 @@ class TS3ReadActor : public NActors::TActorBootstrapped<TS3ReadActor>, public ID
NActors::TActorSystem* const ActorSystem;

const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TS3Credentials Credentials;
const TString Pattern;
const NYql::NS3Lister::ES3PatternVariant PatternVariant;
NYql::NS3Details::TPathList Paths;
Expand Down Expand Up @@ -503,7 +504,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
IHTTPGateway::TPtr gateway,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NYql::NS3Details::TPathList&& paths,
Expand All @@ -527,14 +528,14 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
statsLevel,
txId,
std::move(gateway),
holderFactory,
url,
authInfo,
holderFactory,
url,
credentials,
pattern,
patternVariant,
std::move(paths),
addPathIndex,
computeActorId,
computeActorId,
sizeLimit,
retryPolicy,
readActorFactoryCfg,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, NActors::IActor*> CreateRawRead
IHTTPGateway::TPtr gateway,
const NKikimr::NMiniKQL::THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NYql::NS3Details::TPathList&& paths,
Expand Down
17 changes: 9 additions & 8 deletions ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1201,7 +1201,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
IHTTPGateway::TPtr gateway,
const THolderFactory& holderFactory,
const TString& url,
const TS3Credentials::TAuthInfo& authInfo,
const TS3Credentials& credentials,
const TString& pattern,
ES3PatternVariant patternVariant,
TPathList&& paths,
Expand Down Expand Up @@ -1230,7 +1230,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
, ComputeActorId(computeActorId)
, RetryPolicy(retryPolicy)
, Url(url)
, AuthInfo(authInfo)
, Credentials(credentials)
, Pattern(pattern)
, PatternVariant(patternVariant)
, Paths(std::move(paths))
Expand Down Expand Up @@ -1321,7 +1321,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
Gateway,
RetryPolicy,
Url,
AuthInfo,
Credentials,
Pattern,
PatternVariant,
ES3PatternType::Wildcard));
Expand Down Expand Up @@ -1385,10 +1385,11 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
<< pathIndex);

TActorId actorId;
const auto& authInfo = Credentials.GetAuthInfo();
auto stuff = std::make_shared<TRetryStuff>(
Gateway,
Url + object.GetPath(),
IHTTPGateway::MakeYcHeaders(requestId, AuthInfo.GetToken(), {}, AuthInfo.GetAwsUserPwd(), AuthInfo.GetAwsSigV4()),
IHTTPGateway::MakeYcHeaders(requestId, authInfo.GetToken(), {}, authInfo.GetAwsUserPwd(), authInfo.GetAwsSigV4()),
object.GetSize(),
TxId,
requestId,
Expand Down Expand Up @@ -1786,7 +1787,7 @@ class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;

const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TS3Credentials Credentials;
const TString Pattern;
const ES3PatternVariant PatternVariant;
TPathList Paths;
Expand Down Expand Up @@ -2000,7 +2001,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
ReadPathsList(params, taskParams, readRanges, paths);

const auto token = secureParams.Value(params.GetToken(), TString{});
const auto authInfo = GetAuthInfo(credentialsFactory, token);
const TS3Credentials credentials(credentialsFactory, token);

const auto& settings = params.GetSettings();
TString pathPattern = "*";
Expand Down Expand Up @@ -2178,7 +2179,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
sizeLimit = FromString<ui64>(it->second);
}

const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
const auto actor = new TS3StreamReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, readSpec, computeActorId, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, sizeLimit, rowsLimitHint, memoryQuotaManager,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta,
Expand All @@ -2190,7 +2191,7 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
if (const auto it = settings.find("sizeLimit"); settings.cend() != it)
sizeLimit = FromString<ui64>(it->second);

return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), authInfo, pathPattern, pathPatternVariant,
return CreateRawReadActor(inputIndex, statsLevel, txId, std::move(gateway), holderFactory, params.GetUrl(), credentials, pathPattern, pathPatternVariant,
std::move(paths), addPathIndex, computeActorId, sizeLimit, retryPolicy,
cfg, counters, taskCounters, fileSizeLimit, rowsLimitHint,
params.GetUseRuntimeListing(), fileQueueActor, fileQueueBatchSizeLimit, fileQueueBatchObjectCountLimit, fileQueueConsumersCountDelta);
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/yql/providers/s3/actors/yql_s3_read_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ NActors::IActor* CreateS3FileQueueActor(
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
const TS3Credentials& credentials,
TString pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType);
Expand Down
12 changes: 6 additions & 6 deletions ydb/library/yql/providers/s3/actors/yql_s3_source_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
const TS3Credentials& credentials,
TString pattern,
NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType)
Expand All @@ -189,7 +189,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
, Gateway(std::move(gateway))
, RetryPolicy(std::move(retryPolicy))
, Url(std::move(url))
, AuthInfo(std::move(authInfo))
, Credentials(credentials)
, Pattern(std::move(pattern))
, PatternVariant(patternVariant)
, PatternType(patternType) {
Expand Down Expand Up @@ -493,7 +493,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
RetryPolicy,
NS3Lister::TListingRequest{
Url,
AuthInfo,
Credentials,
PatternVariant == NS3Lister::ES3PatternVariant::PathPattern
? Pattern
: TStringBuilder{} << object.GetPath() << Pattern,
Expand Down Expand Up @@ -616,7 +616,7 @@ class TS3FileQueueActor : public NActors::TActorBootstrapped<TS3FileQueueActor>
const IHTTPGateway::TPtr Gateway;
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
const TString Url;
const TS3Credentials::TAuthInfo AuthInfo;
const TS3Credentials Credentials;
const TString Pattern;
const NS3Lister::ES3PatternVariant PatternVariant;
const NS3Lister::ES3PatternType PatternType;
Expand All @@ -638,7 +638,7 @@ NActors::IActor* CreateS3FileQueueActor(
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
const TS3Credentials& credentials,
TString pattern,
NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType) {
Expand All @@ -655,7 +655,7 @@ NActors::IActor* CreateS3FileQueueActor(
gateway,
retryPolicy,
url,
authInfo,
credentials,
pattern,
patternVariant,
patternType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ NActors::IActor* CreateS3FileQueueActor(
IHTTPGateway::TPtr gateway,
IHTTPGateway::TRetryPolicy::TPtr retryPolicy,
TString url,
TS3Credentials::TAuthInfo authInfo,
const TS3Credentials& credentials,
TString pattern,
NYql::NS3Lister::ES3PatternVariant patternVariant,
NS3Lister::ES3PatternType patternType);
Expand Down
14 changes: 13 additions & 1 deletion ydb/library/yql/providers/s3/credentials/credentials.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
namespace NYql {

TS3Credentials::TS3Credentials(ISecuredServiceAccountCredentialsFactory::TPtr factory, const TString& structuredTokenJson, bool addBearerToToken)
: StructuredTokenJson(structuredTokenJson)
{
if (NYql::IsStructuredTokenJson(structuredTokenJson)) {
NYql::TStructuredTokenParser parser = NYql::CreateStructuredTokenParser(structuredTokenJson);
Expand All @@ -24,7 +25,7 @@ TS3Credentials::TS3Credentials(ISecuredServiceAccountCredentialsFactory::TPtr fa
}

auto providerFactory = CreateCredentialsProviderFactoryForStructuredToken(factory, structuredTokenJson, addBearerToToken);
CredentialsProvider = providerFactory->CreateProvider();
CredentialsProvider = providerFactory->CreateProvider(); // Heavy operation, BLOCKs thread until TA reply
}

TS3Credentials::TAuthInfo TS3Credentials::GetAuthInfo() const {
Expand All @@ -34,6 +35,17 @@ TS3Credentials::TAuthInfo TS3Credentials::GetAuthInfo() const {
return AuthInfo;
}

// Strict ordering of credentials, defined solely by comparing their StructuredTokenJson members.
bool TS3Credentials::operator<(const TS3Credentials& other) const {
    const auto& lhsToken = StructuredTokenJson;
    const auto& rhsToken = other.StructuredTokenJson;
    return lhsToken < rhsToken;
}

// Writes a redacted description of the credentials to the stream: whether a credentials
// provider is set, plus only the lengths (never the contents) of the AWS user/password
// and SigV4 strings held in the stored AuthInfo.
IOutputStream& operator<<(IOutputStream& stream, const TS3Credentials& credentials) {
    const auto& info = credentials.AuthInfo;

    stream << "TS3Credentials{.ServiceAccountAuth=" << static_cast<bool>(credentials.CredentialsProvider);
    stream << ",.AwsUserPwd=<some token with length" << info.GetAwsUserPwd().length() << ">";
    stream << ",.AwsSigV4=<some sig with length" << info.GetAwsSigV4().length() << ">}";
    return stream;
}

// string value after AWS prefix should be suitable for passing it to curl as CURLOPT_USERPWD, see details here:
// https://curl.se/libcurl/c/CURLOPT_AWS_SIGV4.html
// CURLOPT_USERPWD = "MY_ACCESS_KEY:MY_SECRET_KEY"
Expand Down
Loading
Loading