Skip to content

Commit

Permalink
Merge c706d98 into 8193406
Browse files Browse the repository at this point in the history
  • Loading branch information
shnikd authored Dec 27, 2024
2 parents 8193406 + c706d98 commit c15077f
Show file tree
Hide file tree
Showing 25 changed files with 353 additions and 64 deletions.
24 changes: 24 additions & 0 deletions ydb/core/grpc_services/query/rpc_execute_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,25 @@ bool NeedReportAst(const Ydb::Query::ExecuteQueryRequest& req) {
}
}

bool NeedCollectDiagnostics(const Ydb::Query::ExecuteQueryRequest& req) {
switch (req.exec_mode()) {
case Ydb::Query::EXEC_MODE_EXPLAIN:
return true;

case Ydb::Query::EXEC_MODE_EXECUTE:
switch (req.stats_mode()) {
case Ydb::Query::StatsMode::STATS_MODE_FULL:
case Ydb::Query::StatsMode::STATS_MODE_PROFILE:
return true;
default:
return false;
}

default:
return false;
}
}

class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand Down Expand Up @@ -281,6 +300,8 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
settings,
req->pool_id());

ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release(), 0, 0, Span_.GetTraceId())) {
NYql::TIssues issues;
issues.AddIssue(MakeIssue(NKikimrIssues::TIssuesIds::DEFAULT_ERROR, "Internal error"));
Expand Down Expand Up @@ -378,6 +399,9 @@ class TExecuteQueryRPC : public TActorBootstrapped<TExecuteQueryRPC> {
if (NeedReportAst(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_ast(kqpResponse.GetQueryAst());
}
if (NeedCollectDiagnostics(*Request_->GetProtoRequest())) {
response.mutable_exec_stats()->set_query_diagnostics(kqpResponse.GetQueryDiagnostics());
}
}

if (record.GetYdbStatus() == Ydb::StatusIds::SUCCESS) {
Expand Down
13 changes: 13 additions & 0 deletions ydb/core/grpc_services/rpc_execute_data_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,16 @@ using namespace Ydb;
using namespace Ydb::Table;
using namespace NKqp;

bool NeedCollectDiagnostics(const Ydb::Table::ExecuteDataQueryRequest& req) {
switch (req.collect_stats()) {
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
return true;
default:
return false;
}
}

using TEvExecuteDataQueryRequest = TGrpcRequestOperationCall<Ydb::Table::ExecuteDataQueryRequest,
Ydb::Table::ExecuteDataQueryResponse>;

Expand Down Expand Up @@ -146,6 +156,8 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
req->collect_stats(),
req->has_query_cache_policy() ? &req->query_cache_policy() : nullptr,
req->has_operation_params() ? &req->operation_params() : nullptr);

ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

ReportCostInfo_ = req->operation_params().report_cost_info() == Ydb::FeatureFlag::ENABLED;

Expand All @@ -166,6 +178,7 @@ class TExecuteDataQueryRPC : public TRpcKqpRequestActor<TExecuteDataQueryRPC, TE
if (from.HasQueryStats()) {
FillQueryStats(*to->mutable_query_stats(), from);
to->mutable_query_stats()->set_query_ast(from.GetQueryAst());
to->mutable_query_stats()->set_query_diagnostics(from.GetQueryDiagnostics());
return;
}
}
Expand Down
28 changes: 26 additions & 2 deletions ydb/core/grpc_services/rpc_stream_execute_scan_query.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,27 @@ bool NeedReportPlan(const Ydb::Table::ExecuteScanQueryRequest& req) {
}
}

bool NeedCollectDiagnostics(const Ydb::Table::ExecuteScanQueryRequest& req) {
switch (req.mode()) {
case ExecuteScanQueryRequest_Mode_MODE_EXPLAIN:
return true;

case ExecuteScanQueryRequest_Mode_MODE_EXEC:
switch (req.collect_stats()) {
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL:
case Ydb::Table::QueryStatsCollection::STATS_COLLECTION_PROFILE:
return true;
default:
break;
}

return false;

default:
return false;
}
}

