Skip to content

Commit

Permalink
YQ-3154 improve error in s3 applicator actor (ydb-platform#8008)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored Aug 20, 2024
1 parent d470edd commit d3f247f
Show file tree
Hide file tree
Showing 2 changed files with 54 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {

void Handle(NFq::TEvents::TEvEffectApplicationResult::TPtr& ev) {
if (ev->Get()->FatalError) {
FinishScriptFinalization(Ydb::StatusIds::BAD_REQUEST, std::move(ev->Get()->Issues));
NYql::TIssue rootIssue("Failed to commit/abort s3 multipart uploads");
for (const NYql::TIssue& issue : ev->Get()->Issues) {
rootIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(issue));
}
FinishScriptFinalization(Ydb::StatusIds::BAD_REQUEST, {rootIssue});
} else {
FinishScriptFinalization();
}
Expand Down
60 changes: 49 additions & 11 deletions ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -273,16 +273,43 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
hFunc(TEvPrivate::TEvListParts, Handle);
)

bool RetryOperation(CURLcode curlResponseCode, ui32 httpResponseCode, const TString& url, const TString& operationName) {
auto result = RetryCount && RetryPolicy->CreateRetryState()->GetNextRetryDelay(curlResponseCode, httpResponseCode);
Issues.AddIssue(TStringBuilder() << "Retry operation " << operationName << ", curl error: " << curl_easy_strerror(curlResponseCode) << ", http code: " << httpResponseCode << ", url: " << url);
bool RetryOperation(IHTTPGateway::TResult&& operationResult, const TString& url, const TString& operationName) {
const auto curlResponseCode = operationResult.CurlResponseCode;
const auto httpResponseCode = operationResult.Content.HttpResponseCode;
const auto result = RetryCount && GetRetryState(operationName)->GetNextRetryDelay(curlResponseCode, httpResponseCode);

NYql::TIssues issues = std::move(operationResult.Issues);
TStringBuilder errorMessage = TStringBuilder() << "Retry operation " << operationName << ", curl error: " << curl_easy_strerror(curlResponseCode) << ", url: " << url;
if (const TString errorText = operationResult.Content.Extract()) {
TString errorCode;
TString message;
if (!ParseS3ErrorResponse(errorText, errorCode, message)) {
message = errorText;
}
issues.AddIssues(BuildIssues(httpResponseCode, errorCode, message));
} else {
errorMessage << ", HTTP code: " << httpResponseCode;
}

if (issues) {
RetryIssues.AddIssues(NS3Util::AddParentIssue(errorMessage, std::move(issues)));
} else {
RetryIssues.AddIssue(errorMessage);
}

if (result) {
RetryCount--;
} else {
Finish(true, RetryCount
? TString("Number of retries exceeded limit per operation")
: TStringBuilder() << "Number of retries exceeded global limit in " << GLOBAL_RETRY_LIMIT << " retries");
Issues.AddIssues(NS3Util::AddParentIssue(
RetryCount
? TStringBuilder() << "Number of retries exceeded limit for operation " << operationName
: TStringBuilder() << "Number of retries exceeded global limit in " << GLOBAL_RETRY_LIMIT << " retries",
NYql::TIssues(RetryIssues)
));
RetryIssues.Clear();
Finish(true);
}

return result;
}

Expand Down Expand Up @@ -377,7 +404,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
}
const TString& url = ev->Get()->State->BuildUrl();
LOG_D("CommitMultipartUpload ERROR " << url);
if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode, url, "CommitMultipartUpload")) {
if (RetryOperation(std::move(result), url, "CommitMultipartUpload")) {
PushCommitMultipartUpload(ev->Get()->State);
}
}
Expand Down Expand Up @@ -422,7 +449,9 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
auto prefix = ev->Get()->State->Url + ev->Get()->State->Prefix;
if (!UnknownPrefixes.contains(prefix)) {
UnknownPrefixes.insert(prefix);
Issues.AddIssue(TIssue("Unknown uncommitted upload with prefix: " + prefix));
TIssue issue(TStringBuilder() << "Unknown uncommitted upload with prefix: " << prefix);
issue.SetCode(NYql::DEFAULT_ERROR, NYql::TSeverityIds::S_INFO);
Issues.AddIssue(std::move(issue));
}
} else {
pos += KeyPrefix.size();
Expand Down Expand Up @@ -452,7 +481,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
}
const TString& url = ev->Get()->State->BuildUrl();
LOG_D("ListMultipartUploads ERROR " << url);
if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode, url, "ListMultipartUploads")) {
if (RetryOperation(std::move(result), url, "ListMultipartUploads")) {
PushListMultipartUploads(ev->Get()->State);
}
}
Expand All @@ -476,7 +505,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
}
const TString& url = ev->Get()->State->BuildUrl();
LOG_D("AbortMultipartUpload ERROR " << url);
if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode, url, "AbortMultipartUpload")) {
if (RetryOperation(std::move(result), url, "AbortMultipartUpload")) {
PushAbortMultipartUpload(ev->Get()->State);
}
}
Expand Down Expand Up @@ -527,7 +556,7 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
}
const TString& url = ev->Get()->State->BuildUrl();
LOG_D("ListParts ERROR " << url);
if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode, url, "ListParts")) {
if (RetryOperation(std::move(result), url, "ListParts")) {
PushListParts(ev->Get()->State);
}
}
Expand Down Expand Up @@ -597,6 +626,13 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
actorSystem->Send(new NActors::IEventHandle(selfId, {}, new TEvPrivate::TEvListParts(state, std::move(result))));
}

IHTTPGateway::TRetryPolicy::IRetryState::TPtr& GetRetryState(const TString& operationName) {
if (const auto it = RetryStates.find(operationName); it != RetryStates.end()) {
return it->second;
}
return RetryStates.insert({operationName, RetryPolicy->CreateRetryState()}).first->second;
}

private:
NActors::TActorId ParentId;
IHTTPGateway::TPtr Gateway;
Expand All @@ -609,11 +645,13 @@ class TS3ApplicatorActor : public NActors::TActorBootstrapped<TS3ApplicatorActor
NYql::NDqProto::TExternalEffect ExternalEffect;
NActors::TActorSystem* const ActorSystem;
const IHTTPGateway::TRetryPolicy::TPtr RetryPolicy;
std::unordered_map<TString, IHTTPGateway::TRetryPolicy::IRetryState::TPtr> RetryStates;
ui64 HttpRequestInflight = 0;
ui64 RetryCount;
THashSet<TString> UnknownPrefixes;
THashSet<TString> CommitUploads;
NYql::TIssues Issues;
NYql::TIssues RetryIssues;
std::queue<TObjectStorageRequest> RequestQueue;
bool ApplicationFinished = false;
};
Expand Down

0 comments on commit d3f247f

Please sign in to comment.