Skip to content

Commit

Permalink
YQ-2885 fix issue script execution is canceled (ydb-platform#2106)
Browse files Browse the repository at this point in the history
  • Loading branch information
GrigoriyPA authored and EgorkaZ committed Apr 5, 2024
1 parent 84c6d9e commit dcad5e3
Show file tree
Hide file tree
Showing 8 changed files with 208 additions and 84 deletions.
3 changes: 2 additions & 1 deletion ydb/core/fq/libs/compute/ydb/actors_factory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,9 @@ struct TActorFactory : public IActorFactory {

std::unique_ptr<NActors::IActor> CreateStopper(const NActors::TActorId& parent,
const NActors::TActorId& connector,
const NActors::TActorId& pinger,
const NYdb::TOperation::TOperationId& operationId) const override {
return CreateStopperActor(Params, parent, connector, operationId, Counters);
return CreateStopperActor(Params, parent, connector, pinger, operationId, Counters);
}

private:
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/compute/ydb/actors_factory.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ struct IActorFactory : public TThrRefBase {
FederatedQuery::QueryMeta::ComputeStatus status) const = 0;
virtual std::unique_ptr<NActors::IActor> CreateStopper(const NActors::TActorId& parent,
const NActors::TActorId& connector,
const NActors::TActorId& pinger,
const NYdb::TOperation::TOperationId& operationId) const = 0;
};

Expand Down
96 changes: 96 additions & 0 deletions ydb/core/fq/libs/compute/ydb/base_status_updater_actor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
#pragma once

#include "base_compute_actor.h"

#include <ydb/core/fq/libs/common/compression.h>
#include <ydb/core/fq/libs/compute/common/utils.h>

#include <ydb/library/yql/public/issue/yql_issue_message.h>

namespace NFq {

template<typename TDerived>
class TBaseStatusUpdaterActor : public TBaseComputeActor<TDerived> {
public:
using TBase = TBaseComputeActor<TDerived>;

TBaseStatusUpdaterActor(const NConfig::TCommonConfig& commonConfig, const ::NYql::NCommon::TServiceCounters& queryCounters, const TString& stepName)
: TBase(queryCounters, stepName)
, Compressor(commonConfig.GetQueryArtifactsCompressionMethod(), commonConfig.GetQueryArtifactsCompressionMinSize())
{}

TBaseStatusUpdaterActor(const NConfig::TCommonConfig& commonConfig, const ::NMonitoring::TDynamicCounterPtr& baseCounters, const TString& stepName)
: TBase(baseCounters, stepName)
, Compressor(commonConfig.GetQueryArtifactsCompressionMethod(), commonConfig.GetQueryArtifactsCompressionMinSize())
{}

void SetPingCounters(TComputeRequestCountersPtr pingCounters) {
PingCounters = std::move(pingCounters);
}

void OnPingRequestStart() {
if (!PingCounters) {
return;
}

StartTime = TInstant::Now();
PingCounters->InFly->Inc();
}

void OnPingRequestFinish(bool success) {
if (!PingCounters) {
return;
}

PingCounters->InFly->Dec();
PingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
if (success) {
PingCounters->Ok->Inc();
} else {
PingCounters->Error->Inc();
}
}

Fq::Private::PingTaskRequest GetPingTaskRequest(std::optional<FederatedQuery::QueryMeta::ComputeStatus> computeStatus, std::optional<NYql::NDqProto::StatusIds::StatusCode> pendingStatusCode, const NYql::TIssues& issues, const Ydb::TableStats::QueryStats& queryStats) const {
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(issues, pingTaskRequest.mutable_issues());
if (computeStatus) {
pingTaskRequest.set_status(*computeStatus);
}
if (pendingStatusCode) {
pingTaskRequest.set_pending_status_code(*pendingStatusCode);
}
PrepareAstAndPlan(pingTaskRequest, queryStats.query_plan(), queryStats.query_ast());
return pingTaskRequest;
}

// Can throw errors
Fq::Private::PingTaskRequest GetPingTaskRequestStatistics(std::optional<FederatedQuery::QueryMeta::ComputeStatus> computeStatus, std::optional<NYql::NDqProto::StatusIds::StatusCode> pendingStatusCode, const NYql::TIssues& issues, const Ydb::TableStats::QueryStats& queryStats, double* cpuUsage = nullptr) const {
Fq::Private::PingTaskRequest pingTaskRequest = GetPingTaskRequest(computeStatus, pendingStatusCode, issues, queryStats);
pingTaskRequest.set_statistics(GetV1StatFromV2Plan(queryStats.query_plan(), cpuUsage));
return pingTaskRequest;
}

protected:
void PrepareAstAndPlan(Fq::Private::PingTaskRequest& request, const TString& plan, const TString& expr) const {
if (Compressor.IsEnabled()) {
auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr);
request.mutable_ast_compressed()->set_method(astCompressionMethod);
request.mutable_ast_compressed()->set_data(astCompressed);

auto [planCompressionMethod, planCompressed] = Compressor.Compress(plan);
request.mutable_plan_compressed()->set_method(planCompressionMethod);
request.mutable_plan_compressed()->set_data(planCompressed);
} else {
request.set_ast(expr);
request.set_plan(plan);
}
}

private:
TInstant StartTime;
TComputeRequestCountersPtr PingCounters;
const TCompressor Compressor;
};

} /* NFq */
103 changes: 35 additions & 68 deletions ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
#include "base_compute_actor.h"
#include "base_status_updater_actor.h"

