From 6df8b5ff21aa566f78fd260a68d4ab8dd054588f Mon Sep 17 00:00:00 2001 From: Gigoriy Pisarenko Date: Tue, 20 Feb 2024 12:52:54 +0000 Subject: [PATCH] Updated stopper logic --- .../fq/libs/compute/ydb/actors_factory.cpp | 3 +- ydb/core/fq/libs/compute/ydb/actors_factory.h | 1 + .../compute/ydb/base_status_updater_actor.h | 96 ++++++++++++++++ .../libs/compute/ydb/status_tracker_actor.cpp | 103 ++++++------------ .../fq/libs/compute/ydb/stopper_actor.cpp | 80 ++++++++++++-- ydb/core/fq/libs/compute/ydb/stopper_actor.h | 1 + .../fq/libs/compute/ydb/ydb_run_actor.cpp | 2 +- 7 files changed, 206 insertions(+), 80 deletions(-) create mode 100644 ydb/core/fq/libs/compute/ydb/base_status_updater_actor.h diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp index 9db333a97da9..16a836f76bab 100644 --- a/ydb/core/fq/libs/compute/ydb/actors_factory.cpp +++ b/ydb/core/fq/libs/compute/ydb/actors_factory.cpp @@ -79,8 +79,9 @@ struct TActorFactory : public IActorFactory { std::unique_ptr CreateStopper(const NActors::TActorId& parent, const NActors::TActorId& connector, + const NActors::TActorId& pinger, const NYdb::TOperation::TOperationId& operationId) const override { - return CreateStopperActor(Params, parent, connector, operationId, Counters); + return CreateStopperActor(Params, parent, connector, pinger, operationId, Counters); } private: diff --git a/ydb/core/fq/libs/compute/ydb/actors_factory.h b/ydb/core/fq/libs/compute/ydb/actors_factory.h index ae85da060f7a..004e2d0617c8 100644 --- a/ydb/core/fq/libs/compute/ydb/actors_factory.h +++ b/ydb/core/fq/libs/compute/ydb/actors_factory.h @@ -39,6 +39,7 @@ struct IActorFactory : public TThrRefBase { FederatedQuery::QueryMeta::ComputeStatus status) const = 0; virtual std::unique_ptr CreateStopper(const NActors::TActorId& parent, const NActors::TActorId& connector, + const NActors::TActorId& pinger, const NYdb::TOperation::TOperationId& operationId) const = 0; }; diff --git a/ydb/core/fq/libs/compute/ydb/base_status_updater_actor.h b/ydb/core/fq/libs/compute/ydb/base_status_updater_actor.h new file mode 100644 index 000000000000..abeb6dbd9476 --- /dev/null +++ b/ydb/core/fq/libs/compute/ydb/base_status_updater_actor.h @@ -0,0 +1,96 @@ +#pragma once + +#include "base_compute_actor.h" + +#include +#include + +#include + +namespace NFq { + +template +class TBaseStatusUpdaterActor : public TBaseComputeActor { +public: + using TBase = NActors::TActorBootstrapped; + + TBaseStatusUpdaterActor(const NConfig::TCommonConfig& commonConfig, const ::NYql::NCommon::TServiceCounters& queryCounters, const TString& stepName) + : TBase(queryCounters, stepName) + , Compressor(commonConfig.GetQueryArtifactsCompressionMethod(), commonConfig.GetQueryArtifactsCompressionMinSize()) + {} + + TBaseStatusUpdaterActor(const NConfig::TCommonConfig& commonConfig, const ::NMonitoring::TDynamicCounterPtr& baseCounters, const TString& stepName) + : TBase(baseCounters, stepName) + , Compressor(commonConfig.GetQueryArtifactsCompressionMethod(), commonConfig.GetQueryArtifactsCompressionMinSize()) + {} + + void SetPingCounters(TComputeRequestCountersPtr pingCounters) { + PingCounters = std::move(pingCounters); + } + + void OnPingRequestStart() { + if (!PingCounters) { + return; + } + + StartTime = TInstant::Now(); + PingCounters->InFly->Inc(); + } + + void OnPingRequestFinish(bool success) { + if (!PingCounters) { + return; + } + + PingCounters->InFly->Dec(); + PingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); + if (success) { + PingCounters->Ok->Inc(); + } else { + PingCounters->Error->Inc(); + } + } + + Fq::Private::PingTaskRequest GetPingTaskRequest(std::optional computeStatus, std::optional pendingStatusCode, const NYql::TIssues& issues, const Ydb::TableStats::QueryStats& queryStats) const { + Fq::Private::PingTaskRequest pingTaskRequest; + NYql::IssuesToMessage(issues, pingTaskRequest.mutable_issues()); + if (computeStatus) { + pingTaskRequest.set_status(*computeStatus); + } + if (pendingStatusCode) { + pingTaskRequest.set_pending_status_code(*pendingStatusCode); + } + PrepareAstAndPlan(pingTaskRequest, queryStats.query_plan(), queryStats.query_ast()); + return pingTaskRequest; + } + + // Can throw errors + Fq::Private::PingTaskRequest GetPingTaskRequestStatistics(std::optional computeStatus, std::optional pendingStatusCode, const NYql::TIssues& issues, const Ydb::TableStats::QueryStats& queryStats, double* cpuUsage = nullptr) const { + Fq::Private::PingTaskRequest pingTaskRequest = GetPingTaskRequest(computeStatus, pendingStatusCode, issues, queryStats); + pingTaskRequest.set_statistics(GetV1StatFromV2Plan(queryStats.query_plan(), cpuUsage)); + return pingTaskRequest; + } + +protected: + void PrepareAstAndPlan(Fq::Private::PingTaskRequest& request, const TString& plan, const TString& expr) const { + if (Compressor.IsEnabled()) { + auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr); + request.mutable_ast_compressed()->set_method(astCompressionMethod); + request.mutable_ast_compressed()->set_data(astCompressed); + + auto [planCompressionMethod, planCompressed] = Compressor.Compress(plan); + request.mutable_plan_compressed()->set_method(planCompressionMethod); + request.mutable_plan_compressed()->set_data(planCompressed); + } else { + request.set_ast(expr); + request.set_plan(plan); + } + } + +private: + TInstant StartTime; + TComputeRequestCountersPtr PingCounters; + const TCompressor Compressor; +}; + +} /* NFq */ diff --git a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp index d6ef6600f05a..8c263131363d 100644 --- a/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp @@ -1,11 +1,9 @@ -#include "base_compute_actor.h" +#include "base_status_updater_actor.h" -#include #include #include #include #include -#include #include #include #include @@ -14,7 +12,6 @@ #include #include -#include #include #include @@ -37,7 +34,7 @@ namespace NFq { using namespace NActors; using namespace NFq; -class TStatusTrackerActor : public TBaseComputeActor { +class TStatusTrackerActor : public TBaseStatusUpdaterActor { public: using IRetryPolicy = IRetryPolicy; @@ -70,7 +67,7 @@ class TStatusTrackerActor : public TBaseComputeActor { }; TStatusTrackerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters) - : TBaseComputeActor(queryCounters, "StatusTracker") + : TBaseStatusUpdaterActor(params.Config.GetCommon(), queryCounters, "StatusTracker") , Params(params) , Parent(parent) , Connector(connector) @@ -78,8 +75,9 @@ class TStatusTrackerActor : public TBaseComputeActor { , OperationId(operationId) , Counters(GetStepCountersSubgroup()) , BackoffTimer(20, 1000) - , Compressor(params.Config.GetCommon().GetQueryArtifactsCompressionMethod(), params.Config.GetCommon().GetQueryArtifactsCompressionMinSize()) - {} + { + SetPingCounters(Counters.GetCounters(ERequestType::RT_PING)); + } static constexpr char ActorName[] = "FQ_STATUS_TRACKER"; @@ -95,21 +93,17 @@ class TStatusTrackerActor : public TBaseComputeActor { ) void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { - auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); - pingCounters->InFly->Dec(); + OnPingRequestFinish(ev.Get()->Get()->Success); if (ev->Cookie) { return; } - pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds()); if (ev.Get()->Get()->Success) { - pingCounters->Ok->Inc(); LOG_I("Information about the status of operation is stored"); Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(Issues, Status, ExecStatus, ComputeStatus)); CompleteAndPassAway(); } else { - pingCounters->Error->Inc(); LOG_E("Error saving information about the status of operation"); Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus, ComputeStatus)); FailedAndPassAway(); @@ -134,7 +128,6 @@ class TStatusTrackerActor : public TBaseComputeActor { } ReportPublicCounters(response.QueryStats); - StartTime = TInstant::Now(); LOG_D("Execution status: " << static_cast(response.ExecStatus)); switch (response.ExecStatus) { case NYdb::NQuery::EExecStatus::Unspecified: @@ -211,75 +204,51 @@ class TStatusTrackerActor : public TBaseComputeActor { Register(new TRetryActor(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId)); } - void UpdateProgress() { - auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); - pingCounters->InFly->Inc(); + std::pair GetPingTaskRequestWithStatistic(std::optional computeStatus, std::optional pendingStatusCode) { Fq::Private::PingTaskRequest pingTaskRequest; - PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); + double cpuUsage = 0.0; try { - pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan())); + pingTaskRequest = GetPingTaskRequestStatistics(computeStatus, pendingStatusCode, Issues, QueryStats, &cpuUsage); } catch(const NJson::TJsonException& ex) { LOG_E("Error statistics conversion: " << ex.what()); } + + return { pingTaskRequest, cpuUsage }; + } + + void UpdateProgress() { + OnPingRequestStart(); + + Fq::Private::PingTaskRequest pingTaskRequest = GetPingTaskRequestWithStatistic(std::nullopt, std::nullopt).first; Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest), 0, 1); } + void UpdateCpuQuota(double cpuUsage) { + TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us()); + if (cpuUsage && duration) { + Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage)); + } + } + void Failed() { LOG_I("Execution status: Failed, Status: " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString()); - auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); - pingCounters->InFly->Inc(); - Fq::Private::PingTaskRequest pingTaskRequest; - NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues()); - pingTaskRequest.set_pending_status_code(StatusCode); - PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); - try { - TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us()); - double cpuUsage = 0.0; - pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage)); - if (duration && cpuUsage) { - Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage)); - } - } catch(const NJson::TJsonException& ex) { - LOG_E("Error statistics conversion: " << ex.what()); - } + OnPingRequestStart(); + + auto [pingTaskRequest, cpuUsage] = GetPingTaskRequestWithStatistic(std::nullopt, StatusCode); + UpdateCpuQuota(cpuUsage); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); } void Complete() { LOG_I("Execution status: Complete " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString()); - auto pingCounters = Counters.GetCounters(ERequestType::RT_PING); - pingCounters->InFly->Inc(); - Fq::Private::PingTaskRequest pingTaskRequest; - NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues()); - ComputeStatus = ::FederatedQuery::QueryMeta::COMPLETING; - pingTaskRequest.set_status(ComputeStatus); - PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast()); - try { - TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us()); - double cpuUsage = 0.0; - pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage)); - if (duration && cpuUsage) { - Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage)); - } - } catch(const NJson::TJsonException& ex) { - LOG_E("Error statistics conversion: " << ex.what()); - } - Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); - } + OnPingRequestStart(); - void PrepareAstAndPlan(Fq::Private::PingTaskRequest& request, const TString& plan, const TString& expr) const { - if (Compressor.IsEnabled()) { - auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr); - request.mutable_ast_compressed()->set_method(astCompressionMethod); - request.mutable_ast_compressed()->set_data(astCompressed); + ComputeStatus = ::FederatedQuery::QueryMeta::COMPLETING; + auto [pingTaskRequest, cpuUsage] = GetPingTaskRequestWithStatistic(ComputeStatus, std::nullopt); + UpdateCpuQuota(cpuUsage); - auto [planCompressionMethod, planCompressed] = Compressor.Compress(plan); - request.mutable_plan_compressed()->set_method(planCompressionMethod); - request.mutable_plan_compressed()->set_data(planCompressed); - } else { - request.set_ast(expr); - request.set_plan(plan); - } + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); } private: @@ -289,14 +258,12 @@ class TStatusTrackerActor : public TBaseComputeActor { TActorId Pinger; NYdb::TOperation::TOperationId OperationId; TCounters Counters; - TInstant StartTime; NYql::TIssues Issues; NYdb::EStatus Status = NYdb::EStatus::SUCCESS; NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified; NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_UNSPECIFIED; Ydb::TableStats::QueryStats QueryStats; NKikimr::TBackoffTimer BackoffTimer; - const TCompressor Compressor; FederatedQuery::QueryMeta::ComputeStatus ComputeStatus = FederatedQuery::QueryMeta::RUNNING; }; diff --git a/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp b/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp index de66c3c1c167..9897d088fec0 100644 --- a/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/stopper_actor.cpp @@ -1,4 +1,4 @@ -#include "base_compute_actor.h" +#include "base_status_updater_actor.h" #include "resources_cleaner_actor.h" #include @@ -9,6 +9,8 @@ #include #include +#include + #include #include @@ -30,16 +32,20 @@ namespace NFq { using namespace NActors; using namespace NFq; -class TStopperActor : public TBaseComputeActor { +class TStopperActor : public TBaseStatusUpdaterActor { public: enum ERequestType { RT_CANCEL_OPERATION, + RT_GET_OPERATION, + RT_PING, RT_MAX }; class TCounters: public virtual TThrRefBase { std::array Requests = CreateArray({ - { MakeIntrusive("CancelOperation") } + { MakeIntrusive("CancelOperation") }, + { MakeIntrusive("GetOperation") }, + { MakeIntrusive("Ping") } }); ::NMonitoring::TDynamicCounterPtr Counters; @@ -58,14 +64,17 @@ class TStopperActor : public TBaseComputeActor { } }; - TStopperActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters) - : TBaseComputeActor(queryCounters, "Stopper") + TStopperActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters) + : TBaseStatusUpdaterActor(params.Config.GetCommon(), queryCounters, "Stopper") , Params(params) , Parent(parent) , Connector(connector) + , Pinger(pinger) , OperationId(operationId) , Counters(GetStepCountersSubgroup()) - {} + { + SetPingCounters(Counters.GetCounters(ERequestType::RT_PING)); + } static constexpr char ActorName[] = "FQ_STOPPER_ACTOR"; @@ -77,17 +86,66 @@ class TStopperActor : public TBaseComputeActor { STRICT_STFUNC(StateFunc, hFunc(TEvYdbCompute::TEvCancelOperationResponse, Handle); + hFunc(TEvYdbCompute::TEvGetOperationResponse, Handle); + hFunc(TEvents::TEvForwardPingResponse, Handle); ) void Handle(const TEvYdbCompute::TEvCancelOperationResponse::TPtr& ev) { const auto& response = *ev.Get()->Get(); if (response.Status != NYdb::EStatus::SUCCESS && response.Status != NYdb::EStatus::NOT_FOUND && response.Status != NYdb::EStatus::PRECONDITION_FAILED) { - LOG_E("Can't cancel operation: " << ev->Get()->Issues.ToOneLineString()); - Send(Parent, new TEvYdbCompute::TEvStopperResponse(response.Issues, response.Status)); - FailedAndPassAway(); + LOG_E("Can't cancel operation: " << response.Issues.ToOneLineString()); + Failed(response.Status, response.Issues); + return; + } + + if (response.Status == NYdb::EStatus::NOT_FOUND) { + LOG_I("Operation successfully canceled and already removed"); + Complete(); return; } + LOG_I("Operation successfully canceled: " << response.Status); + Register(new TRetryActor(Counters.GetCounters(ERequestType::RT_GET_OPERATION), SelfId(), Connector, OperationId)); + } + + void Handle(const TEvYdbCompute::TEvGetOperationResponse::TPtr& ev) { + const auto& response = *ev.Get()->Get(); + if (response.Status != NYdb::EStatus::SUCCESS && response.Status != NYdb::EStatus::NOT_FOUND) { + LOG_E("Can't get operation: " << response.Issues.ToOneLineString()); + Failed(response.Status, response.Issues); + return; + } + + if (response.Status == NYdb::EStatus::NOT_FOUND) { + LOG_I("Operation has been already removed"); + Complete(); + return; + } + + LOG_I("Operation successfully fetched, Status: " << response.Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(response.StatusCode) << " Issues: " << response.Issues.ToOneLineString()); + OnPingRequestStart(); + Fq::Private::PingTaskRequest pingTaskRequest = GetPingTaskRequest(FederatedQuery::QueryMeta::ABORTING_BY_USER, NYql::NDq::YdbStatusToDqStatus(response.StatusCode), response.Issues, response.QueryStats); + Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest)); + } + + void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) { + OnPingRequestFinish(ev.Get()->Get()->Success); + + if (ev.Get()->Get()->Success) { + LOG_I("Information about the status of operation is updated"); + } else { + LOG_E("Error updating information about the status of operation"); + } + + Complete(); + } + + void Failed(NYdb::EStatus status, NYql::TIssues issues) { + Send(Parent, new TEvYdbCompute::TEvStopperResponse(issues, status)); + FailedAndPassAway(); + } + + void Complete() { Send(Parent, new TEvYdbCompute::TEvStopperResponse({}, NYdb::EStatus::SUCCESS)); CompleteAndPassAway(); } @@ -96,6 +154,7 @@ class TStopperActor : public TBaseComputeActor { TRunActorParams Params; TActorId Parent; TActorId Connector; + TActorId Pinger; NYdb::TOperation::TOperationId OperationId; TCounters Counters; }; @@ -103,9 +162,10 @@ class TStopperActor : public TBaseComputeActor { std::unique_ptr CreateStopperActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, + const TActorId& pinger, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters) { - return std::make_unique(params, parent, connector, operationId, queryCounters); + return std::make_unique(params, parent, connector, pinger, operationId, queryCounters); } } diff --git a/ydb/core/fq/libs/compute/ydb/stopper_actor.h b/ydb/core/fq/libs/compute/ydb/stopper_actor.h index e4046dc176f6..4f41a3e5dcf3 100644 --- a/ydb/core/fq/libs/compute/ydb/stopper_actor.h +++ b/ydb/core/fq/libs/compute/ydb/stopper_actor.h @@ -11,6 +11,7 @@ namespace NFq { std::unique_ptr CreateStopperActor(const TRunActorParams& params, const NActors::TActorId& parent, const NActors::TActorId& connector, + const NActors::TActorId& pinger, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters); diff --git a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp index 41701816a815..c37ab2ef2ec1 100644 --- a/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp +++ b/ydb/core/fq/libs/compute/ydb/ydb_run_actor.cpp @@ -163,7 +163,7 @@ class TYdbRunActor : public NActors::TActorBootstrapped { // Start cancel operation only when StatusTracker or ResultWriter is running if (Params.OperationId.GetKind() != Ydb::TOperationId::UNUSED && !IsAborted && !FinalizationStarted) { IsAborted = true; - Register(ActorFactory->CreateStopper(SelfId(), Connector, Params.OperationId).release()); + Register(ActorFactory->CreateStopper(SelfId(), Connector, Pinger, Params.OperationId).release()); } }