From ec024aa63469720f2963bf74f42a34449decc2a7 Mon Sep 17 00:00:00 2001 From: yumkam Date: Wed, 29 Jan 2025 12:51:18 +0300 Subject: [PATCH] checkpoint storage: fix future handling on error path (#13909) --- .../libs/checkpoint_storage/storage_proxy.cpp | 46 ++++--------------- 1 file changed, 10 insertions(+), 36 deletions(-) diff --git a/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp b/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp index 7037b2734ea0..52d9ab6edfb5 100644 --- a/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp +++ b/ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp @@ -168,40 +168,18 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt cookie = ev->Cookie, sender = ev->Sender, totalGraphCheckpointsSizeLimit = Config.GetStateStorageLimits().GetMaxGraphCheckpointsSizeBytes(), - actorSystem = TActivationContext::ActorSystem()] + graphDesc = std::move(event->GraphDescription), + storage = CheckpointStorage] (const NThreading::TFuture& resultFuture) { - auto result = resultFuture.GetValue(); - auto issues = result.second; - - if (issues) { - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString()); - actorSystem->Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie); - return false; - } + auto [totalGraphCheckpointsSize, issues] = resultFuture.GetValue(); - auto totalGraphCheckpointsSize = result.first; - - if (totalGraphCheckpointsSize > totalGraphCheckpointsSizeLimit) { + if (!issues && totalGraphCheckpointsSize > totalGraphCheckpointsSizeLimit) { TStringStream ss; - ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize; - auto message = ss.Str(); - LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, message) - issues.AddIssue(message); - LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse"); - actorSystem->Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie); - return false; + ss << "Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize; + issues.AddIssue(std::move(ss.Str())); } - return true; - }) - .Apply([checkpointId = event->CheckpointId, - coordinatorId = event->CoordinatorId, - cookie = ev->Cookie, - sender = ev->Sender, - graphDesc = event->GraphDescription, - storage = CheckpointStorage] - (const NThreading::TFuture& passedSizeLimitCheckFuture) { - if (!passedSizeLimitCheckFuture.GetValue()) { - return NThreading::TFuture(); + if (issues) { + return NThreading::MakeFuture(ICheckpointStorage::TCreateCheckpointResult {TString(), std::move(issues) } ); } if (std::holds_alternative(graphDesc)) { return storage->CreateCheckpoint(coordinatorId, checkpointId, std::get(graphDesc), ECheckpointStatus::Pending); @@ -215,12 +193,8 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt sender = ev->Sender, actorSystem = TActivationContext::ActorSystem()] (const NThreading::TFuture& resultFuture) { - if (!resultFuture.Initialized()) { // didn't pass the size limit check - return; - } - auto result = resultFuture.GetValue(); - auto issues = result.second; - auto response = std::make_unique(checkpointId, std::move(issues), result.first); + auto [graphDescId, issues] = resultFuture.GetValue(); + auto response = std::make_unique(checkpointId, std::move(issues), std::move(graphDescId)); if (response->Issues) { LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString()); } else {