Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support CollectDiagnostics in DATA_QUERY/GENERIC_QUERY #11371

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,25 @@ bool NeedReportAst(const Ydb::Query::ExecuteQueryRequest& req) {
}
}

bool NeedCollectDiagnostics(const Ydb::Query::ExecuteQueryRequest& req) {
switch (req.exec_mode()) {
case Ydb::Query::EXEC_MODE_EXPLAIN:
return true;

case Ydb::Query::EXEC_MODE_EXECUTE:
switch (req.stats_mode()) {
case Ydb::Query::StatsMode::STATS_MODE_FULL:
case Ydb::Query::StatsMode::STATS_MODE_PROFILE:
return true;
default:
return false;
}

default:
return false;
}
}

class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand Down Expand Up @@ -278,6 +297,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
settings,
req->pool_id());

ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId())) {
NYql::TIssues issues;
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
Expand Down Expand Up @@ -375,6 +396,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
if (NeedCollectDiagnostics(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_diagnostics(kqpResponse.GetQueryDiagnostics());
}
}

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ using namespace Ydb;
using namespace Ydb::Table;
using namespace NKqp;

bool NeedCollectDiagnostics(const Ydb::Table::ExecuteDataQueryRequest& req) {
switch (req.collect_stats()) {
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
return true;
default:
return false;
}
}

using TEvExecuteDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
Ydb::Table::ExecuteDataQueryResponse>;

Expand Down Expand Up @@ -146,6 +156,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
req->collect_stats(),
req->has_query_cache_policy() ? &req->query_cache_policy() : nullptr,
req->has_operation_params() ? &req->operation_params() : nullptr);

ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;

Expand All @@ -166,6 +178,7 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
if (from.HasQueryStats()) {
FillQueryStats(*to->mutable_query_stats(), from);
to->mutable_query_stats()->set_query_ast(from.GetQueryAst());
to->mutable_query_stats()->set_query_diagnostics(from.GetQueryDiagnostics());
return;
}
}
Expand Down
28 changes: 26 additions & 2 deletions ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,27 @@ bool NeedReportPlan(const Ydb::Table::ExecuteScanQueryRequest& req) {
}
}

bool NeedCollectDiagnostics(const Ydb::Table::ExecuteScanQueryRequest& req) {
switch (req.mode()) {
case ExecuteScanQueryRequest_Mode_MODE_EXPLAIN:
return true;

case ExecuteScanQueryRequest_Mode_MODE_EXEC:
switch (req.collect_stats()) {
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
return true;
default:
break;
}

return false;

default:
return false;
}
}

