Skip to content

Commit

Permalink
Replaced proto types to SDK types in ExecuteScript settings (#6094)
Browse files Browse the repository at this point in the history
  • Loading branch information
Gazizonoki authored Jul 3, 2024
1 parent 9bc35ed commit bd87563
Show file tree
Hide file tree
Showing 13 changed files with 115 additions and 83 deletions.
25 changes: 14 additions & 11 deletions ydb/core/fq/libs/compute/common/utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
#include <library/cpp/json/yson/json2yson.h>
#include <ydb/library/yql/public/issue/yql_issue_message.h>

#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>

namespace NFq {

using TAggregates = std::map<TString, std::optional<ui64>>;
Expand Down Expand Up @@ -1133,8 +1135,8 @@ TString SimplifiedPlan(const TString& plan) {
}

struct TNoneStatProcessor : IPlanStatProcessor {
Ydb::Query::StatsMode GetStatsMode() override {
return Ydb::Query::StatsMode::STATS_MODE_NONE;
NYdb::NQuery::EStatsMode GetStatsMode() override {
return NYdb::NQuery::EStatsMode::None;
}

TString ConvertPlan(const TString& plan) override {
Expand All @@ -1160,14 +1162,14 @@ struct TNoneStatProcessor : IPlanStatProcessor {
};

struct TBasicStatProcessor : TNoneStatProcessor {
Ydb::Query::StatsMode GetStatsMode() override {
return Ydb::Query::StatsMode::STATS_MODE_BASIC;
NYdb::NQuery::EStatsMode GetStatsMode() override {
return NYdb::NQuery::EStatsMode::Basic;
}
};

struct TPlanStatProcessor : IPlanStatProcessor {
Ydb::Query::StatsMode GetStatsMode() override {
return Ydb::Query::StatsMode::STATS_MODE_FULL;
NYdb::NQuery::EStatsMode GetStatsMode() override {
return NYdb::NQuery::EStatsMode::Full;
}

TString ConvertPlan(const TString& plan) override {
Expand Down Expand Up @@ -1204,8 +1206,8 @@ struct TCostStatProcessor : TPlanStatProcessor {
};

struct TProfileStatProcessor : TPlanStatProcessor {
Ydb::Query::StatsMode GetStatsMode() override {
return Ydb::Query::StatsMode::STATS_MODE_PROFILE;
NYdb::NQuery::EStatsMode GetStatsMode() override {
return NYdb::NQuery::EStatsMode::Profile;
}
};

Expand Down Expand Up @@ -1233,7 +1235,7 @@ PingTaskRequestBuilder::PingTaskRequestBuilder(const NConfig::TCommonConfig& com
{}

Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(
const Ydb::TableStats::QueryStats& queryStats,
const NYdb::NQuery::TExecStats& queryStats,
const NYql::TIssues& issues,
std::optional<FederatedQuery::QueryMeta::ComputeStatus> computeStatus,
std::optional<NYql::NDqProto::StatusIds::StatusCode> pendingStatusCode
Expand All @@ -1256,8 +1258,9 @@ Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(
}


Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(const Ydb::TableStats::QueryStats& queryStats) {
return Build(queryStats.query_plan(), queryStats.query_ast(), queryStats.compilation().duration_us(), queryStats.total_duration_us());
Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(const NYdb::NQuery::TExecStats& queryStats) {
const auto& statsProto = NYdb::TProtoAccessor().GetProto(queryStats);
return Build(statsProto.query_plan(), statsProto.query_ast(), statsProto.compilation().duration_us(), statsProto.total_duration_us());
}

Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(const TString& queryPlan, const TString& queryAst, int64_t compilationTimeUs, int64_t computeTimeUs) {
Expand Down
7 changes: 4 additions & 3 deletions ydb/core/fq/libs/compute/common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <ydb/core/fq/libs/ydb/ydb.h>

#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
#include <ydb/public/sdk/cpp/client/ydb_query/query.h>

namespace NFq {

Expand Down Expand Up @@ -53,7 +54,7 @@ TPublicStat GetPublicStat(const TString& statistics);

struct IPlanStatProcessor {
virtual ~IPlanStatProcessor() = default;
virtual Ydb::Query::StatsMode GetStatsMode() = 0;
virtual NYdb::NQuery::EStatsMode GetStatsMode() = 0;
virtual TString ConvertPlan(const TString& plan) = 0;
virtual TString GetPlanVisualization(const TString& plan) = 0;
virtual TString GetQueryStat(const TString& plan, double& cpuUsage) = 0;
Expand All @@ -67,12 +68,12 @@ class PingTaskRequestBuilder {
public:
PingTaskRequestBuilder(const NConfig::TCommonConfig& commonConfig, std::unique_ptr<IPlanStatProcessor>&& processor);
Fq::Private::PingTaskRequest Build(
const Ydb::TableStats::QueryStats& queryStats,
const NYdb::NQuery::TExecStats& queryStats,
const NYql::TIssues& issues,
std::optional<FederatedQuery::QueryMeta::ComputeStatus> computeStatus = std::nullopt,
std::optional<NYql::NDqProto::StatusIds::StatusCode> pendingStatusCode = std::nullopt
);
Fq::Private::PingTaskRequest Build(const Ydb::TableStats::QueryStats& queryStats);
Fq::Private::PingTaskRequest Build(const NYdb::NQuery::TExecStats& queryStats);
Fq::Private::PingTaskRequest Build(const TString& queryPlan, const TString& queryAst, int64_t compilationTimeUs, int64_t computeTimeUs);
NYql::TIssues Issues;
double CpuUsage = 0.0;
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/fq/libs/compute/ydb/events/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ struct TEvYdbCompute {

// Events
struct TEvExecuteScriptRequest : public NActors::TEventLocal<TEvExecuteScriptRequest, EvExecuteScriptRequest> {
TEvExecuteScriptRequest(TString sql, TString idempotencyKey, const TDuration& resultTtl, const TDuration& operationTimeout, Ydb::Query::Syntax syntax, Ydb::Query::ExecMode execMode, Ydb::Query::StatsMode statsMode, const TString& traceId, const std::map<TString, Ydb::TypedValue>& queryParameters)
TEvExecuteScriptRequest(TString sql, TString idempotencyKey, const TDuration& resultTtl, const TDuration& operationTimeout, NYdb::NQuery::ESyntax syntax, NYdb::NQuery::EExecMode execMode, NYdb::NQuery::EStatsMode statsMode, const TString& traceId, const std::map<TString, Ydb::TypedValue>& queryParameters)
: Sql(std::move(sql))
, IdempotencyKey(std::move(idempotencyKey))
, ResultTtl(resultTtl)
Expand All @@ -87,9 +87,9 @@ struct TEvYdbCompute {
TString IdempotencyKey;
TDuration ResultTtl;
TDuration OperationTimeout;
Ydb::Query::Syntax Syntax = Ydb::Query::SYNTAX_YQL_V1;
Ydb::Query::ExecMode ExecMode = Ydb::Query::EXEC_MODE_EXECUTE;
Ydb::Query::StatsMode StatsMode = Ydb::Query::StatsMode::STATS_MODE_FULL;
NYdb::NQuery::ESyntax Syntax = NYdb::NQuery::ESyntax::YqlV1;
NYdb::NQuery::EExecMode ExecMode = NYdb::NQuery::EExecMode::Execute;
NYdb::NQuery::EStatsMode StatsMode = NYdb::NQuery::EStatsMode::Full;
TString TraceId;
std::map<TString, Ydb::TypedValue> QueryParameters;
};
Expand Down Expand Up @@ -127,7 +127,7 @@ struct TEvYdbCompute {
, Ready(ready)
{}

TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, Ydb::StatusIds::StatusCode statusCode, const TVector<Ydb::Query::ResultSetMeta>& resultSetsMeta, const Ydb::TableStats::QueryStats& queryStats, NYql::TIssues issues, bool ready = true)
TEvGetOperationResponse(NYdb::NQuery::EExecStatus execStatus, Ydb::StatusIds::StatusCode statusCode, const std::vector<NYdb::NQuery::TResultSetMeta>& resultSetsMeta, const NYdb::NQuery::TExecStats& queryStats, NYql::TIssues issues, bool ready = true)
: ExecStatus(execStatus)
, StatusCode(statusCode)
, ResultSetsMeta(resultSetsMeta)
Expand All @@ -139,8 +139,8 @@ struct TEvYdbCompute {

NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
Ydb::StatusIds::StatusCode StatusCode = Ydb::StatusIds::STATUS_CODE_UNSPECIFIED;
TVector<Ydb::Query::ResultSetMeta> ResultSetsMeta;
Ydb::TableStats::QueryStats QueryStats;
std::vector<NYdb::NQuery::TResultSetMeta> ResultSetsMeta;
NYdb::NQuery::TExecStats QueryStats;
NYql::TIssues Issues;
NYdb::EStatus Status;
bool Ready;
Expand Down
28 changes: 14 additions & 14 deletions ydb/core/fq/libs/compute/ydb/executer_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class TExecuterActor : public TBaseComputeActor<TExecuterActor> {
}
};

TExecuterActor(const TRunActorParams& params, Ydb::Query::StatsMode statsMode, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const ::NYql::NCommon::TServiceCounters& queryCounters)
TExecuterActor(const TRunActorParams& params, NYdb::NQuery::EStatsMode statsMode, const TActorId& parent, const TActorId& connector, const TActorId& pinger, const ::NYql::NCommon::TServiceCounters& queryCounters)
: TBaseComputeActor(queryCounters, "Executer")
, Params(params)
, StatsMode(statsMode)
Expand Down Expand Up @@ -115,38 +115,38 @@ class TExecuterActor : public TBaseComputeActor<TExecuterActor> {
}

void SendExecuteScript() {
Register(new TRetryActor<TEvYdbCompute::TEvExecuteScriptRequest, TEvYdbCompute::TEvExecuteScriptResponse, TString, TString, TDuration, TDuration, Ydb::Query::Syntax, Ydb::Query::ExecMode, Ydb::Query::StatsMode, TString, std::map<TString, Ydb::TypedValue>>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId, Params.ResultTtl, Params.ExecutionTtl, GetSyntax(), GetExecuteMode(), StatsMode, Params.JobId + "_" + ToString(Params.RestartCount), Params.QueryParameters));
Register(new TRetryActor<TEvYdbCompute::TEvExecuteScriptRequest, TEvYdbCompute::TEvExecuteScriptResponse, TString, TString, TDuration, TDuration, NYdb::NQuery::ESyntax, NYdb::NQuery::EExecMode, NYdb::NQuery::EStatsMode, TString, std::map<TString, Ydb::TypedValue>>(Counters.GetCounters(ERequestType::RT_EXECUTE_SCRIPT), SelfId(), Connector, Params.Sql, Params.JobId, Params.ResultTtl, Params.ExecutionTtl, GetSyntax(), GetExecuteMode(), StatsMode, Params.JobId + "_" + ToString(Params.RestartCount), Params.QueryParameters));
}

Ydb::Query::Syntax GetSyntax() const {
NYdb::NQuery::ESyntax GetSyntax() const {
switch (Params.QuerySyntax) {
case FederatedQuery::QueryContent::PG:
return Ydb::Query::SYNTAX_PG;
return NYdb::NQuery::ESyntax::Pg;
case FederatedQuery::QueryContent::YQL_V1:
return Ydb::Query::SYNTAX_YQL_V1;
return NYdb::NQuery::ESyntax::YqlV1;
case FederatedQuery::QueryContent::QUERY_SYNTAX_UNSPECIFIED:
case FederatedQuery::QueryContent_QuerySyntax_QueryContent_QuerySyntax_INT_MAX_SENTINEL_DO_NOT_USE_:
case FederatedQuery::QueryContent_QuerySyntax_QueryContent_QuerySyntax_INT_MIN_SENTINEL_DO_NOT_USE_:
return Ydb::Query::SYNTAX_UNSPECIFIED;
return NYdb::NQuery::ESyntax::Unspecified;
}
}

Ydb::Query::ExecMode GetExecuteMode() const {
NYdb::NQuery::EExecMode GetExecuteMode() const {
switch (Params.ExecuteMode) {
case FederatedQuery::RUN:
return Ydb::Query::ExecMode::EXEC_MODE_EXECUTE;
return NYdb::NQuery::EExecMode::Execute;
case FederatedQuery::PARSE:
return Ydb::Query::ExecMode::EXEC_MODE_PARSE;
return NYdb::NQuery::EExecMode::Parse;
case FederatedQuery::VALIDATE:
return Ydb::Query::ExecMode::EXEC_MODE_VALIDATE;
return NYdb::NQuery::EExecMode::Validate;
case FederatedQuery::EXPLAIN:
return Ydb::Query::ExecMode::EXEC_MODE_EXPLAIN;
return NYdb::NQuery::EExecMode::Explain;
case FederatedQuery::EXECUTE_MODE_UNSPECIFIED:
case FederatedQuery::COMPILE:
case FederatedQuery::SAVE:
case FederatedQuery::ExecuteMode_INT_MAX_SENTINEL_DO_NOT_USE_:
case FederatedQuery::ExecuteMode_INT_MIN_SENTINEL_DO_NOT_USE_:
return Ydb::Query::ExecMode::EXEC_MODE_UNSPECIFIED;
return NYdb::NQuery::EExecMode::Unspecified;
}
}

Expand All @@ -163,7 +163,7 @@ class TExecuterActor : public TBaseComputeActor<TExecuterActor> {

private:
TRunActorParams Params;
Ydb::Query::StatsMode StatsMode;
NYdb::NQuery::EStatsMode StatsMode;
TActorId Parent;
TActorId Connector;
TActorId Pinger;
Expand All @@ -174,7 +174,7 @@ class TExecuterActor : public TBaseComputeActor<TExecuterActor> {
};

std::unique_ptr<NActors::IActor> CreateExecuterActor(const TRunActorParams& params,
Ydb::Query::StatsMode statsMode,
NYdb::NQuery::EStatsMode statsMode,
const TActorId& parent,
const TActorId& connector,
const TActorId& pinger,
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/fq/libs/compute/ydb/executer_actor.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@

#include <ydb/library/actors/core/actor.h>

#include <ydb/public/sdk/cpp/client/ydb_query/query.h>

namespace NFq {

std::unique_ptr<NActors::IActor> CreateExecuterActor(const TRunActorParams& params,
Ydb::Query::StatsMode statsMode,
NYdb::NQuery::EStatsMode statsMode,
const NActors::TActorId& parent,
const NActors::TActorId& connector,
const NActors::TActorId& pinger,
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/fq/libs/compute/ydb/result_writer_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,8 +327,10 @@ class TResultWriterActor : public TBaseComputeActor<TResultWriterActor> {

for (const auto& resultSetMeta: ev.Get()->Get()->ResultSetsMeta) {
auto& meta = *PingTaskRequest.add_result_set_meta();
for (const auto& column: resultSetMeta.columns()) {
*meta.add_column() = column;
for (const auto& column: resultSetMeta.Columns) {
auto& new_column = *meta.add_column();
new_column.set_name(column.Name);
*new_column.mutable_type() = column.Type.GetProto();
}
}

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/fq/libs/compute/ydb/status_tracker_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
}

void UpdateCpuQuota(double cpuUsage) {
TDuration duration = TDuration::MicroSeconds(QueryStats.total_duration_us());
TDuration duration = QueryStats.GetTotalDuration();
if (cpuUsage && duration) {
Send(NFq::ComputeDatabaseControlPlaneServiceActorId(), new TEvYdbCompute::TEvCpuQuotaAdjust(Params.Scope.ToString(), duration, cpuUsage));
}
Expand Down Expand Up @@ -282,7 +282,7 @@ class TStatusTrackerActor : public TBaseComputeActor<TStatusTrackerActor> {
NYdb::EStatus Status = NYdb::EStatus::SUCCESS;
NYdb::NQuery::EExecStatus ExecStatus = NYdb::NQuery::EExecStatus::Unspecified;
NYql::NDqProto::StatusIds::StatusCode StatusCode = NYql::NDqProto::StatusIds::StatusCode::StatusIds_StatusCode_UNSPECIFIED;
Ydb::TableStats::QueryStats QueryStats;
NYdb::NQuery::TExecStats QueryStats;
NKikimr::TBackoffTimer BackoffTimer;
NFq::TStatusCodeByScopeCounters::TPtr FailedStatusCodeCounters;
FederatedQuery::QueryMeta::ComputeStatus ComputeStatus = FederatedQuery::QueryMeta::RUNNING;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/kqp/ut/federated_query/s3/kqp_federated_query_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {
auto result = session.ExecuteSchemeQuery(query).GetValueSync();
UNIT_ASSERT_C(result.GetStatus() == NYdb::EStatus::SUCCESS, result.GetIssues().ToString());

auto settings = TExecuteScriptSettings().StatsMode(Ydb::Query::STATS_MODE_BASIC);
auto settings = TExecuteScriptSettings().StatsMode(EStatsMode::Basic);

const TString sql = fmt::format(R"(
SELECT * FROM `{external_table}`
Expand All @@ -201,12 +201,12 @@ Y_UNIT_TEST_SUITE(KqpFederatedQuery) {

NYdb::NQuery::TScriptExecutionOperation readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false);
UNIT_ASSERT_EQUAL(TProtoAccessor().GetProto(readyOp.Metadata().ExecStats).compilation().from_cache(), false);

scriptExecutionOperation = db.ExecuteScript(sql, settings).ExtractValueSync();
readyOp = WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr->GetDriver());
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStatus, EExecStatus::Completed);
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecStats.compilation().from_cache(), false);
UNIT_ASSERT_EQUAL(TProtoAccessor().GetProto(readyOp.Metadata().ExecStats).compilation().from_cache(), false);
}

Y_UNIT_TEST(ExecuteScriptWithDataSource) {
Expand Down
Loading

0 comments on commit bd87563

Please sign in to comment.