From 101d0bff4b023f064bb017f1593607170881c6ad Mon Sep 17 00:00:00 2001 From: Pisarenko Grigoriy <79596613+GrigoriyPA@users.noreply.github.com> Date: Tue, 20 Feb 2024 12:25:47 +0300 Subject: [PATCH] YQ-2884 fixed fq cancel operation race (#2055) --- .../fq/libs/compute/ydb/ydb_run_actor.cpp | 47 +++++++++++++++---- 1 file changed, 38 insertions(+), 9 deletions(-) diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp index e382035e183a..41701816a815 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp @@ -98,10 +98,14 @@ class TYdbRunActor : public NActors::TActorBootstrapped { } void Handle(const TEvYdbCompute::TEvStatusTrackerResponse::TPtr& ev) { + if (CancelOperationIsRunning("StatusTrackerResponse (aborting). ")) { + return; + } + auto& response = *ev->Get(); if (response.Status == NYdb::EStatus::NOT_FOUND) { // FAILING / ABORTING_BY_USER / ABORTING_BY_SYSTEM LOG_I("StatusTrackerResponse (not found). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); - Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release()); + CreateFinalizer(Params.Status); return; } @@ -116,11 +120,15 @@ class TYdbRunActor : public NActors::TActorBootstrapped { if (response.ExecStatus == NYdb::NQuery::EExecStatus::Completed) { Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release()); } else { - Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + CreateResourcesCleaner(); } } void Handle(const TEvYdbCompute::TEvResultWriterResponse::TPtr& ev) { + if (CancelOperationIsRunning("ResultWriterResponse (aborting). ")) { + return; + } + auto& response = *ev->Get(); if (response.Status != NYdb::EStatus::SUCCESS) { LOG_I("ResultWriterResponse (failed). Status: " << response.Status << " Issues: " << response.Issues.ToOneLineString()); @@ -128,7 +136,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped { return; } LOG_I("ResultWriterResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); - Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + CreateResourcesCleaner(); } void Handle(const TEvYdbCompute::TEvResourcesCleanerResponse::TPtr& ev) { @@ -139,20 +147,21 @@ class TYdbRunActor : public NActors::TActorBootstrapped { return; } LOG_I("ResourcesCleanerResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); - Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, IsAborted ? FederatedQuery::QueryMeta::ABORTING_BY_USER : Params.Status).release()); + CreateFinalizer(IsAborted ? FederatedQuery::QueryMeta::ABORTING_BY_USER : Params.Status); } void Handle(const TEvYdbCompute::TEvFinalizerResponse::TPtr ev) { // Pinger is no longer available at this place. // The query can be restarted only after the expiration of lease in case of error auto& response = *ev->Get(); - LOG_I("FinalizerResponse ( " << (response.Status == NYdb::EStatus::SUCCESS ? "success" : "failed") << ") " << response.Status << " Issues: " << response.Issues.ToOneLineString()); + LOG_I("FinalizerResponse ( " << (response.Status == NYdb::EStatus::SUCCESS ? "success" : "failed") << " ) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); FinishAndPassAway(); } void Handle(TEvents::TEvQueryActionResult::TPtr& ev) { LOG_I("QueryActionResult: " << FederatedQuery::QueryAction_Name(ev->Get()->Action)); - if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED && !IsAborted) { + // Start cancel operation only when StatusTracker or ResultWriter is running + if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED && !IsAborted && !FinalizationStarted) { IsAborted = true; Register(ActorFactory->CreateStopper(SelfId(), Connector, Params.OperationId).release()); } @@ -166,7 +175,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped { return; } LOG_I("StopperResponse (success) " << response.Status << " Issues: " << response.Issues.ToOneLineString()); - Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + CreateResourcesCleaner(); } void Run() { // recover points @@ -185,7 +194,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped { if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) { Register(ActorFactory->CreateResultWriter(SelfId(), Connector, Pinger, Params.OperationId).release()); } else { - Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release()); + CreateFinalizer(Params.Status); } break; case FederatedQuery::QueryMeta::FAILING: @@ -194,7 +203,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped { if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED) { Register(ActorFactory->CreateStatusTracker(SelfId(), Connector, Pinger, Params.OperationId).release()); } else { - Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, Params.Status).release()); + CreateFinalizer(Params.Status); } break; default: @@ -220,8 +229,28 @@ class TYdbRunActor : public NActors::TActorBootstrapped { PassAway(); } + void CreateResourcesCleaner() { + FinalizationStarted = true; + Register(ActorFactory->CreateResourcesCleaner(SelfId(), Connector, Params.OperationId).release()); + } + + void CreateFinalizer(FederatedQuery::QueryMeta::ComputeStatus status) { + FinalizationStarted = true; + Register(ActorFactory->CreateFinalizer(Params, SelfId(), Pinger, ExecStatus, status).release()); + } + + bool CancelOperationIsRunning(const TString& stage) const { + if (!IsAborted) { + return false; + } + + LOG_I(stage << "Stop task execution, cancel operation now is running"); + return true; + } + private: bool IsAborted = false; + bool FinalizationStarted = false; TActorId FetcherId; NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified; TRunActorParams Params;