bool CheckRequest(const Ydb::Table::ExecuteScanQueryRequest& req, TParseRequestError& error)
{
switch (req.mode()) {
Expand Down Expand Up @@ -228,7 +249,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
nullptr
);

ev->Record.MutableRequest()->SetCollectDiagnostics(req->Getcollect_full_diagnostics());
ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
NYql::TIssues issues;
Expand Down Expand Up @@ -291,6 +312,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ

bool reportStats = NeedReportStats(*Request_->GetProtoRequest());
bool reportPlan = reportStats && NeedReportPlan(*Request_->GetProtoRequest());
bool collectDiagnostics = NeedCollectDiagnostics(*Request_->GetProtoRequest());

if (reportStats) {
if (kqpResponse.HasQueryStats()) {
Expand All @@ -308,7 +330,9 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
response.mutable_result()->mutable_query_stats()->set_query_ast(kqpResponse.GetQueryAst());
}

response.mutable_result()->set_query_full_diagnostics(kqpResponse.GetQueryDiagnostics());
if (collectDiagnostics) {
response.mutable_result()->mutable_query_stats()->set_query_diagnostics(kqpResponse.GetQueryDiagnostics());
}

Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace NKikimr::NKqp::NPrivateEvents {

struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& clientAddress, const TMaybe<TString>& uid,
TMaybe<TKqpQueryId>&& query, bool keepInCache, bool isQueryActionPrepare, bool perStatementResult, TInstant deadline,
TMaybe<TKqpQueryId>&& query, bool keepInCache, NKikimrKqp::EQueryAction queryAction, bool perStatementResult, TInstant deadline,
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false, TMaybe<TQueryAst> queryAst = Nothing(),
Expand All @@ -26,7 +26,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
, Uid(uid)
, Query(std::move(query))
, KeepInCache(keepInCache)
, IsQueryActionPrepare(isQueryActionPrepare)
, QueryAction(queryAction)
, PerStatementResult(perStatementResult)
, Deadline(deadline)
, DbCounters(dbCounters)
Expand All @@ -50,7 +50,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
TMaybe<TString> Uid;
TMaybe<TKqpQueryId> Query;
bool KeepInCache = false;
bool IsQueryActionPrepare = false;
NKikimrKqp::EQueryAction QueryAction;
bool PerStatementResult = false;
// it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration)
TInstant Deadline;
Expand All @@ -76,7 +76,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo

struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& clientAddress, const TString& uid,
const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline,
const TMaybe<TKqpQueryId>& query, NKikimrKqp::EQueryAction queryAction, TInstant deadline,
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing(),
Expand All @@ -85,7 +85,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
, ClientAddress(clientAddress)
, Uid(uid)
, Query(query)
, IsQueryActionPrepare(isQueryActionPrepare)
, QueryAction(queryAction)
, Deadline(deadline)
, DbCounters(dbCounters)
, GUCSettings(gUCSettings)
Expand All @@ -105,7 +105,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
TString ClientAddress;
TString Uid;
TMaybe<TKqpQueryId> Query;
bool IsQueryActionPrepare = false;
NKikimrKqp::EQueryAction QueryAction;

TInstant Deadline;
TKqpDbCountersPtr DbCounters;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
}

bool GetCollectDiagnostics() const {
return Record.GetRequest().GetCollectDiagnostics();
return Record.MutableRequest()->GetCollectDiagnostics();
}

ui32 CalculateSerializedSize() const override {
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics,
bool perStatementResult,
bool perStatementResult, NKikimrKqp::EQueryAction queryAction,
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst,
NYql::TExprContext* splitCtx,
NYql::TExprNode::TPtr splitExpr)
Expand All @@ -75,6 +75,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
, TempTablesState(std::move(tempTablesState))
, CollectFullDiagnostics(collectFullDiagnostics)
, QueryAction(queryAction)
, CompileAction(compileAction)
, QueryAst(std::move(queryAst))
, SplitCtx(splitCtx)
Expand Down Expand Up @@ -365,7 +366,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
replayMessage.InsertValue("query_syntax", ToString(Config->_KqpYqlSyntaxVersion.Get().GetRef()));
replayMessage.InsertValue("query_database", QueryId.Database);
replayMessage.InsertValue("query_cluster", QueryId.Cluster);
replayMessage.InsertValue("query_plan", queryPlan);
if (QueryAction == NKikimrKqp::QUERY_ACTION_EXPLAIN) {
replayMessage.InsertValue("query_plan", queryPlan);
}
replayMessage.InsertValue("query_type", ToString(QueryId.Settings.QueryType));

if (CollectFullDiagnostics) {
Expand Down Expand Up @@ -613,6 +616,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
bool CollectFullDiagnostics;

bool PerStatementResult;
NKikimrKqp::EQueryAction QueryAction;
ECompileActorAction CompileAction;
TMaybe<TQueryAst> QueryAst;

Expand Down Expand Up @@ -665,7 +669,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& clientAddress,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings,
const TMaybe<TString>& applicationName, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, NKikimrKqp::EQueryAction queryAction,
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst, bool collectFullDiagnostics,
bool perStatementResult, NYql::TExprContext* splitCtx, NYql::TExprNode::TPtr splitExpr)
{
Expand All @@ -674,7 +678,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
uid, query, userToken, clientAddress, dbCounters,
federatedQuerySetup, userRequestContext,
std::move(traceId), std::move(tempTablesState), collectFullDiagnostics,
perStatementResult, compileAction, std::move(queryAst),
perStatementResult, queryAction, compileAction, std::move(queryAst),
splitCtx, splitExpr);
}

Expand Down
14 changes: 7 additions & 7 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ using namespace NYql;


struct TKqpCompileSettings {
TKqpCompileSettings(bool keepInCache, bool isQueryActionPrepare, bool perStatementResult,
TKqpCompileSettings(bool keepInCache, NKikimrKqp::EQueryAction queryAction, bool perStatementResult,
const TInstant& deadline, ECompileActorAction action = ECompileActorAction::COMPILE)
: KeepInCache(keepInCache)
, IsQueryActionPrepare(isQueryActionPrepare)
, QueryAction(queryAction)
, PerStatementResult(perStatementResult)
, Deadline(deadline)
, Action(action)
{}

bool KeepInCache;
bool IsQueryActionPrepare;
NKikimrKqp::EQueryAction QueryAction;
bool PerStatementResult;
TInstant Deadline;
ECompileActorAction Action;
Expand Down Expand Up @@ -464,7 +464,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

TKqpCompileSettings compileSettings(
request.KeepInCache,
request.IsQueryActionPrepare,
request.QueryAction,
request.PerStatementResult,
request.Deadline,
ev->Get()->Split
Expand Down Expand Up @@ -532,7 +532,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

TKqpCompileSettings compileSettings(
true,
request.IsQueryActionPrepare,
request.QueryAction,
false,
request.Deadline,
ev->Get()->Split
Expand Down Expand Up @@ -626,7 +626,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
try {
if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
if (!hasTempTablesNameClashes) {
UpdateQueryCache(ctx, compileResult, keepInCache, compileRequest.CompileSettings.IsQueryActionPrepare, isPerStatementExecution);
UpdateQueryCache(ctx, compileResult, keepInCache, compileRequest.CompileSettings.QueryAction == NKikimrKqp::QUERY_ACTION_PREPARE, isPerStatementExecution);
}

if (ev->Get()->ReplayMessage && !QueryReplayBackend->IsNull()) {
Expand Down Expand Up @@ -849,7 +849,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) {
auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, TableServiceConfig, QueryServiceConfig, ModuleResolverState, Counters,
request.Uid, request.Query, request.UserToken, request.ClientAddress, FederatedQuerySetup, request.DbCounters, request.GUCSettings, request.ApplicationName, request.UserRequestContext,
request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.CompileSettings.Action, std::move(request.QueryAst), CollectDiagnostics,
request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.CompileSettings.QueryAction, request.CompileSettings.Action, std::move(request.QueryAst), CollectDiagnostics,
request.CompileSettings.PerStatementResult, request.SplitCtx, request.SplitExpr);
auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap,
AppData(ctx)->UserPoolId);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, NWilson::TTraceId traceId = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NKikimrKqp::EQueryAction queryAction = NKikimrKqp::QUERY_ACTION_EXECUTE,
ECompileActorAction compileAction = ECompileActorAction::COMPILE,
TMaybe<TQueryAst> queryAst = {},
bool collectFullDiagnostics = false,
Expand Down
9 changes: 3 additions & 6 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,16 +300,14 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest(s
compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
}

bool isQueryActionPrepare = GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE;

TMaybe<TQueryAst> statementAst;
if (!Statements.empty()) {
YQL_ENSURE(CurrentStatementId < Statements.size());
statementAst = Statements[CurrentStatementId];
}

return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, ClientAddress, uid, std::move(query), keepInCache,
isQueryActionPrepare, perStatementResult, compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie),
GetAction(), perStatementResult, compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie),
UserRequestContext, std::move(Orbit), TempTablesState, GetCollectDiagnostics(), statementAst);
}

Expand Down Expand Up @@ -344,12 +342,11 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
}

