Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3154 improve error in s3 applicator actor #8010

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,11 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {

void Handle(NFq::TEvents::TEvEffectApplicationResult::TPtr& ev) {
if (ev->Get()->FatalError) {
FinishScriptFinalization(Ydb::StatusIds::BAD_REQUEST, std::move(ev->Get()->Issues));
NYql::TIssue rootIssue("Failed to commit/abort s3 multipart uploads");
for (const NYql::TIssue& issue : ev->Get()->Issues) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
}
FinishScriptFinalization(Ydb::StatusIds::BAD_REQUEST, {rootIssue});
} else {
FinishScriptFinalization();
}
Expand Down
60 changes: 49 additions & 11 deletions ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,43 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
hFunc(TEvPrivate::TEvListParts, Handle);
)

bool RetryOperation(CURLcode curlResponseCode, ui32 httpResponseCode, const TString& url, const TString& operationName) {
auto result = RetryCount && RetryPolicy->CreateRetryState()->GetNextRetryDelay(curlResponseCode, httpResponseCode);
Issues.AddIssue(TStringBuilder() << "Retry operation " << operationName << ", curl error: " << curl_easy_strerror(curlResponseCode) << ", http code: " << httpResponseCode << ", url: " << url);
bool RetryOperation(IHTTPGateway::TResult&& operationResult, const TString& url, const TString& operationName) {
const auto curlResponseCode = operationResult.CurlResponseCode;
const auto httpResponseCode = operationResult.Content.HttpResponseCode;
const auto result = RetryCount && GetRetryState(operationName)->GetNextRetryDelay(curlResponseCode, httpResponseCode);

NYql::TIssues issues = std::move(operationResult.Issues);
TStringBuilder errorMessage = TStringBuilder() << "Retry operation " << operationName << ", curl error: " << curl_easy_strerror(curlResponseCode) << ", url: " << url;
if (const TString errorText = operationResult.Content.Extract()) {
TString errorCode;
TString message;
if (!ParseS3ErrorResponse(errorText, errorCode, message)) {
message = errorText;
}
issues.AddIssues(BuildIssues(httpResponseCode, errorCode, message));
} else {
errorMessage << ", HTTP code: " << httpResponseCode;
}

if (issues) {
RetryIssues.AddIssues(NS3Util::AddParentIssue(errorMessage, std::move(issues)));
} else {
RetryIssues.AddIssue(errorMessage);
}

if (result) {
RetryCount--;
} else {
Finish(true, RetryCount
? TString("Number of retries exceeded limit per operation")
: TStringBuilder() << "Number of retries exceeded global limit in " << GLOBAL_RETRY_LIMIT << " retries");
Issues.AddIssues(NS3Util::AddParentIssue(
RetryCount
? TStringBuilder() << "Number of retries exceeded limit for operation " << operationName
: TStringBuilder() << "Number of retries exceeded global limit in " << GLOBAL_RETRY_LIMIT << " retries",
NYql::TIssues(RetryIssues)
));
RetryIssues.Clear();
Finish(true);
}

return result;
}

Expand Down Expand Up @@ -377,7 +404,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
}
const TString& url = ev->Get()->State->BuildUrl();
LOG_D("CommitMultipartUpload ERROR " << url);
if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode, url, "CommitMultipartUpload")) {
if (RetryOperation(std::move(result), url, "CommitMultipartUpload")) {
PushCommitMultipartUpload(ev->Get()->State);
}
}
Expand Down Expand Up @@ -422,7 +449,9 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
auto prefix = ev->Get()->State->Url + ev->Get()->State->Prefix;
if (!UnknownPrefixes.contains(prefix)) {
UnknownPrefixes.insert(prefix);
Issues.AddIssue(TIssue("Unknown uncommitted upload with prefix: " + prefix));
TIssue issue(TStringBuilder() << "Unknown uncommitted upload with prefix: " << prefix);
issue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
Issues.AddIssue(std::move(issue));
}
} else {
pos += KeyPrefix.size();
Expand Down Expand Up @@ -452,7 +481,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
}
const TString& url = ev->Get()->State->BuildUrl();
LOG_D("ListMultipartUploads ERROR " << url);
if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode, url, "ListMultipartUploads")) {
if (RetryOperation(std::move(result), url, "ListMultipartUploads")) {
PushListMultipartUploads(ev->Get()->State);
}
}
Expand All @@ -476,7 +505,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
}
const TString& url = ev->Get()->State->BuildUrl();
LOG_D("AbortMultipartUpload ERROR " << url);
if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode, url, "AbortMultipartUpload")) {
if (RetryOperation(std::move(result), url, "AbortMultipartUpload")) {
PushAbortMultipartUpload(ev->Get()->State);
}
}
Expand Down Expand Up @@ -527,7 +556,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
}
const TString& url = ev->Get()->State->BuildUrl();
LOG_D("ListParts ERROR " << url);
if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode, url, "ListParts")) {
if (RetryOperation(std::move(result), url, "ListParts")) {
PushListParts(ev->Get()->State);
}
}
Expand Down Expand Up @@ -597,6 +626,13 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
actorSystem->Send(new NActors::IEventHandle(selfId, {}, new TEvPrivate::TEvListParts(state, std::move(result))));
}

IHTTPGateway::TRetryPolicy::IRetryState::TPtr& GetRetryState(const TString& operationName) {
if (const auto it = RetryStates.find(operationName); it != RetryStates.end()) {
return it->second;
}
return RetryStates.insert({operationName, RetryPolicy->CreateRetryState()}).first->second;
}

private:
NActors::TActorId ParentId;
IHTTPGateway::TPtr Gateway;
Expand All @@ -609,11 +645,13 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
NYql::NDqProto::TExternalEffect ExternalEffect;
NActors::TActorSystem* const ActorSystem;
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
std::unordered_map<TString, IHTTPGateway::TRetryPolicy::IRetryState::TPtr> RetryStates;
ui64 HttpRequestInflight = 0;
ui64 RetryCount;
THashSet<TString> UnknownPrefixes;
THashSet<TString> CommitUploads;
NYql::TIssues Issues;
NYql::TIssues RetryIssues;
std::queue<TObjectStorageRequest> RequestQueue;
bool ApplicationFinished = false;
};
Expand Down
Loading