bool CheckRequest(const Ydb::Table::ExecuteScanQueryRequest& req, TParseRequestError& error)
{
switch (req.mode()) {
Expand Down Expand Up @@ -228,7 +249,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
nullptr
);

ev->Record.MutableRequest()->SetCollectDiagnostics(req->Getcollect_full_diagnostics());
ev->Record.MutableRequest()->SetCollectDiagnostics(NeedCollectDiagnostics(*req));

if (!ctx.Send(NKqp::MakeKqpProxyID(ctx.SelfID.NodeId()), ev.Release())) {
NYql::TIssues issues;
Expand Down Expand Up @@ -291,6 +312,7 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ

bool reportStats = NeedReportStats(*Request_->GetProtoRequest());
bool reportPlan = reportStats && NeedReportPlan(*Request_->GetProtoRequest());
bool collectDiagnostics = NeedCollectDiagnostics(*Request_->GetProtoRequest());

if (reportStats) {
if (kqpResponse.HasQueryStats()) {
Expand All @@ -308,7 +330,9 @@ class TStreamExecuteScanQueryRPC : public TActorBootstrapped<TStreamExecuteScanQ
response.mutable_result()->mutable_query_stats()->set_query_ast(kqpResponse.GetQueryAst());
}

response.mutable_result()->set_query_full_diagnostics(kqpResponse.GetQueryDiagnostics());
if (collectDiagnostics) {
response.mutable_result()->mutable_query_stats()->set_query_diagnostics(kqpResponse.GetQueryDiagnostics());
}

Y_PROTOBUF_SUPPRESS_NODISCARD response.SerializeToString(&out);
Request_->SendSerializedResult(std::move(out), record.GetYdbStatus());
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/kqp/common/compilation/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace NKikimr::NKqp::NPrivateEvents {

struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCompileRequest> {
TEvCompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& clientAddress, const TMaybe<TString>& uid,
TMaybe<TKqpQueryId>&& query, bool keepInCache, bool isQueryActionPrepare, bool perStatementResult, TInstant deadline,
TMaybe<TKqpQueryId>&& query, bool keepInCache, NKikimrKqp::EQueryAction queryAction, bool perStatementResult, TInstant deadline,
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext, NLWTrace::TOrbit orbit = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr, bool collectDiagnostics = false, TMaybe<TQueryAst> queryAst = Nothing(),
Expand All @@ -26,7 +26,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
, Uid(uid)
, Query(std::move(query))
, KeepInCache(keepInCache)
, IsQueryActionPrepare(isQueryActionPrepare)
, QueryAction(queryAction)
, PerStatementResult(perStatementResult)
, Deadline(deadline)
, DbCounters(dbCounters)
Expand All @@ -50,7 +50,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo
TMaybe<TString> Uid;
TMaybe<TKqpQueryId> Query;
bool KeepInCache = false;
bool IsQueryActionPrepare = false;
NKikimrKqp::EQueryAction QueryAction;
bool PerStatementResult = false;
// it is allowed for local event to use absolute time (TInstant) instead of time interval (TDuration)
TInstant Deadline;
Expand All @@ -76,7 +76,7 @@ struct TEvCompileRequest: public TEventLocal<TEvCompileRequest, TKqpEvents::EvCo

struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::EvRecompileRequest> {
TEvRecompileRequest(const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& clientAddress, const TString& uid,
const TMaybe<TKqpQueryId>& query, bool isQueryActionPrepare, TInstant deadline,
const TMaybe<TKqpQueryId>& query, NKikimrKqp::EQueryAction queryAction, TInstant deadline,
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
std::shared_ptr<std::atomic<bool>> intrestedInResult, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NLWTrace::TOrbit orbit = {}, TKqpTempTablesState::TConstPtr tempTablesState = nullptr, TMaybe<TQueryAst> queryAst = Nothing(),
Expand All @@ -85,7 +85,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
, ClientAddress(clientAddress)
, Uid(uid)
, Query(query)
, IsQueryActionPrepare(isQueryActionPrepare)
, QueryAction(queryAction)
, Deadline(deadline)
, DbCounters(dbCounters)
, GUCSettings(gUCSettings)
Expand All @@ -105,7 +105,7 @@ struct TEvRecompileRequest: public TEventLocal<TEvRecompileRequest, TKqpEvents::
TString ClientAddress;
TString Uid;
TMaybe<TKqpQueryId> Query;
bool IsQueryActionPrepare = false;
NKikimrKqp::EQueryAction QueryAction;

TInstant Deadline;
TKqpDbCountersPtr DbCounters;
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/kqp/common/events/query.h
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ struct TEvQueryRequest: public NActors::TEventLocal<TEvQueryRequest, TKqpEvents:
}

bool GetCollectDiagnostics() const {
return Record.GetRequest().GetCollectDiagnostics();
return Record.MutableRequest()->GetCollectDiagnostics();
}

ui32 CalculateSerializedSize() const override {
Expand Down
12 changes: 8 additions & 4 deletions ydb/core/kqp/compile_service/kqp_compile_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
TKqpDbCountersPtr dbCounters, std::optional<TKqpFederatedQuerySetup> federatedQuerySetup,
const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, bool collectFullDiagnostics,
bool perStatementResult,
bool perStatementResult, NKikimrKqp::EQueryAction queryAction,
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst,
NYql::TExprContext* splitCtx,
NYql::TExprNode::TPtr splitExpr)
Expand All @@ -75,6 +75,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
, CompileActorSpan(TWilsonKqp::CompileActor, std::move(traceId), "CompileActor")
, TempTablesState(std::move(tempTablesState))
, CollectFullDiagnostics(collectFullDiagnostics)
, QueryAction(queryAction)
, CompileAction(compileAction)
, QueryAst(std::move(queryAst))
, SplitCtx(splitCtx)
Expand Down Expand Up @@ -365,7 +366,9 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
replayMessage.InsertValue("query_syntax", ToString(Config->_KqpYqlSyntaxVersion.Get().GetRef()));
replayMessage.InsertValue("query_database", QueryId.Database);
replayMessage.InsertValue("query_cluster", QueryId.Cluster);
replayMessage.InsertValue("query_plan", queryPlan);
if (QueryAction == NKikimrKqp::QUERY_ACTION_EXPLAIN) {
replayMessage.InsertValue("query_plan", queryPlan);
}
replayMessage.InsertValue("query_type", ToString(QueryId.Settings.QueryType));

if (CollectFullDiagnostics) {
Expand Down Expand Up @@ -613,6 +616,7 @@ class TKqpCompileActor : public TActorBootstrapped<TKqpCompileActor> {
bool CollectFullDiagnostics;

bool PerStatementResult;
NKikimrKqp::EQueryAction QueryAction;
ECompileActorAction CompileAction;
TMaybe<TQueryAst> QueryAst;

Expand Down Expand Up @@ -666,7 +670,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
const TString& uid, const TKqpQueryId& query, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, const TString& clientAddress,
std::optional<TKqpFederatedQuerySetup> federatedQuerySetup, TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings,
const TMaybe<TString>& applicationName, const TIntrusivePtr<TUserRequestContext>& userRequestContext,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState,
NWilson::TTraceId traceId, TKqpTempTablesState::TConstPtr tempTablesState, NKikimrKqp::EQueryAction queryAction,
ECompileActorAction compileAction, TMaybe<TQueryAst> queryAst, bool collectFullDiagnostics,
bool perStatementResult, NYql::TExprContext* splitCtx, NYql::TExprNode::TPtr splitExpr)
{
Expand All @@ -675,7 +679,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
uid, query, userToken, clientAddress, dbCounters,
federatedQuerySetup, userRequestContext,
std::move(traceId), std::move(tempTablesState), collectFullDiagnostics,
perStatementResult, compileAction, std::move(queryAst),
perStatementResult, queryAction, compileAction, std::move(queryAst),
splitCtx, splitExpr);
}

Expand Down
14 changes: 7 additions & 7 deletions ydb/core/kqp/compile_service/kqp_compile_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,17 @@ using namespace NYql;


struct TKqpCompileSettings {
TKqpCompileSettings(bool keepInCache, bool isQueryActionPrepare, bool perStatementResult,
TKqpCompileSettings(bool keepInCache, NKikimrKqp::EQueryAction queryAction, bool perStatementResult,
const TInstant& deadline, ECompileActorAction action = ECompileActorAction::COMPILE)
: KeepInCache(keepInCache)
, IsQueryActionPrepare(isQueryActionPrepare)
, QueryAction(queryAction)
, PerStatementResult(perStatementResult)
, Deadline(deadline)
, Action(action)
{}

bool KeepInCache;
bool IsQueryActionPrepare;
NKikimrKqp::EQueryAction QueryAction;
bool PerStatementResult;
TInstant Deadline;
ECompileActorAction Action;
Expand Down Expand Up @@ -467,7 +467,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

TKqpCompileSettings compileSettings(
request.KeepInCache,
request.IsQueryActionPrepare,
request.QueryAction,
request.PerStatementResult,
request.Deadline,
ev->Get()->Split
Expand Down Expand Up @@ -535,7 +535,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {

TKqpCompileSettings compileSettings(
true,
request.IsQueryActionPrepare,
request.QueryAction,
false,
request.Deadline,
ev->Get()->Split
Expand Down Expand Up @@ -629,7 +629,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
try {
if (compileResult->Status == Ydb::StatusIds::SUCCESS) {
if (!hasTempTablesNameClashes) {
UpdateQueryCache(ctx, compileResult, keepInCache, compileRequest.CompileSettings.IsQueryActionPrepare, isPerStatementExecution);
UpdateQueryCache(ctx, compileResult, keepInCache, compileRequest.CompileSettings.QueryAction == NKikimrKqp::QUERY_ACTION_PREPARE, isPerStatementExecution);
}

if (ev->Get()->ReplayMessage && !QueryReplayBackend->IsNull()) {
Expand Down Expand Up @@ -852,7 +852,7 @@ class TKqpCompileService : public TActorBootstrapped<TKqpCompileService> {
void StartCompilation(TKqpCompileRequest&& request, const TActorContext& ctx) {
auto compileActor = CreateKqpCompileActor(ctx.SelfID, KqpSettings, TableServiceConfig, QueryServiceConfig, ModuleResolverState, Counters,
request.Uid, request.Query, request.UserToken, request.ClientAddress, FederatedQuerySetup, request.DbCounters, request.GUCSettings, request.ApplicationName, request.UserRequestContext,
request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.CompileSettings.Action, std::move(request.QueryAst), CollectDiagnostics,
request.CompileServiceSpan.GetTraceId(), request.TempTablesState, request.CompileSettings.QueryAction, request.CompileSettings.Action, std::move(request.QueryAst), CollectDiagnostics,
request.CompileSettings.PerStatementResult, request.SplitCtx, request.SplitExpr);
auto compileActorId = ctx.ExecutorThread.RegisterActor(compileActor, TMailboxType::HTSwap,
AppData(ctx)->UserPoolId);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/compile_service/kqp_compile_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ IActor* CreateKqpCompileActor(const TActorId& owner, const TKqpSettings::TConstP
TKqpDbCountersPtr dbCounters, const TGUCSettings::TPtr& gUCSettings, const TMaybe<TString>& applicationName,
const TIntrusivePtr<TUserRequestContext>& userRequestContext, NWilson::TTraceId traceId = {},
TKqpTempTablesState::TConstPtr tempTablesState = nullptr,
NKikimrKqp::EQueryAction queryAction = NKikimrKqp::QUERY_ACTION_EXECUTE,
ECompileActorAction compileAction = ECompileActorAction::COMPILE,
TMaybe<TQueryAst> queryAst = {},
bool collectFullDiagnostics = false,
Expand Down
9 changes: 3 additions & 6 deletions ydb/core/kqp/session_actor/kqp_query_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -300,16 +300,14 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileRequest(s
compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
}

bool isQueryActionPrepare = GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE;

TMaybe<TQueryAst> statementAst;
if (!Statements.empty()) {
YQL_ENSURE(CurrentStatementId < Statements.size());
statementAst = Statements[CurrentStatementId];
}

return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, ClientAddress, uid, std::move(query), keepInCache,
isQueryActionPrepare, perStatementResult, compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie),
GetAction(), perStatementResult, compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie),
UserRequestContext, std::move(Orbit), TempTablesState, GetCollectDiagnostics(), statementAst);
}

Expand Down Expand Up @@ -344,12 +342,11 @@ std::unique_ptr<TEvKqp::TEvRecompileRequest> TKqpQueryState::BuildReCompileReque
}

auto compileDeadline = QueryDeadlines.TimeoutAt;
bool isQueryActionPrepare = GetAction() == NKikimrKqp::QUERY_ACTION_PREPARE;
if (QueryDeadlines.CancelAt) {
compileDeadline = Min(compileDeadline, QueryDeadlines.CancelAt);
}

return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, ClientAddress, CompileResult->Uid, query, isQueryActionPrepare,
return std::make_unique<TEvKqp::TEvRecompileRequest>(UserToken, ClientAddress, CompileResult->Uid, query, GetAction(),
compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie), UserRequestContext, std::move(Orbit), TempTablesState,
CompileResult->QueryAst);
}
Expand Down Expand Up @@ -392,7 +389,7 @@ std::unique_ptr<TEvKqp::TEvCompileRequest> TKqpQueryState::BuildCompileSplittedR
}

return std::make_unique<TEvKqp::TEvCompileRequest>(UserToken, ClientAddress, uid, std::move(query), false,
false, perStatementResult, compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie),
NKikimrKqp::QUERY_ACTION_EXECUTE, perStatementResult, compileDeadline, DbCounters, gUCSettingsPtr, ApplicationName, std::move(cookie),
UserRequestContext, std::move(Orbit), TempTablesState, GetCollectDiagnostics(), statementAst,
false, SplittedCtx.get(), SplittedExprs.at(NextSplittedExpr));
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/kqp/ut/olap/kqp_olap_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {

{
TStreamExecScanQuerySettings settings;
settings.CollectQueryStats(ECollectQueryStatsMode::Full);
settings.CollectQueryStats(ECollectQueryStatsMode::Basic);
auto it = client.StreamExecuteScanQuery(R"(
--!syntax_v1
SELECT `resource_id`, `timestamp`
Expand All @@ -325,7 +325,7 @@ Y_UNIT_TEST_SUITE(KqpOlap) {
{
TStreamExecScanQuerySettings settings;
settings.CollectQueryStats(ECollectQueryStatsMode::Full);
settings.CollectFullDiagnostics(true);

auto it = client.StreamExecuteScanQuery(R"(
--!syntax_v1
SELECT `resource_id`, `timestamp`
Expand Down
Loading

0 comments on commit c15077f

Please sign in to comment.