#include <ydb/core/fq/libs/common/compression.h>
#include <ydb/core/fq/libs/common/util.h>
#include <ydb/core/fq/libs/compute/common/metrics.h>
#include <ydb/core/fq/libs/compute/common/retry_actor.h>
#include <ydb/core/fq/libs/compute/common/run_actor_params.h>
#include <ydb/core/fq/libs/compute/common/utils.h>
#include <ydb/core/fq/libs/compute/ydb/events/events.h>
#include <ydb/core/fq/libs/compute/ydb/control_plane/compute_database_control_plane_service.h>
#include <ydb/core/fq/libs/ydb/ydb.h>
Expand All @@ -14,7 +12,6 @@

#include <ydb/library/yql/dq/actors/dq.h>
#include <ydb/library/yql/providers/common/metrics/service_counters.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>

#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>
Expand All @@ -37,7 +34,7 @@ namespace NFq {
using namespace NActors;
using namespace NFq;

class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
class TStatusTrackerActor : public TBaseStatusUpdaterActor<TStatusTrackerActor> {
public:
using IRetryPolicy = IRetryPolicy<const TEvYdbCompute::TEvGetOperationResponse::TPtr&>;

Expand Down Expand Up @@ -70,16 +67,17 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
};

TStatusTrackerActor(const TRunActorParams& params, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const NYdb::TOperation::TOperationId& operationId, const ::NYql::NCommon::TServiceCounters& queryCounters)
: TBaseComputeActor(queryCounters, "StatusTracker")
: TBaseStatusUpdaterActor(params.Config.GetCommon(), queryCounters, "StatusTracker")
, Params(params)
, Parent(parent)
, Connector(connector)
, Pinger(pinger)
, OperationId(operationId)
, Counters(GetStepCountersSubgroup())
, BackoffTimer(20, 1000)
, Compressor(params.Config.GetCommon().GetQueryArtifactsCompressionMethod(), params.Config.GetCommon().GetQueryArtifactsCompressionMinSize())
{}
{
SetPingCounters(Counters.GetCounters(ERequestType::RT_PING));
}

static constexpr char ActorName[] = "FQ_STATUS_TRACKER";

Expand All @@ -95,21 +93,17 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
)

void Handle(const TEvents::TEvForwardPingResponse::TPtr& ev) {
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Dec();
OnPingRequestFinish(ev.Get()->Get()->Success);

if (ev->Cookie) {
return;
}

pingCounters->LatencyMs->Collect((TInstant::Now() - StartTime).MilliSeconds());
if (ev.Get()->Get()->Success) {
pingCounters->Ok->Inc();
LOG_I("Information about the status of operation is stored");
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(Issues, Status, ExecStatus, ComputeStatus));
CompleteAndPassAway();
} else {
pingCounters->Error->Inc();
LOG_E("Error saving information about the status of operation");
Send(Parent, new TEvYdbCompute::TEvStatusTrackerResponse(NYql::TIssues{NYql::TIssue{TStringBuilder{} << "Error saving information about the status of operation: " << ProtoToString(OperationId)}}, NYdb::EStatus::INTERNAL_ERROR, ExecStatus, ComputeStatus));
FailedAndPassAway();
Expand Down Expand Up @@ -140,7 +134,6 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
}

ReportPublicCounters(response.QueryStats);
StartTime = TInstant::Now();
LOG_D("Execution status: " << static_cast<int>(response.ExecStatus));
switch (response.ExecStatus) {
case NYdb::NQuery::EExecStatus::Unspecified:
Expand Down Expand Up @@ -217,75 +210,51 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
Register(new TRetryActor<TEvYdbCompute::TEvGetOperationRequest, TEvYdbCompute::TEvGetOperationResponse, NYdb::TOperation::TOperationId>(Counters.GetCounters(ERequestType::RT_GET_OPERATION), delay, SelfId(), Connector, OperationId));
}