auto compileDeadline = QueryDeadlines.TimeoutAt;
bool isQueryActionPrepare = GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE;
if (QueryDeadlines.CancelAt) {
compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
}

return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, ClientAddress, CompileResult->Uid, query, isQueryActionPrepare,
return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, ClientAddress, CompileResult->Uid, query, GetAction(),
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState,
CompileResult->QueryAst);
}
Expand Down Expand Up @@ -392,7 +389,7 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileSplittedR
}

return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, ClientAddress, uid, std::move(query), false,
false, perStatementResult, compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie),
NKikimrKqp::QUERY_ACTION_EXECUTE, perStatementResult, compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie),
UserRequestContext, std::move(Orbit), TempTablesState, GetCollectDiagnostics(), statementAst,
false, SplittedCtx.get(), SplittedExprs.at(NextSplittedExpr));
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

{
TStreamExecScanQuerySettings settings;
settings.CollectQueryStats(ECollectQueryStatsMode::Full);
settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
auto it = client.StreamExecuteScanQuery(R"(
--!syntax_v1
SELECT `resource_id`, `timestamp`
Expand All @@ -325,7 +325,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
{
TStreamExecScanQuerySettings settings;
settings.CollectQueryStats(ECollectQueryStatsMode::Full);
settings.CollectFullDiagnostics(true);

auto it = client.StreamExecuteScanQuery(R"(
--!syntax_v1
SELECT `resource_id`, `timestamp`
Expand Down
Loading
Loading