Skip to content

Commit

Permalink
checkpoint storage: fix future handling on error path (#13909)
Browse files Browse the repository at this point in the history
  • Loading branch information
yumkam authored Jan 29, 2025
1 parent a4ee29d commit ec024aa
Showing 1 changed file with 10 additions and 36 deletions.
46 changes: 10 additions & 36 deletions ydb/core/fq/libs/checkpoint_storage/storage_proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -168,40 +168,18 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
cookie = ev->Cookie,
sender = ev->Sender,
totalGraphCheckpointsSizeLimit = Config.GetStateStorageLimits().GetMaxGraphCheckpointsSizeBytes(),
actorSystem = TActivationContext::ActorSystem()]
graphDesc = std::move(event->GraphDescription),
storage = CheckpointStorage]
(const NThreading::TFuture<ICheckpointStorage::TGetTotalCheckpointsStateSizeResult>& resultFuture) {
auto result = resultFuture.GetValue();
auto issues = result.second;

if (issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to fetch total graph checkpoints size: " << issues.ToString());
actorSystem->Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
return false;
}
auto [totalGraphCheckpointsSize, issues] = resultFuture.GetValue();

auto totalGraphCheckpointsSize = result.first;

if (totalGraphCheckpointsSize > totalGraphCheckpointsSizeLimit) {
if (!issues && totalGraphCheckpointsSize > totalGraphCheckpointsSizeLimit) {
TStringStream ss;
ss << "[" << coordinatorId << "] [" << checkpointId << "] Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize;
auto message = ss.Str();
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, message)
issues.AddIssue(message);
LOG_STREAMS_STORAGE_SERVICE_AS_DEBUG(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Send TEvCreateCheckpointResponse");
actorSystem->Send(sender, new TEvCheckpointStorage::TEvCreateCheckpointResponse(checkpointId, std::move(issues), TString()), 0, cookie);
return false;
ss << "Graph checkpoints size limit exceeded: limit " << totalGraphCheckpointsSizeLimit << ", current checkpoints size: " << totalGraphCheckpointsSize;
issues.AddIssue(std::move(ss.Str()));
}
return true;
})
.Apply([checkpointId = event->CheckpointId,
coordinatorId = event->CoordinatorId,
cookie = ev->Cookie,
sender = ev->Sender,
graphDesc = event->GraphDescription,
storage = CheckpointStorage]
(const NThreading::TFuture<bool>& passedSizeLimitCheckFuture) {
if (!passedSizeLimitCheckFuture.GetValue()) {
return NThreading::TFuture<ICheckpointStorage::TCreateCheckpointResult>();
if (issues) {
return NThreading::MakeFuture(ICheckpointStorage::TCreateCheckpointResult {TString(), std::move(issues) } );
}
if (std::holds_alternative<TString>(graphDesc)) {
return storage->CreateCheckpoint(coordinatorId, checkpointId, std::get<TString>(graphDesc), ECheckpointStatus::Pending);
Expand All @@ -215,12 +193,8 @@ void TStorageProxy::Handle(TEvCheckpointStorage::TEvCreateCheckpointRequest::TPt
sender = ev->Sender,
actorSystem = TActivationContext::ActorSystem()]
(const NThreading::TFuture<ICheckpointStorage::TCreateCheckpointResult>& resultFuture) {
if (!resultFuture.Initialized()) { // didn't pass the size limit check
return;
}
auto result = resultFuture.GetValue();
auto issues = result.second;
auto response = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointResponse>(checkpointId, std::move(issues), result.first);
auto [graphDescId, issues] = resultFuture.GetValue();
auto response = std::make_unique<TEvCheckpointStorage::TEvCreateCheckpointResponse>(checkpointId, std::move(issues), std::move(graphDescId));
if (response->Issues) {
LOG_STREAMS_STORAGE_SERVICE_AS_WARN(*actorSystem, "[" << coordinatorId << "] [" << checkpointId << "] Failed to create checkpoint: " << response->Issues.ToString());
} else {
Expand Down

0 comments on commit ec024aa

Please sign in to comment.