void UpdateProgress() {
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Inc();
std::pair<Fq::Private::PingTaskRequest, double> GetPingTaskRequestWithStatistic(std::optional<FederatedQuery::QueryMeta::ComputeStatus> computeStatus, std::optional<NYql::NDqProto::StatusIds::StatusCode> pendingStatusCode) {
Fq::Private::PingTaskRequest pingTaskRequest;
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
double cpuUsage = 0.0;
try {
pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan()));
pingTaskRequest = GetPingTaskRequestStatistics(computeStatus, pendingStatusCode, Issues, QueryStats, &cpuUsage);
} catch(const NJson::TJsonException& ex) {
LOG_E("Error statistics conversion: " << ex.what());
}

return { pingTaskRequest, cpuUsage };
}

void UpdateProgress() {
OnPingRequestStart();

Fq::Private::PingTaskRequest pingTaskRequest = GetPingTaskRequestWithStatistic(std::nullopt, std::nullopt).first;
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest), 0, 1);
}

void UpdateCpuQuota(double cpuUsage) {
TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
if (cpuUsage && duration) {
Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
}
}

void Failed() {
LOG_I("Execution status: Failed, Status: " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString());
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Inc();
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
pingTaskRequest.set_pending_status_code(StatusCode);
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
try {
TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
double cpuUsage = 0.0;
pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage));
if (duration && cpuUsage) {
Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
}
} catch(const NJson::TJsonException& ex) {
LOG_E("Error statistics conversion: " << ex.what());
}
OnPingRequestStart();

auto [pingTaskRequest, cpuUsage] = GetPingTaskRequestWithStatistic(std::nullopt, StatusCode);
UpdateCpuQuota(cpuUsage);

Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}

void Complete() {
LOG_I("Execution status: Complete " << Status << ", StatusCode: " << NYql::NDqProto::StatusIds::StatusCode_Name(StatusCode) << " Issues: " << Issues.ToOneLineString());
auto pingCounters = Counters.GetCounters(ERequestType::RT_PING);
pingCounters->InFly->Inc();
Fq::Private::PingTaskRequest pingTaskRequest;
NYql::IssuesToMessage(Issues, pingTaskRequest.mutable_issues());
ComputeStatus = ::FederatedQuery::QueryMeta::COMPLETING;
pingTaskRequest.set_status(ComputeStatus);
PrepareAstAndPlan(pingTaskRequest, QueryStats.query_plan(), QueryStats.query_ast());
try {
TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
double cpuUsage = 0.0;
pingTaskRequest.set_statistics(GetV1StatFromV2Plan(QueryStats.query_plan(), &cpuUsage));
if (duration && cpuUsage) {
Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
}
} catch(const NJson::TJsonException& ex) {
LOG_E("Error statistics conversion: " << ex.what());
}
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}
OnPingRequestStart();

void PrepareAstAndPlan(Fq::Private::PingTaskRequest& request, const TString& plan, const TString& expr) const {
if (Compressor.IsEnabled()) {
auto [astCompressionMethod, astCompressed] = Compressor.Compress(expr);
request.mutable_ast_compressed()->set_method(astCompressionMethod);
request.mutable_ast_compressed()->set_data(astCompressed);
ComputeStatus = ::FederatedQuery::QueryMeta::COMPLETING;
auto [pingTaskRequest, cpuUsage] = GetPingTaskRequestWithStatistic(ComputeStatus, std::nullopt);
UpdateCpuQuota(cpuUsage);

auto [planCompressionMethod, planCompressed] = Compressor.Compress(plan);
request.mutable_plan_compressed()->set_method(planCompressionMethod);
request.mutable_plan_compressed()->set_data(planCompressed);
} else {
request.set_ast(expr);
request.set_plan(plan);
}
Send(Pinger, new TEvents::TEvForwardPingRequest(pingTaskRequest));
}

private:
Expand All @@ -295,14 +264,12 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
TActorId Pinger;
NYdb::TOperation::TOperationId OperationId;
TCounters Counters;
TInstant StartTime;
NYql::TIssues Issues;
NYdb::EStatus Status = NYdb::EStatus::SUCCESS;
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_UNSPECIFIED;
Ydb::TableStats::QueryStats QueryStats;
NKikimr::TBackoffTimer BackoffTimer;
const TCompressor Compressor;
FederatedQuery::QueryMeta::ComputeStatus ComputeStatus = FederatedQuery::QueryMeta::RUNNING;
};

Expand Down
Loading

0 comments on commit dcad5e3

Please sign in to comment.