From 96de0614cb9c66fdd11b46784a6031baf809f414 Mon Sep 17 00:00:00 2001 From: Nikita Vasilev <31595000+nikvas0@users.noreply.github.com> Date: Wed, 27 Dec 2023 14:06:51 +0300 Subject: [PATCH] Session actor perf (#724) --- ydb/core/kqp/common/compilation/events.h | 2 +- ydb/core/kqp/common/compilation/result.h | 6 + ydb/core/kqp/common/kqp_ru_calc.cpp | 72 ++---- ydb/core/kqp/common/kqp_ru_calc.h | 18 +- .../kqp/compile_service/kqp_compile_actor.cpp | 6 +- .../compile_service/kqp_compile_service.cpp | 10 +- ydb/core/kqp/provider/yql_kikimr_provider.h | 53 ++-- .../kqp/session_actor/kqp_query_state.cpp | 2 +- ydb/core/kqp/session_actor/kqp_query_state.h | 8 +- .../kqp/session_actor/kqp_query_stats.cpp | 230 ++++++++++++++++++ ydb/core/kqp/session_actor/kqp_query_stats.h | 40 +++ .../kqp/session_actor/kqp_session_actor.cpp | 57 +++-- ydb/core/kqp/session_actor/kqp_tx.h | 106 ++++---- .../kqp/session_actor/kqp_worker_actor.cpp | 5 +- ydb/core/kqp/session_actor/ya.make | 1 + ydb/core/sys_view/service/sysview_service.cpp | 112 --------- ydb/core/sys_view/service/sysview_service.h | 5 - 17 files changed, 458 insertions(+), 275 deletions(-) create mode 100644 ydb/core/kqp/session_actor/kqp_query_stats.cpp create mode 100644 ydb/core/kqp/session_actor/kqp_query_stats.h diff --git a/ydb/core/kqp/common/compilation/events.h b/ydb/core/kqp/common/compilation/events.h index 92f5511e9b93..c1272f74a833 100644 --- a/ydb/core/kqp/common/compilation/events.h +++ b/ydb/core/kqp/common/compilation/events.h @@ -91,7 +91,7 @@ struct TEvCompileResponse: public TEventLocal ReplayMessage; std::optional ReplayMessageUserView; diff --git a/ydb/core/kqp/common/compilation/result.h b/ydb/core/kqp/common/compilation/result.h index d66543ee943f..20fb30ea37ff 100644 --- a/ydb/core/kqp/common/compilation/result.h +++ b/ydb/core/kqp/common/compilation/result.h @@ -41,4 +41,10 @@ struct TKqpCompileResult { std::shared_ptr PreparedQuery; }; + +struct TKqpStatsCompile { + bool FromCache = false; + ui64 DurationUs = 0; + ui64 CpuTimeUs = 0; +}; } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/kqp_ru_calc.cpp b/ydb/core/kqp/common/kqp_ru_calc.cpp index 66e2d7c9fff2..d6f353b2b193 100644 --- a/ydb/core/kqp/common/kqp_ru_calc.cpp +++ b/ydb/core/kqp/common/kqp_ru_calc.cpp @@ -1,9 +1,5 @@ #include "kqp_ru_calc.h" -#include - -#include - #include #include @@ -14,8 +10,6 @@ namespace NKikimr { namespace NKqp { namespace NRuCalc { -namespace { - ui64 CalcReadIORu(const TTableStat& stat) { constexpr ui64 bytesPerUnit = 4_KB; constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1; @@ -24,65 +18,33 @@ ui64 CalcReadIORu(const TTableStat& stat) { return std::max(bytes, stat.Rows); } -class TIoReadStat: public TTableStat { -public: - void Add(const NYql::NDqProto::TDqTableStats& tableAggr) { - Rows += tableAggr.GetReadRows(); - Bytes += tableAggr.GetReadBytes(); - } - - ui64 CalcRu() const { - return CalcReadIORu(*this); - } -}; +void TIoReadStat::Add(const NYql::NDqProto::TDqTableStats& tableAggr) { + Rows += tableAggr.GetReadRows(); + Bytes += tableAggr.GetReadBytes(); +} -class TIoWriteStat: public TTableStat { -public: - void Add(const NYql::NDqProto::TDqTableStats& tableAggr) { - Rows += tableAggr.GetWriteRows(); - Rows += tableAggr.GetEraseRows(); - Bytes += tableAggr.GetWriteBytes(); - } +ui64 TIoReadStat::CalcRu() const { + return CalcReadIORu(*this); +} - ui64 CalcRu() const { - constexpr ui64 bytesPerUnit = 1_KB; - constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1; +void TIoWriteStat::Add(const NYql::NDqProto::TDqTableStats& tableAggr) { + Rows += tableAggr.GetWriteRows(); + Rows += tableAggr.GetEraseRows(); + Bytes += tableAggr.GetWriteBytes(); +} - auto bytes = (Bytes + bytesPerUnitAdjust) / bytesPerUnit; - return 2 * std::max(bytes, Rows); - } -}; +ui64 TIoWriteStat::CalcRu() const { + constexpr ui64 bytesPerUnit = 1_KB; + constexpr ui64 bytesPerUnitAdjust = bytesPerUnit - 1; + auto bytes = (Bytes + bytesPerUnitAdjust) / bytesPerUnit; + return 2 * std::max(bytes, Rows); } ui64 CpuTimeToUnit(TDuration cpuTime) { return std::floor(cpuTime.MicroSeconds() / 1500.0); } -ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats) { - TDuration totalCpuTime; - TIoReadStat totalReadStat; - TIoWriteStat totalWriteStat; - - for (const auto& exec : stats.GetExecutions()) { - totalCpuTime += TDuration::MicroSeconds(exec.GetCpuTimeUs()); - - for (auto& table : exec.GetTables()) { - totalReadStat.Add(table); - } - } - - if (stats.HasCompilation()) { - totalCpuTime += TDuration::MicroSeconds(stats.GetCompilation().GetCpuTimeUs()); - } - - totalCpuTime += TDuration::MicroSeconds(stats.GetWorkerCpuTimeUs()); - - auto totalIoRu = totalReadStat.CalcRu() + totalWriteStat.CalcRu(); - - return std::max(std::max(CpuTimeToUnit(totalCpuTime), totalIoRu), (ui64)1); -} - ui64 CalcRequestUnit(const TProgressStatEntry& stats) { auto ioRu = CalcReadIORu(stats.ReadIOStat); diff --git a/ydb/core/kqp/common/kqp_ru_calc.h b/ydb/core/kqp/common/kqp_ru_calc.h index b7012049595c..8b03ae6eedcd 100644 --- a/ydb/core/kqp/common/kqp_ru_calc.h +++ b/ydb/core/kqp/common/kqp_ru_calc.h @@ -3,6 +3,9 @@ #include #include +#include +#include + namespace NKqpProto { class TKqpStatsQuery; } @@ -14,8 +17,21 @@ struct TProgressStatEntry; namespace NRuCalc { +ui64 CalcReadIORu(const TTableStat& stat); + +class TIoReadStat: public TTableStat { +public: + void Add(const NYql::NDqProto::TDqTableStats& tableAggr); + ui64 CalcRu() const; +}; + +class TIoWriteStat: public TTableStat { +public: + void Add(const NYql::NDqProto::TDqTableStats& tableAggr); + ui64 CalcRu() const; +}; + ui64 CpuTimeToUnit(TDuration cpuTimeUs); -ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats); ui64 CalcRequestUnit(const TProgressStatEntry& stats); } // namespace NRuCalc diff --git a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp index 5741fe4cfc8c..ae63b50d9df5 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_actor.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_actor.cpp @@ -317,9 +317,9 @@ class TKqpCompileActor : public TActorBootstrapped { ReplayMessage = std::nullopt; ReplayMessageUserView = std::nullopt; auto& stats = responseEv->Stats; - stats.SetFromCache(false); - stats.SetDurationUs((TInstant::Now() - StartTime).MicroSeconds()); - stats.SetCpuTimeUs(CompileCpuTime.MicroSeconds()); + stats.FromCache = false; + stats.DurationUs = (TInstant::Now() - StartTime).MicroSeconds(); + stats.CpuTimeUs = CompileCpuTime.MicroSeconds(); Send(Owner, responseEv.Release()); Counters->ReportCompileFinish(DbCounters); diff --git a/ydb/core/kqp/compile_service/kqp_compile_service.cpp b/ydb/core/kqp/compile_service/kqp_compile_service.cpp index 522ee57eac27..6d6414fca96c 100644 --- a/ydb/core/kqp/compile_service/kqp_compile_service.cpp +++ b/ydb/core/kqp/compile_service/kqp_compile_service.cpp @@ -938,7 +938,7 @@ class TKqpCompileService : public TActorBootstrapped { } void Reply(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult, - const NKqpProto::TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie, + const TKqpStatsCompile& compileStats, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span, const std::optional& replayMessage = std::nullopt) { const auto& query = compileResult->Query; @@ -953,7 +953,7 @@ class TKqpCompileService : public TActorBootstrapped { << ", status:" << compileResult->Status); auto responseEv = MakeHolder(compileResult, std::move(orbit), replayMessage); - responseEv->Stats.CopyFrom(compileStats); + responseEv->Stats = compileStats; if (span) { span.End(); @@ -965,8 +965,8 @@ class TKqpCompileService : public TActorBootstrapped { void ReplyFromCache(const TActorId& sender, const TKqpCompileResult::TConstPtr& compileResult, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) { - NKqpProto::TKqpStatsCompile stats; - stats.SetFromCache(true); + TKqpStatsCompile stats; + stats.FromCache = true; LWTRACK(KqpCompileServiceReplyFromCache, orbit); Reply(sender, compileResult, stats, ctx, cookie, std::move(orbit), std::move(span)); @@ -976,7 +976,7 @@ class TKqpCompileService : public TActorBootstrapped { const TIssues& issues, const TActorContext& ctx, ui64 cookie, NLWTrace::TOrbit orbit, NWilson::TSpan span) { LWTRACK(KqpCompileServiceReplyError, orbit); - Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), NKqpProto::TKqpStatsCompile(), ctx, cookie, std::move(orbit), std::move(span)); + Reply(sender, TKqpCompileResult::Make(uid, status, issues, ETableReadType::Other), TKqpStatsCompile{}, ctx, cookie, std::move(orbit), std::move(span)); } void ReplyInternalError(const TActorId& sender, const TString& uid, const TString& message, diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.h b/ydb/core/kqp/provider/yql_kikimr_provider.h index f662ce6becfe..368808aeaeca 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.h +++ b/ydb/core/kqp/provider/yql_kikimr_provider.h @@ -308,34 +308,44 @@ class TKikimrTransactionContextBase : public TThrRefBase { bool hasScheme = false; bool hasData = false; - for (auto& pair : TableOperations) { - hasScheme = hasScheme || (pair.second & KikimrSchemeOps()); - hasData = hasData || (pair.second & KikimrDataOps()); + for (auto& [_, operation] : TableOperations) { + hasScheme = hasScheme || (operation & KikimrSchemeOps()); + hasData = hasData || (operation & KikimrDataOps()); + } + + THashMap tableInfoMap; + tableInfoMap.reserve(tableInfos.size()); + if (TableByIdMap.empty()) { + TableByIdMap.reserve(tableInfos.size()); + } + if (TableOperations.empty()) { + TableOperations.reserve(operations.size()); } - THashMap tableInfoMap; for (const auto& info : tableInfos) { - tableInfoMap.insert(std::make_pair(info.GetTableName(), info)); + tableInfoMap.emplace(info.GetTableName(), &info); TKikimrPathId pathId(info.GetTableId().GetOwnerId(), info.GetTableId().GetTableId()); - TableByIdMap.insert(std::make_pair(pathId, info.GetTableName())); + TableByIdMap.emplace(pathId, info.GetTableName()); } for (const auto& op : operations) { - auto table = op.GetTable(); + const auto& table = [&]() -> const TString& { + const auto tempTable = TempTables.FindPtr(op.GetTable()); + if (tempTable) { + return *tempTable; + } else { + return op.GetTable(); + } + }(); - auto newOp = TYdbOperation(op.GetOperation()); - TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); - - auto tempTable = TempTables.FindPtr(table); - if (tempTable) { - table = *tempTable; - } + const auto newOp = TYdbOperation(op.GetOperation()); const auto info = tableInfoMap.FindPtr(table); if (!info) { TString message = TStringBuilder() << "Unable to find table info for table '" << table << "'"; + const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_SCHEME_ERROR, message)); return {false, issues}; } @@ -343,6 +353,7 @@ class TKikimrTransactionContextBase : public TThrRefBase { if (queryType == EKikimrQueryType::Dml && (newOp & KikimrSchemeOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in data query"; + const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); return {false, issues}; } @@ -351,6 +362,7 @@ class TKikimrTransactionContextBase : public TThrRefBase { if (EffectiveIsolationLevel) { TString message = TStringBuilder() << "Scheme operations can't be performed inside transaction, " << "operation: " << newOp; + const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); return {false, issues}; } @@ -359,6 +371,7 @@ class TKikimrTransactionContextBase : public TThrRefBase { if (queryType == EKikimrQueryType::Ddl && (newOp & KikimrDataOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in scheme query"; + const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); return {false, issues}; } @@ -366,6 +379,7 @@ class TKikimrTransactionContextBase : public TThrRefBase { if (queryType == EKikimrQueryType::Scan && (newOp & KikimrModifyOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in scan query"; + const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); return {false, issues}; } @@ -379,7 +393,7 @@ class TKikimrTransactionContextBase : public TThrRefBase { message = TStringBuilder() << message << " Use COMMIT statement to indicate end of transaction between scheme and data operations."; } - + const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_MIXED_SCHEME_DATA_TX, message)); return {false, issues}; } @@ -387,18 +401,20 @@ class TKikimrTransactionContextBase : public TThrRefBase { if (Readonly && (newOp & KikimrModifyOps())) { TString message = TStringBuilder() << "Operation '" << newOp << "' can't be performed in read only transaction"; + const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); return {false, issues}; } auto& currentOps = TableOperations[table]; - bool currentModify = currentOps & KikimrModifyOps(); + const bool currentModify = currentOps & KikimrModifyOps(); if (currentModify) { if (KikimrReadOps() & newOp) { if (!EnableImmediateEffects) { TString message = TStringBuilder() << "Data modifications previously made to table '" << table << "' in current transaction won't be seen by operation: '" << newOp << "'"; + const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); auto newIssue = AddDmlIssue(YqlIssue(pos, TIssuesIds::KIKIMR_READ_MODIFIED_TABLE, message)); issues.AddIssue(newIssue); return {false, issues}; @@ -407,10 +423,11 @@ class TKikimrTransactionContextBase : public TThrRefBase { HasUncommittedChangesRead = true; } - if (info->GetHasIndexTables()) { + if ((*info)->GetHasIndexTables()) { if (!EnableImmediateEffects) { TString message = TStringBuilder() << "Multiple modification of table with secondary indexes is not supported yet"; + const TPosition pos(op.GetPosition().GetColumn(), op.GetPosition().GetRow()); issues.AddIssue(YqlIssue(pos, TIssuesIds::KIKIMR_BAD_OPERATION, message)); return {false, issues}; } @@ -428,9 +445,9 @@ class TKikimrTransactionContextBase : public TThrRefBase { virtual ~TKikimrTransactionContextBase() = default; public: - THashMap TableOperations; bool HasUncommittedChangesRead = false; const bool EnableImmediateEffects; + THashMap TableOperations; THashMap TableByIdMap; TMaybe EffectiveIsolationLevel; THashMap TempTables; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.cpp b/ydb/core/kqp/session_actor/kqp_query_state.cpp index dbf51494820c..74571ae2f8cc 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.cpp +++ b/ydb/core/kqp/session_actor/kqp_query_state.cpp @@ -115,7 +115,7 @@ bool TKqpQueryState::SaveAndCheckCompileResult(TEvKqp::TEvCompileResponse* ev) { YQL_ENSURE(compiledVersion == NKikimrKqp::TPreparedQuery::VERSION_PHYSICAL_V1, "Unexpected prepared query version: " << compiledVersion); - CompileStats.Swap(&ev->Stats); + CompileStats = ev->Stats; PreparedQuery = CompileResult->PreparedQuery; if (ev->ReplayMessage) { ReplayMessage = *ev->ReplayMessage; diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 0eb2c0ee046b..94f498007e4e 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -1,5 +1,6 @@ #pragma once +#include "kqp_query_stats.h" #include "kqp_worker_common.h" #include @@ -85,7 +86,7 @@ class TKqpQueryState : public TNonCopyable { ui64 ParametersSize = 0; TPreparedQueryHolder::TConstPtr PreparedQuery; TKqpCompileResult::TConstPtr CompileResult; - NKqpProto::TKqpStatsCompile CompileStats; + TKqpStatsCompile CompileStats; TIntrusivePtr TxCtx; TQueryData::TPtr QueryData; @@ -97,8 +98,7 @@ class TKqpQueryState : public TNonCopyable { TInstant StartTime; NYql::TKikimrQueryDeadlines QueryDeadlines; - - NKqpProto::TKqpStatsQuery Stats; + TKqpQueryStats QueryStats; bool KeepSession = false; TIntrusiveConstPtr UserToken; NActors::TMonotonic StartedAt; @@ -235,7 +235,7 @@ class TKqpQueryState : public TNonCopyable { } bool NeedCheckTableVersions() const { - return CompileStats.GetFromCache(); + return CompileStats.FromCache; } TString ExtractQueryText() const { diff --git a/ydb/core/kqp/session_actor/kqp_query_stats.cpp b/ydb/core/kqp/session_actor/kqp_query_stats.cpp new file mode 100644 index 000000000000..e26d6b5e7b8f --- /dev/null +++ b/ydb/core/kqp/session_actor/kqp_query_stats.cpp @@ -0,0 +1,230 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr { +namespace NKqp { + +const TVector& TKqpQueryStats::GetExecutions() const { + return Executions; +} + +ui64 TKqpQueryStats::GetWorkerCpuTimeUs() const { + return WorkerCpuTimeUs; +} + +constexpr size_t QUERY_TEXT_LIMIT = 4096; + +template +void CollectQueryStatsImpl(const TActorContext& ctx, const T* queryStats, + TDuration queryDuration, const TString& queryText, + const TString& userSID, ui64 parametersSize, const TString& database, + const NKikimrKqp::EQueryType type, ui64 requestUnits) +{ + if (!AppData()->FeatureFlags.GetEnableSystemViews()) { + return; + } + + auto collectEv = MakeHolder(); + collectEv->Database = database; + + auto& stats = collectEv->QueryStats; + auto& dataStats = *stats.MutableStats(); + auto& shardsCpuTime = *stats.MutableShardsCpuTimeUs(); + auto& computeCpuTime = *stats.MutableComputeCpuTimeUs(); + + auto nodeId = ctx.SelfID.NodeId(); + stats.SetNodeId(nodeId); + + stats.SetEndTimeMs(TInstant::Now().MilliSeconds()); + stats.SetDurationMs(queryDuration.MilliSeconds()); + + stats.SetQueryTextHash(MurmurHash(queryText.data(), queryText.size())); + if (queryText.size() > QUERY_TEXT_LIMIT) { + auto limitedText = queryText.substr(0, QUERY_TEXT_LIMIT); + stats.SetQueryText(limitedText); + } else { + stats.SetQueryText(queryText); + } + + if (userSID) { + stats.SetUserSID(userSID); + } + stats.SetParametersSize(parametersSize); + + TString strType; + switch (type) { + case NKikimrKqp::QUERY_TYPE_SQL_DML: + case NKikimrKqp::QUERY_TYPE_PREPARED_DML: + strType = "data"; + break; + case NKikimrKqp::QUERY_TYPE_SQL_SCAN: + strType = "scan"; + break; + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: + case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: + strType = "script"; + break; + default: + break; + } + stats.SetType(strType); + + if (!queryStats || queryStats->GetExecutions().empty()) { + ctx.Send(NSysView::MakeSysViewServiceID(nodeId), std::move(collectEv)); + return; + } + + shardsCpuTime.SetMin(Max()); + computeCpuTime.SetMin(Max()); + + auto aggregate = [] (NKikimrSysView::TStatsAggr& to, const NYql::NDqProto::TDqStatsAggr& from) { + to.SetMin(std::min(to.GetMin(), from.GetMin())); + to.SetMax(std::max(to.GetMax(), from.GetMax())); + to.SetSum(to.GetSum() + from.GetSum()); + to.SetCnt(to.GetCnt() + from.GetCnt()); + }; + + for (const NYql::NDqProto::TDqExecutionStats& exec : queryStats->GetExecutions()) { + NKqpProto::TKqpExecutionExtraStats execExtra; + if (exec.HasExtra()) { + bool ok = exec.GetExtra().UnpackTo(&execExtra); + Y_UNUSED(ok); + } + + dataStats.SetPartitionCount(dataStats.GetPartitionCount() + execExtra.GetAffectedShards()); + + for (auto& table : exec.GetTables()) { + dataStats.SetReadRows(dataStats.GetReadRows() + table.GetReadRows()); + dataStats.SetReadBytes(dataStats.GetReadBytes() + table.GetReadBytes()); + dataStats.SetUpdateRows(dataStats.GetUpdateRows() + table.GetWriteRows()); + dataStats.SetUpdateBytes(dataStats.GetUpdateBytes() + table.GetWriteBytes()); + dataStats.SetDeleteRows(dataStats.GetDeleteRows() + table.GetEraseRows()); + } + + aggregate(shardsCpuTime, execExtra.GetShardsCpuTimeUs()); + aggregate(computeCpuTime, execExtra.GetComputeCpuTimeUs()); + } + + if constexpr (std::is_same_v) { + if (queryStats->HasCompilation()) { + const auto& compileStats = queryStats->GetCompilation(); + stats.SetFromQueryCache(compileStats.GetFromCache()); + stats.SetCompileDurationMs(compileStats.GetDurationUs() / 1'000); + stats.SetCompileCpuTimeUs(compileStats.GetCpuTimeUs()); + } + } else { + if (queryStats->Compilation) { + const auto& compileStats = *queryStats->Compilation; + stats.SetFromQueryCache(compileStats.FromCache); + stats.SetCompileDurationMs(compileStats.DurationUs / 1'000); + stats.SetCompileCpuTimeUs(compileStats.CpuTimeUs); + } + } + + stats.SetProcessCpuTimeUs(queryStats->GetWorkerCpuTimeUs()); + stats.SetTotalCpuTimeUs( + stats.GetProcessCpuTimeUs() + + stats.GetCompileCpuTimeUs() + + stats.GetShardsCpuTimeUs().GetSum() + + stats.GetComputeCpuTimeUs().GetSum() + ); + + stats.SetRequestUnits(requestUnits); + + ctx.Send(NSysView::MakeSysViewServiceID(nodeId), std::move(collectEv)); +} + +void CollectQueryStats(const TActorContext& ctx, const NKqpProto::TKqpStatsQuery* queryStats, + TDuration queryDuration, const TString& queryText, + const TString& userSID, ui64 parametersSize, const TString& database, + const NKikimrKqp::EQueryType type, ui64 requestUnits) +{ + CollectQueryStatsImpl( + ctx, queryStats, queryDuration, queryText, userSID, + parametersSize, database, type, requestUnits); +} + +void CollectQueryStats(const TActorContext& ctx, const TKqpQueryStats* queryStats, + TDuration queryDuration, const TString& queryText, + const TString& userSID, ui64 parametersSize, const TString& database, + const NKikimrKqp::EQueryType type, ui64 requestUnits) +{ + CollectQueryStatsImpl( + ctx, queryStats, queryDuration, queryText, userSID, + parametersSize, database, type, requestUnits); +} + +template +ui64 CalcRequestUnitImpl(const T& stats) { + TDuration totalCpuTime; + NRuCalc::TIoReadStat totalReadStat; + NRuCalc::TIoWriteStat totalWriteStat; + + for (const auto& exec : stats.GetExecutions()) { + totalCpuTime += TDuration::MicroSeconds(exec.GetCpuTimeUs()); + + for (auto& table : exec.GetTables()) { + totalReadStat.Add(table); + } + } + + if constexpr (std::is_same_v) { + if (stats.HasCompilation()) { + totalCpuTime += TDuration::MicroSeconds(stats.GetCompilation().GetCpuTimeUs()); + } + } else { + if (stats.Compilation) { + totalCpuTime += TDuration::MicroSeconds(stats.Compilation->CpuTimeUs); + } + } + + totalCpuTime += TDuration::MicroSeconds(stats.GetWorkerCpuTimeUs()); + + auto totalIoRu = totalReadStat.CalcRu() + totalWriteStat.CalcRu(); + + return std::max(std::max(NRuCalc::CpuTimeToUnit(totalCpuTime), totalIoRu), (ui64)1); +} + +ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats) { + return CalcRequestUnitImpl(stats); +} + +ui64 CalcRequestUnit(const TKqpQueryStats& stats) { + return CalcRequestUnitImpl(stats); +} + +NKqpProto::TKqpStatsQuery TKqpQueryStats::ToProto() const { + NKqpProto::TKqpStatsQuery result; + result.SetDurationUs(DurationUs); + + if (Compilation) { + result.MutableCompilation()->SetFromCache(Compilation->FromCache); + result.MutableCompilation()->SetDurationUs(Compilation->DurationUs); + result.MutableCompilation()->SetCpuTimeUs(Compilation->CpuTimeUs); + } + + result.SetWorkerCpuTimeUs(WorkerCpuTimeUs); + result.SetReadSetsCount(ReadSetsCount); + result.SetMaxShardProgramSize(MaxShardProgramSize); + result.SetMaxShardReplySize(MaxShardReplySize); + + result.MutableExecutions()->Add(std::begin(Executions), std::end(Executions)); + return result; +} + +} +} diff --git a/ydb/core/kqp/session_actor/kqp_query_stats.h b/ydb/core/kqp/session_actor/kqp_query_stats.h new file mode 100644 index 000000000000..e1c5b5f835e6 --- /dev/null +++ b/ydb/core/kqp/session_actor/kqp_query_stats.h @@ -0,0 +1,40 @@ +#pragma once + +#include +#include +#include + +namespace NKikimr::NKqp { + +struct TKqpQueryStats { + ui64 DurationUs; + std::optional Compilation; + + ui64 WorkerCpuTimeUs; + ui64 ReadSetsCount; + ui64 MaxShardProgramSize; + ui64 MaxShardReplySize; + + TVector Executions; + + const TVector& GetExecutions() const; + ui64 GetWorkerCpuTimeUs() const; + + NKqpProto::TKqpStatsQuery ToProto() const; +}; + + +void CollectQueryStats(const TActorContext& ctx, const NKqpProto::TKqpStatsQuery* queryStats, + TDuration queryDuration, const TString& queryText, + const TString& userSID, ui64 parametersSize, const TString& database, + const NKikimrKqp::EQueryType type, ui64 requestUnits); + +void CollectQueryStats(const TActorContext& ctx, const TKqpQueryStats* queryStats, + TDuration queryDuration, const TString& queryText, + const TString& userSID, ui64 parametersSize, const TString& database, + const NKikimrKqp::EQueryType type, ui64 requestUnits); + +ui64 CalcRequestUnit(const NKqpProto::TKqpStatsQuery& stats); +ui64 CalcRequestUnit(const TKqpQueryStats& stats); + +} diff --git a/ydb/core/kqp/session_actor/kqp_session_actor.cpp b/ydb/core/kqp/session_actor/kqp_session_actor.cpp index beb05142a3c0..dd08c9b0f1f9 100644 --- a/ydb/core/kqp/session_actor/kqp_session_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_session_actor.cpp @@ -2,6 +2,7 @@ #include "kqp_tx.h" #include "kqp_worker_common.h" #include "kqp_query_state.h" +#include "kqp_query_stats.h" #include #include @@ -632,7 +633,7 @@ class TKqpSessionActor : public TActorBootstrapped { } Counters->ReportTxCreated(Settings.DbCounters); - Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize()); + Counters->ReportBeginTransaction(Settings.DbCounters, Transactions.EvictedTx, Transactions.Size(), Transactions.ToBeAbortedSize()); // TODO: check } bool PrepareQueryTransaction() { @@ -718,16 +719,15 @@ class TKqpSessionActor : public TActorBootstrapped { } try { - auto parameters = QueryState->GetYdbParameters(); - if (QueryState->CompileResult && QueryState->CompileResult->Ast) { - auto& params = QueryState->CompileResult->Ast->PgAutoParamValues; - if (params) { - for(const auto& [name, param] : *params) { - parameters.insert({name, param}); + const auto& parameters = QueryState->GetYdbParameters(); + QueryState->QueryData->ParseParameters(parameters); + if (QueryState->CompileResult && QueryState->CompileResult->Ast && QueryState->CompileResult->Ast->PgAutoParamValues) { + for(const auto& [name, param] : *QueryState->CompileResult->Ast->PgAutoParamValues) { + if (!parameters.contains(name)) { + QueryState->QueryData->AddTypedValueParam(name, param); } } } - QueryState->QueryData->ParseParameters(parameters); } catch(const yexception& ex) { ythrow TRequestFail(Ydb::StatusIds::BAD_REQUEST) << ex.what(); } @@ -739,7 +739,7 @@ class TKqpSessionActor : public TActorBootstrapped { QueryResponse = std::make_unique(); FillCompileStatus(QueryState->CompileResult, QueryResponse->Record); - auto ru = NRuCalc::CpuTimeToUnit(TDuration::MicroSeconds(QueryState->CompileStats.GetCpuTimeUs())); + auto ru = NRuCalc::CpuTimeToUnit(TDuration::MicroSeconds(QueryState->CompileStats.CpuTimeUs)); auto& record = QueryResponse->Record.GetRef(); record.SetConsumedRu(ru); @@ -1157,7 +1157,7 @@ class TKqpSessionActor : public TActorBootstrapped { NKqpProto::TKqpStatsQuery& stats = *ev->Get()->Record.MutableQueryStats(); NKqpProto::TKqpStatsQuery executionStats; executionStats.Swap(&stats); - stats = QueryState->Stats; + stats = QueryState->QueryStats.ToProto(); stats.MutableExecutions()->MergeFrom(executionStats.GetExecutions()); ev->Get()->Record.SetQueryPlan(SerializeAnalyzePlan(stats)); } @@ -1283,8 +1283,8 @@ class TKqpSessionActor : public TActorBootstrapped { } if (executerResults.HasStats()) { - auto* exec = QueryState->Stats.AddExecutions(); - exec->Swap(executerResults.MutableStats()); + QueryState->QueryStats.Executions.emplace_back(); + QueryState->QueryStats.Executions.back().Swap(executerResults.MutableStats()); } if (!response->GetIssues().empty()){ @@ -1323,7 +1323,7 @@ class TKqpSessionActor : public TActorBootstrapped { } } - void CollectSystemViewQueryStats(const NKqpProto::TKqpStatsQuery* stats, TDuration queryDuration, + void CollectSystemViewQueryStats(const TKqpQueryStats* stats, TDuration queryDuration, const TString& database, ui64 requestUnits) { auto type = QueryState->GetType(); @@ -1339,7 +1339,7 @@ class TKqpSessionActor : public TActorBootstrapped { TString text = QueryState->ExtractQueryText(); if (IsQueryAllowedToLog(text)) { auto userSID = QueryState->UserToken->GetUserSID(); - NSysView::CollectQueryStats(TlsActivationContext->AsActorContext(), stats, queryDuration, text, + CollectQueryStats(TlsActivationContext->AsActorContext(), stats, queryDuration, text, userSID, QueryState->ParametersSize, database, type, requestUnits); } break; @@ -1351,16 +1351,19 @@ class TKqpSessionActor : public TActorBootstrapped { void FillSystemViewQueryStats(NKikimrKqp::TEvQueryResponse* record) { YQL_ENSURE(QueryState); - auto* stats = &QueryState->Stats; + auto* stats = &QueryState->QueryStats; - stats->SetDurationUs((TInstant::Now() - QueryState->StartTime).MicroSeconds()); - stats->SetWorkerCpuTimeUs(QueryState->GetCpuTime().MicroSeconds()); + stats->DurationUs = ((TInstant::Now() - QueryState->StartTime).MicroSeconds()); + stats->WorkerCpuTimeUs = (QueryState->GetCpuTime().MicroSeconds()); if (QueryState->CompileResult) { - stats->MutableCompilation()->Swap(&QueryState->CompileStats); + stats->Compilation.emplace(); + stats->Compilation->FromCache = (QueryState->CompileStats.FromCache); + stats->Compilation->DurationUs = (QueryState->CompileStats.DurationUs); + stats->Compilation->CpuTimeUs = (QueryState->CompileStats.CpuTimeUs); } if (IsExecuteAction(QueryState->GetAction())) { - auto ru = NRuCalc::CalcRequestUnit(*stats); + auto ru = CalcRequestUnit(*stats); if (record != nullptr) { record->SetConsumedRu(ru); @@ -1381,19 +1384,19 @@ class TKqpSessionActor : public TActorBootstrapped { auto requestInfo = TKqpRequestInfo(QueryState->UserRequestContext->TraceId, SessionId); if (IsExecuteAction(QueryState->GetAction())) { - auto queryDuration = TDuration::MicroSeconds(QueryState->Stats.GetDurationUs()); + auto queryDuration = TDuration::MicroSeconds(QueryState->QueryStats.DurationUs); SlowLogQuery(TlsActivationContext->AsActorContext(), Config.Get(), requestInfo, queryDuration, record->GetYdbStatus(), QueryState->UserToken, QueryState->ParametersSize, record, [this]() { return this->QueryState->ExtractQueryText(); }); } - auto* stats = &QueryState->Stats; if (QueryState->ReportStats()) { + auto stats = QueryState->QueryStats.ToProto(); if (QueryState->GetStatsMode() >= Ydb::Table::QueryStatsCollection::STATS_COLLECTION_FULL) { - response->SetQueryPlan(SerializeAnalyzePlan(*stats)); + response->SetQueryPlan(SerializeAnalyzePlan(stats)); response->SetQueryAst(QueryState->CompileResult->PreparedQuery->GetPhysicalQuery().GetQueryAst()); } - response->MutableQueryStats()->Swap(stats); + response->MutableQueryStats()->Swap(&stats); } } @@ -1433,7 +1436,7 @@ class TKqpSessionActor : public TActorBootstrapped { Counters->ReportQueryWithRangeScan(Settings.DbCounters); } - auto& stats = QueryState->Stats; + auto& stats = QueryState->QueryStats; ui32 affectedShardsCount = 0; ui64 readBytesCount = 0; @@ -1449,9 +1452,9 @@ class TKqpSessionActor : public TActorBootstrapped { Counters->ReportQueryAffectedShards(Settings.DbCounters, affectedShardsCount); Counters->ReportQueryReadRows(Settings.DbCounters, readRowsCount); Counters->ReportQueryReadBytes(Settings.DbCounters, readBytesCount); - Counters->ReportQueryReadSets(Settings.DbCounters, stats.GetReadSetsCount()); - Counters->ReportQueryMaxShardReplySize(Settings.DbCounters, stats.GetMaxShardReplySize()); - Counters->ReportQueryMaxShardProgramSize(Settings.DbCounters, stats.GetMaxShardProgramSize()); + Counters->ReportQueryReadSets(Settings.DbCounters, stats.ReadSetsCount); + Counters->ReportQueryMaxShardReplySize(Settings.DbCounters, stats.MaxShardReplySize); + Counters->ReportQueryMaxShardProgramSize(Settings.DbCounters, stats.MaxShardProgramSize); } void ReplySuccess() { diff --git a/ydb/core/kqp/session_actor/kqp_tx.h b/ydb/core/kqp/session_actor/kqp_tx.h index 0d605b6cfc07..ff2237bf977e 100644 --- a/ydb/core/kqp/session_actor/kqp_tx.h +++ b/ydb/core/kqp/session_actor/kqp_tx.h @@ -319,42 +319,73 @@ struct TTxId { } }; +} + +template<> +struct THash { + inline size_t operator()(const NKikimr::NKqp::TTxId& id) const noexcept { + return THash()(id.Id); + } +}; + +namespace NKikimr::NKqp { + class TTransactionsCache { - TLRUCache> Active; + size_t MaxActiveSize; + THashMap, THash> Active; std::deque> ToBeAborted; + + auto FindOldestTransaction() { + if (Active.empty()) { + return std::end(Active); + } + auto oldest = std::begin(Active); + for (auto it = std::next(oldest); it != std::end(Active); ++it) { + if (oldest->second->LastAccessTime > it->second->LastAccessTime) { + oldest = it; + } + } + return oldest; + } + public: ui64 EvictedTx = 0; TDuration IdleTimeout; TTransactionsCache(size_t size, TDuration idleTimeout) - : Active(size) + : MaxActiveSize(size) , IdleTimeout(idleTimeout) - {} + { + Active.reserve(MaxActiveSize); + } - size_t Size() { - return Active.Size(); + size_t Size() const { + return Active.size(); } - size_t MaxSize() { - return Active.GetMaxSize(); + size_t MaxSize() const { + return MaxActiveSize; } TIntrusivePtr Find(const TTxId& id) { - if (auto it = Active.Find(id); it != Active.End()) { - it.Value()->Touch(); - return *it; + auto it = Active.find(id); + if (it != std::end(Active)) { + it->second->Touch(); + return it->second; } else { - return {}; + return nullptr; } } - TIntrusivePtr ReleaseTransaction(const TTxId& txId) { - if (auto it = Active.FindWithoutPromote(txId); it != Active.End()) { - auto ret = std::move(it.Value()); - Active.Erase(it); - return ret; + TIntrusivePtr ReleaseTransaction(const TTxId& id) { + const auto it = Active.find(id); + if (it != std::end(Active)) { + auto result = std::move(it->second); + Active.erase(it); + return result; + } else { + return nullptr; } - return {}; } void AddToBeAborted(TIntrusivePtr ctx) { @@ -362,20 +393,20 @@ class TTransactionsCache { } bool RemoveOldTransactions() { - if (Active.Size() < Active.GetMaxSize()) { + if (Active.size() < MaxActiveSize) { + return true; + } + + auto oldestIt = FindOldestTransaction(); + auto currentIdle = TInstant::Now() - oldestIt->second->LastAccessTime; + if (currentIdle >= IdleTimeout) { + oldestIt->second->Invalidate(); + ToBeAborted.emplace_back(std::move(oldestIt->second)); + Active.erase(oldestIt); + ++EvictedTx; return true; } else { - auto it = Active.FindOldest(); - auto currentIdle = TInstant::Now() - it.Value()->LastAccessTime; - if (currentIdle >= IdleTimeout) { - it.Value()->Invalidate(); - ToBeAborted.emplace_back(std::move(it.Value())); - Active.Erase(it); - ++EvictedTx; - return true; - } else { - return false; - } + return false; } } @@ -383,15 +414,15 @@ class TTransactionsCache { if (!RemoveOldTransactions()) { return false; } - return Active.Insert(std::make_pair(txId, txCtx)); + return Active.emplace(txId, txCtx).second; } void FinalCleanup() { - for (auto it = Active.Begin(); it != Active.End(); ++it) { - it.Value()->Invalidate(); - ToBeAborted.emplace_back(std::move(it.Value())); + for (auto& item : Active) { + item.second->Invalidate(); + ToBeAborted.emplace_back(std::move(item.second)); } - Active.Clear(); + Active.clear(); } size_t ToBeAbortedSize() { @@ -412,10 +443,3 @@ bool NeedSnapshot(const TKqpTransactionContext& txCtx, const NYql::TKikimrConfig bool HasOlapTableInTx(const NKqpProto::TKqpPhyQuery& physicalQuery); } // namespace NKikimr::NKqp - -template<> -struct THash { - inline size_t operator()(const NKikimr::NKqp::TTxId& id) const noexcept { - return THash()(id.Id); - } -}; diff --git a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp index 0bffab8dba14..054106bebb14 100644 --- a/ydb/core/kqp/session_actor/kqp_worker_actor.cpp +++ b/ydb/core/kqp/session_actor/kqp_worker_actor.cpp @@ -1,12 +1,13 @@ #include "kqp_worker_common.h" +#include "kqp_query_stats.h" #include #include #include #include #include -#include #include +#include #include #include #include @@ -914,7 +915,7 @@ class TKqpWorkerActor : public TActorBootstrapped { TString text = ExtractQueryText(); if (IsQueryAllowedToLog(text)) { auto userSID = QueryState->RequestEv->GetUserToken()->GetUserSID(); - NSysView::CollectQueryStats(ctx, stats, queryDuration, text, + CollectQueryStats(ctx, stats, queryDuration, text, userSID, QueryState->RequestEv->GetParametersSize(), database, type, requestUnits); } break; diff --git a/ydb/core/kqp/session_actor/ya.make b/ydb/core/kqp/session_actor/ya.make index 49d8ebfe83b4..1dcdd665be4d 100644 --- a/ydb/core/kqp/session_actor/ya.make +++ b/ydb/core/kqp/session_actor/ya.make @@ -7,6 +7,7 @@ SRCS( kqp_worker_actor.cpp kqp_worker_common.cpp kqp_query_state.cpp + kqp_query_stats.cpp kqp_temp_tables_manager.cpp ) diff --git a/ydb/core/sys_view/service/sysview_service.cpp b/ydb/core/sys_view/service/sysview_service.cpp index 1ef374296c58..2cd2684f3cb7 100644 --- a/ydb/core/sys_view/service/sysview_service.cpp +++ b/ydb/core/sys_view/service/sysview_service.cpp @@ -23,118 +23,6 @@ using namespace NActors; namespace NKikimr { namespace NSysView { -constexpr size_t QUERY_TEXT_LIMIT = 4096; - -void CollectQueryStats(const TActorContext& ctx, const NKqpProto::TKqpStatsQuery* queryStats, - TDuration queryDuration, const TString& queryText, - const TString& userSID, ui64 parametersSize, const TString& database, - const NKikimrKqp::EQueryType type, ui64 requestUnits) -{ - if (!AppData()->FeatureFlags.GetEnableSystemViews()) { - return; - } - - auto collectEv = MakeHolder(); - collectEv->Database = database; - - auto& stats = collectEv->QueryStats; - auto& dataStats = *stats.MutableStats(); - auto& shardsCpuTime = *stats.MutableShardsCpuTimeUs(); - auto& computeCpuTime = *stats.MutableComputeCpuTimeUs(); - - auto nodeId = ctx.SelfID.NodeId(); - stats.SetNodeId(nodeId); - - stats.SetEndTimeMs(TInstant::Now().MilliSeconds()); - stats.SetDurationMs(queryDuration.MilliSeconds()); - - stats.SetQueryTextHash(MurmurHash(queryText.data(), queryText.size())); - if (queryText.size() > QUERY_TEXT_LIMIT) { - auto limitedText = queryText.substr(0, QUERY_TEXT_LIMIT); - stats.SetQueryText(limitedText); - } else { - stats.SetQueryText(queryText); - } - - if (userSID) { - stats.SetUserSID(userSID); - } - stats.SetParametersSize(parametersSize); - - TString strType; - switch (type) { - case NKikimrKqp::QUERY_TYPE_SQL_DML: - case NKikimrKqp::QUERY_TYPE_PREPARED_DML: - strType = "data"; - break; - case NKikimrKqp::QUERY_TYPE_SQL_SCAN: - strType = "scan"; - break; - case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT: - case NKikimrKqp::QUERY_TYPE_SQL_SCRIPT_STREAMING: - strType = "script"; - break; - default: - break; - } - stats.SetType(strType); - - if (!queryStats || queryStats->GetExecutions().empty()) { - ctx.Send(NSysView::MakeSysViewServiceID(nodeId), std::move(collectEv)); - return; - } - - shardsCpuTime.SetMin(Max()); - computeCpuTime.SetMin(Max()); - - auto aggregate = [] (NKikimrSysView::TStatsAggr& to, const NYql::NDqProto::TDqStatsAggr& from) { - to.SetMin(std::min(to.GetMin(), from.GetMin())); - to.SetMax(std::max(to.GetMax(), from.GetMax())); - to.SetSum(to.GetSum() + from.GetSum()); - to.SetCnt(to.GetCnt() + from.GetCnt()); - }; - - for (const NYql::NDqProto::TDqExecutionStats& exec : queryStats->GetExecutions()) { - NKqpProto::TKqpExecutionExtraStats execExtra; - if (exec.HasExtra()) { - bool ok = exec.GetExtra().UnpackTo(&execExtra); - Y_UNUSED(ok); - } - - dataStats.SetPartitionCount(dataStats.GetPartitionCount() + execExtra.GetAffectedShards()); - - for (auto& table : exec.GetTables()) { - dataStats.SetReadRows(dataStats.GetReadRows() + table.GetReadRows()); - dataStats.SetReadBytes(dataStats.GetReadBytes() + table.GetReadBytes()); - dataStats.SetUpdateRows(dataStats.GetUpdateRows() + table.GetWriteRows()); - dataStats.SetUpdateBytes(dataStats.GetUpdateBytes() + table.GetWriteBytes()); - dataStats.SetDeleteRows(dataStats.GetDeleteRows() + table.GetEraseRows()); - } - - aggregate(shardsCpuTime, execExtra.GetShardsCpuTimeUs()); - aggregate(computeCpuTime, execExtra.GetComputeCpuTimeUs()); - } - - if (queryStats->HasCompilation()) { - auto& compileStats = queryStats->GetCompilation(); - stats.SetFromQueryCache(compileStats.GetFromCache()); - stats.SetCompileDurationMs(compileStats.GetDurationUs() / 1'000); - stats.SetCompileCpuTimeUs(compileStats.GetCpuTimeUs()); - } - - stats.SetProcessCpuTimeUs(queryStats->GetWorkerCpuTimeUs()); - stats.SetTotalCpuTimeUs( - stats.GetProcessCpuTimeUs() + - stats.GetCompileCpuTimeUs() + - stats.GetShardsCpuTimeUs().GetSum() + - stats.GetComputeCpuTimeUs().GetSum() - ); - - stats.SetRequestUnits(requestUnits); - - ctx.Send(NSysView::MakeSysViewServiceID(nodeId), std::move(collectEv)); -} - static void CopyCounters(NKikimrSysView::TDbCounters* diff, const NKikimrSysView::TDbCounters& current) { diff --git a/ydb/core/sys_view/service/sysview_service.h b/ydb/core/sys_view/service/sysview_service.h index 557d78b8c9f2..351aa56d36d6 100644 --- a/ydb/core/sys_view/service/sysview_service.h +++ b/ydb/core/sys_view/service/sysview_service.h @@ -14,11 +14,6 @@ inline TActorId MakeSysViewServiceID(ui32 node) { return TActorId(node, TStringBuf(x, 12)); } -void CollectQueryStats(const TActorContext& ctx, const NKqpProto::TKqpStatsQuery* queryStats, - TDuration queryDuration, const TString& queryText, - const TString& userSID, ui64 parametersSize, const TString& database, - const NKikimrKqp::EQueryType type, ui64 requestUnits); - THolder CreateSysViewService( TExtCountersConfig&& config, bool hasExternalCounters);