diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp index 6c2b5d9afd31..4624c456d36c 100644 --- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp +++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.cpp @@ -372,11 +372,12 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvScheduleC CC_LOG_D("Got TEvScheduleCheckpointing"); ScheduleNextCheckpoint(); const auto checkpointsInFly = PendingCheckpoints.size() + PendingCommitCheckpoints.size(); - if (checkpointsInFly >= Settings.GetMaxInflight() || InitingZeroCheckpoint) { + if (checkpointsInFly >= Settings.GetMaxInflight() || (InitingZeroCheckpoint && !FailedZeroCheckpoint)) { CC_LOG_W("Skip schedule checkpoint event since inflight checkpoint limit exceeded: current: " << checkpointsInFly << ", limit: " << Settings.GetMaxInflight()); Metrics.SkippedDueToInFlightLimit->Inc(); return; } + FailedZeroCheckpoint = false; Metrics.SkippedDueToInFlightLimit->Set(0); InitCheckpoint(); } @@ -389,6 +390,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvCreateCheckpo if (issues) { CC_LOG_E("[" << checkpointId << "] StorageError: can't create checkpoint: " << issues.ToOneLineString()); PendingCheckpoints.erase(checkpointId); + FailedZeroCheckpoint = InitingZeroCheckpoint; UpdateInProgressMetric(); ++*Metrics.FailedToCreate; ++*Metrics.StorageError; @@ -470,6 +472,7 @@ void TCheckpointCoordinator::Handle(const NYql::NDq::TEvDqCompute::TEvSaveTaskSt CC_LOG_E("[" << checkpointId << "] Got all acks for aborted checkpoint, aborting in storage"); CheckpointingSnapshotRotationIndex = CheckpointingSnapshotRotationPeriod; // Next checkpoint is snapshot. Send(StorageProxy, new TEvCheckpointStorage::TEvAbortCheckpointRequest(CoordinatorId, checkpointId, "Can't save node state"), IEventHandle::FlagTrackDelivery); + FailedZeroCheckpoint = InitingZeroCheckpoint; } else { CC_LOG_I("[" << checkpointId << "] Got all acks, changing checkpoint status to 'PendingCommit'"); Send(StorageProxy, new TEvCheckpointStorage::TEvSetCheckpointPendingCommitStatusRequest(CoordinatorId, checkpointId, checkpoint.GetStats().StateSize), IEventHandle::FlagTrackDelivery); @@ -494,6 +497,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvSetCheckpoint CC_LOG_E("[" << checkpointId << "] StorageError: can't change checkpoint status to 'PendingCommit': " << issues.ToString()); ++*Metrics.StorageError; PendingCheckpoints.erase(it); + FailedZeroCheckpoint = InitingZeroCheckpoint; return; } @@ -571,6 +575,7 @@ void TCheckpointCoordinator::Handle(const TEvCheckpointStorage::TEvAbortCheckpoi ++*Metrics.Aborted; } PendingCheckpoints.erase(checkpointId); + FailedZeroCheckpoint = InitingZeroCheckpoint; PendingCommitCheckpoints.erase(checkpointId); UpdateInProgressMetric(); } @@ -616,6 +621,8 @@ void TCheckpointCoordinator::Handle(NActors::TEvents::TEvPoison::TPtr& ev) { } void TCheckpointCoordinator::Handle(const TEvCheckpointCoordinator::TEvRunGraph::TPtr&) { + Y_DEBUG_ABORT_UNLESS(InitingZeroCheckpoint); + Y_DEBUG_ABORT_UNLESS(!FailedZeroCheckpoint); InitingZeroCheckpoint = false; // TODO: run graph only now, not before zero checkpoint inited } diff --git a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h index 280130f38163..b5740178dadd 100644 --- a/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h +++ b/ydb/core/fq/libs/checkpointing/checkpoint_coordinator.h @@ -193,6 +193,7 @@ class TCheckpointCoordinator : public NYql::TTaskControllerImpl PendingInit; bool GraphIsRunning = false; bool InitingZeroCheckpoint = false; + bool FailedZeroCheckpoint = false; bool RestoringFromForeignCheckpoint = false; TCheckpointCoordinatorMetrics Metrics; diff --git a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp index bab3c2aec960..77cc91fbff37 100644 --- a/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp +++ b/ydb/core/fq/libs/checkpointing/ut/checkpoint_coordinator_ut.cpp @@ -411,7 +411,6 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { TEvCheckpointStorage::TEvCompleteCheckpointRequest(CoordinatorId, checkpointId, 300, type)); MockCompleteCheckpointResponse(checkpointId); - MockRunGraph(); } void SaveFailed(TCheckpointId checkpointId) { @@ -423,7 +422,6 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { ExpectEvent(StorageProxy, TEvCheckpointStorage::TEvAbortCheckpointRequest( CoordinatorId, checkpointId, "Can't save node state")); MockAbortCheckpointResponse(checkpointId); - MockRunGraph(); } void ScheduleCheckpointing() { @@ -436,6 +434,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { test.RegisterCoordinator(); test.InjectCheckpoint(test.CheckpointId1); test.AllSavedAndCommited(test.CheckpointId1); + test.MockRunGraph(); } Y_UNIT_TEST(ShouldTriggerCheckpointWithSourcesAndWithChannel) { @@ -443,6 +442,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { test.RegisterCoordinator(); test.InjectCheckpoint(test.CheckpointId1); test.AllSavedAndCommited(test.CheckpointId1); + test.MockRunGraph(); } Y_UNIT_TEST(ShouldAllSnapshots) { @@ -450,6 +450,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { test.RegisterCoordinator(); test.InjectCheckpoint(test.CheckpointId1); test.AllSavedAndCommited(test.CheckpointId1); + test.MockRunGraph(); test.ScheduleCheckpointing(); test.InjectCheckpoint(test.CheckpointId2, test.GraphDescId, NYql::NDqProto::CHECKPOINT_TYPE_SNAPSHOT); @@ -461,6 +462,7 @@ Y_UNIT_TEST_SUITE(TCheckpointCoordinatorTests) { test.RegisterCoordinator(); test.InjectCheckpoint(test.CheckpointId1); test.AllSavedAndCommited(test.CheckpointId1); + test.MockRunGraph(); test.ScheduleCheckpointing(); test.InjectCheckpoint(test.CheckpointId2, test.GraphDescId, NYql::NDqProto::CHECKPOINT_TYPE_INCREMENT_OR_SNAPSHOT);