From c006f27800470d23cae7afc10067c08c13e94c76 Mon Sep 17 00:00:00 2001 From: Hor911 Date: Fri, 20 Sep 2024 08:32:53 +0300 Subject: [PATCH] Timeline support (#9533) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Олег <150132506+iddqdex@users.noreply.github.com> --- ydb/core/fq/libs/compute/common/utils.cpp | 45 +- ydb/core/fq/libs/compute/common/utils.h | 6 +- ydb/core/fq/libs/compute/common/ya.make | 1 + ydb/core/fq/libs/config/protos/common.proto | 2 + .../internal/task_ping.cpp | 4 + .../proto/yq_internal.proto | 1 + .../fq/libs/control_plane_storage/util.cpp | 1 + .../ydb_control_plane_storage_queries.cpp | 6 + ydb/core/fq/libs/protos/fq_private.proto | 1 + ydb/public/api/protos/draft/fq.proto | 5 + .../lib/ydb_cli/commands/ydb_benchmark.cpp | 11 +- ydb/public/lib/ydb_cli/common/plan2svg.cpp | 1646 +++++++++++++++++ ydb/public/lib/ydb_cli/common/plan2svg.h | 264 +++ ydb/public/lib/ydb_cli/common/ya.make | 1 + ydb/tests/fq/plans/test_stats_mode.py | 6 +- ydb/tests/olap/lib/ydb_cli.py | 6 +- ydb/tests/olap/load/conftest.py | 2 + ydb/tests/tools/fq_runner/kikimr_utils.py | 1 + ydb/tests/tools/kqprun/src/kqp_runner.cpp | 2 +- 19 files changed, 1995 insertions(+), 16 deletions(-) create mode 100644 ydb/public/lib/ydb_cli/common/plan2svg.cpp create mode 100644 ydb/public/lib/ydb_cli/common/plan2svg.h diff --git a/ydb/core/fq/libs/compute/common/utils.cpp b/ydb/core/fq/libs/compute/common/utils.cpp index a7fc8cd81aa1..40c06436fb4c 100644 --- a/ydb/core/fq/libs/compute/common/utils.cpp +++ b/ydb/core/fq/libs/compute/common/utils.cpp @@ -1,4 +1,5 @@ #include "utils.h" +#include #include #include @@ -430,7 +431,7 @@ void EnumeratePlans(NYson::TYsonWriter& writer, NJson::TJsonValue& value, ui32& } } -TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) { +TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage, TString* timeline, ui64 maxTimelineSize) { TStringStream out; NYson::TYsonWriter writer(&out); writer.OnBeginMap(); @@ -471,6 +472,24 @@ TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage) { } } } + if (timeline) { + TPlanVisualizer planViz; + planViz.LoadPlans(plan); + *timeline = planViz.PrintSvgSafe(); + if (maxTimelineSize && timeline->size() > maxTimelineSize) { + TStringBuilder builder; + builder + << "" << Endl + << " There is nothing wrong with the request." << Endl + << " Unfortunately, image size " << timeline->size() << " is too large." << Endl + << " It exceeds limit of " << maxTimelineSize << " and was discarded" << Endl + << "" << Endl; + *timeline = builder; + } + // remove json "timeline" field after migration + writer.OnKeyedItem("timeline"); + writer.OnStringScalar(*timeline); + } writer.OnEndMap(); return NJson2Yson::ConvertYson2Json(out.Str()); } @@ -1145,7 +1164,7 @@ struct TNoneStatProcessor : IPlanStatProcessor { return plan; } - TString GetQueryStat(const TString&, double& cpuUsage) override { + TString GetQueryStat(const TString&, double& cpuUsage, TString*, ui64) override { cpuUsage = 0.0; return ""; } @@ -1178,8 +1197,8 @@ struct TPlanStatProcessor : IPlanStatProcessor { return plan; } - TString GetQueryStat(const TString& plan, double& cpuUsage) override { - return GetV1StatFromV2Plan(plan, &cpuUsage); + TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline, ui64 maxtimelineSize) override { + return GetV1StatFromV2Plan(plan, &cpuUsage, timeline, maxtimelineSize); } TPublicStat GetPublicStat(const TString& stat) override { @@ -1210,8 +1229,8 @@ struct TProfileStatProcessor : TPlanStatProcessor { }; struct TProdStatProcessor : TFullStatProcessor { - TString GetQueryStat(const TString& plan, double& cpuUsage) override { - return GetPrettyStatistics(GetV1StatFromV2Plan(plan, &cpuUsage)); + TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline, ui64 maxtimelineSize) override { + return GetPrettyStatistics(GetV1StatFromV2Plan(plan, &cpuUsage, timeline, maxtimelineSize)); } }; @@ -1229,8 +1248,12 @@ std::unique_ptr CreateStatProcessor(const TString& statViewN PingTaskRequestBuilder::PingTaskRequestBuilder(const NConfig::TCommonConfig& commonConfig, std::unique_ptr&& processor) : Compressor(commonConfig.GetQueryArtifactsCompressionMethod(), commonConfig.GetQueryArtifactsCompressionMinSize()) - , Processor(std::move(processor)) -{} + , Processor(std::move(processor)), ShowQueryTimeline(commonConfig.GetShowQueryTimeline()), MaxQueryTimelineSize(commonConfig.GetMaxQueryTimelineSize()) +{ + if (!MaxQueryTimelineSize) { + MaxQueryTimelineSize = 200_KB; + } +} Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build( const Ydb::TableStats::QueryStats& queryStats, @@ -1294,9 +1317,13 @@ Fq::Private::PingTaskRequest PingTaskRequestBuilder::Build(const TString& queryP CpuUsage = 0.0; try { - auto stat = Processor->GetQueryStat(plan, CpuUsage); + TString timeline; + auto stat = Processor->GetQueryStat(plan, CpuUsage, ShowQueryTimeline ? &timeline : nullptr, MaxQueryTimelineSize); pingTaskRequest.set_statistics(stat); pingTaskRequest.set_dump_raw_statistics(true); + if (timeline) { + pingTaskRequest.set_timeline(timeline); + } auto flatStat = Processor->GetFlatStat(plan); flatStat["CompilationTimeUs"] = compilationTimeUs; flatStat["ComputeTimeUs"] = computeTimeUs; diff --git a/ydb/core/fq/libs/compute/common/utils.h b/ydb/core/fq/libs/compute/common/utils.h index 8fd7d56419bc..efd11787838d 100644 --- a/ydb/core/fq/libs/compute/common/utils.h +++ b/ydb/core/fq/libs/compute/common/utils.h @@ -28,7 +28,7 @@ inline std::shared_ptr CreateNewTableClient(const TS tableSettings); } -TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage = nullptr); +TString GetV1StatFromV2Plan(const TString& plan, double* cpuUsage = nullptr, TString* timeline = nullptr, ui64 maxTimelineSize = 0); TString GetV1StatFromV2PlanV2(const TString& plan); TString GetPrettyStatistics(const TString& statistics); THashMap AggregateStats(TStringBuf plan); @@ -55,7 +55,7 @@ struct IPlanStatProcessor { virtual Ydb::Query::StatsMode GetStatsMode() = 0; virtual TString ConvertPlan(const TString& plan) = 0; virtual TString GetPlanVisualization(const TString& plan) = 0; - virtual TString GetQueryStat(const TString& plan, double& cpuUsage) = 0; + virtual TString GetQueryStat(const TString& plan, double& cpuUsage, TString* timeline, ui64 maxtimelineSize) = 0; virtual TPublicStat GetPublicStat(const TString& stat) = 0; virtual THashMap GetFlatStat(TStringBuf plan) = 0; }; @@ -79,6 +79,8 @@ class PingTaskRequestBuilder { private: const TCompressor Compressor; std::unique_ptr Processor; + bool ShowQueryTimeline = false; + ui64 MaxQueryTimelineSize = 0; }; TString GetStatViewName(const ::NFq::TRunActorParams& params); diff --git a/ydb/core/fq/libs/compute/common/ya.make b/ydb/core/fq/libs/compute/common/ya.make index cf9a359f840b..efdb54097732 100644 --- a/ydb/core/fq/libs/compute/common/ya.make +++ b/ydb/core/fq/libs/compute/common/ya.make @@ -19,6 +19,7 @@ PEERDIR( ydb/library/yql/providers/generic/connector/api/service/protos ydb/library/yql/providers/generic/connector/libcpp ydb/library/yql/providers/s3/actors_factory + ydb/public/lib/ydb_cli/common ) YQL_LAST_ABI_VERSION() diff --git a/ydb/core/fq/libs/config/protos/common.proto b/ydb/core/fq/libs/config/protos/common.proto index 297436bc2b6f..1b3da64d754b 100644 --- a/ydb/core/fq/libs/config/protos/common.proto +++ b/ydb/core/fq/libs/config/protos/common.proto @@ -29,4 +29,6 @@ message TCommonConfig { bool KeepInternalErrors = 13; bool UseNativeProtocolForClickHouse = 14; bool DisableSslForGenericDataSources = 15; + bool ShowQueryTimeline = 16; + uint64 MaxQueryTimelineSize = 17; // default: 200KB } diff --git a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp index b214e127d44c..9e7baaa95d6e 100644 --- a/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/fq/libs/control_plane_storage/internal/task_ping.cpp @@ -285,6 +285,10 @@ TPingTaskParams ConstructHardPingTask( internal.set_current_load(request.current_load()); } + if (request.timeline()) { + internal.set_timeline(request.timeline()); + } + if (request.flat_stats_size() != 0) { internal.clear_statistics(); auto stats = DeserializeFlatStats(request.flat_stats()); diff --git a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto index 2da81596f71c..205117599e4e 100644 --- a/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/fq/libs/control_plane_storage/proto/yq_internal.proto @@ -57,6 +57,7 @@ message QueryInternal { NYql.NDqProto.StatusIds.StatusCode pending_status_code = 28; repeated StatisticsNamedValue statistics = 29; int32 current_load = 30; + string timeline = 31; } message JobInternal { diff --git a/ydb/core/fq/libs/control_plane_storage/util.cpp b/ydb/core/fq/libs/control_plane_storage/util.cpp index db0f310b509f..58f672e293c0 100644 --- a/ydb/core/fq/libs/control_plane_storage/util.cpp +++ b/ydb/core/fq/libs/control_plane_storage/util.cpp @@ -179,6 +179,7 @@ bool DoesPingTaskUpdateQueriesTable(const Fq::Private::PingTaskRequest& request) || !request.issues().empty() || !request.transient_issues().empty() || request.statistics() + || request.timeline() || !request.result_set_meta().empty() || request.ast() || request.ast_compressed().data() diff --git a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index ae371c52b811..966afc7c7957 100644 --- a/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/fq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -594,6 +594,12 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvDescribeQue } } } + + auto timeline = internal.timeline(); + if (timeline) { + result.mutable_query()->mutable_timeline()->set_svg(timeline); + } + if (!permissions.Check(TPermissions::VIEW_AST)) { result.mutable_query()->clear_ast(); } else { diff --git a/ydb/core/fq/libs/protos/fq_private.proto b/ydb/core/fq/libs/protos/fq_private.proto index 04cc5d7c5d9d..879095566509 100644 --- a/ydb/core/fq/libs/protos/fq_private.proto +++ b/ydb/core/fq/libs/protos/fq_private.proto @@ -166,6 +166,7 @@ message PingTaskRequest { bool dump_raw_statistics = 38; repeated Ydb.ValuePair flat_stats = 39; int32 current_load = 40; + string timeline = 41; } message PingTaskResult { diff --git a/ydb/public/api/protos/draft/fq.proto b/ydb/public/api/protos/draft/fq.proto index d1713334a047..f76c01ef4944 100644 --- a/ydb/public/api/protos/draft/fq.proto +++ b/ydb/public/api/protos/draft/fq.proto @@ -205,6 +205,10 @@ message ResultSetMeta { bool truncated = 3; } +message QueryTimeline { + string svg = 1; // No validation because generated on server side +} + message Query { QueryMeta meta = 1; QueryContent content = 2; @@ -214,6 +218,7 @@ message Query { QueryStatistics statistics = 6; repeated ResultSetMeta result_set_meta = 7; QueryAst ast = 8; + QueryTimeline timeline = 9; } message QueryStatistics { diff --git a/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp b/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp index 16148eb13ee4..5576991f7d69 100644 --- a/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp +++ b/ydb/public/lib/ydb_cli/commands/ydb_benchmark.cpp @@ -1,7 +1,8 @@ #include "ydb_benchmark.h" #include "benchmark_utils.h" -#include #include +#include +#include #include #include #include @@ -93,7 +94,7 @@ TString TWorkloadCommandBenchmark::PatchQuery(const TStringBuf& original) const std::vector lines; for (auto& line : StringSplitter(result).Split('\n').SkipEmpty()) { - if (line.StartsWith("--")) { + if (line.StartsWith("--") && !line.StartsWith("--!")) { continue; } @@ -386,6 +387,12 @@ bool TWorkloadCommandBenchmark::RunBench(TClient& client, NYdbWorkload::IWorkloa TFileOutput out(PlanFileName + ".ast"); out << res.GetPlanAst(); } + { + TPlanVisualizer pv; + pv.LoadPlans(res.GetQueryPlan()); + TFileOutput out(PlanFileName + ".svg"); + out << pv.PrintSvgSafe(); + } planSaved = true; } } diff --git a/ydb/public/lib/ydb_cli/common/plan2svg.cpp b/ydb/public/lib/ydb_cli/common/plan2svg.cpp new file mode 100644 index 000000000000..23b52ba9dfe2 --- /dev/null +++ b/ydb/public/lib/ydb_cli/common/plan2svg.cpp @@ -0,0 +1,1646 @@ +#include "plan2svg.h" + +#include + +constexpr ui32 INDENT_X = 8; +constexpr ui32 GAP_X = 3; +constexpr ui32 GAP_Y = 3; +constexpr ui32 TIME_HEIGHT = 10; +constexpr ui32 INTERNAL_GAP_Y = 2; +constexpr ui32 INTERNAL_GAP_X = 2; +constexpr ui32 INTERNAL_HEIGHT = 14; +constexpr ui32 INTERNAL_WIDTH = 16; +constexpr ui32 INTERNAL_TEXT_HEIGHT = 8; +constexpr ui32 TIME_SERIES_RANGES = 32; + +TString FormatDurationMs(ui64 durationMs) { + TStringBuilder builder; + + if (durationMs && durationMs < 100) { + builder << durationMs << "ms"; + } else { + auto seconds = durationMs / 1'000; + if (seconds >= 60) { + auto minutes = seconds / 60; + if (minutes >= 60) { + auto hours = minutes / 60; + builder << hours << 'h'; + if (hours < 24) { + auto minutes60 = minutes % 60; + builder << ' '; + if (minutes60 < 10) { + builder << '0'; + } + builder << minutes60 << 'm'; + } + } else { + auto seconds60 = seconds % 60; + builder << minutes << "m "; + if (seconds60 < 10) { + builder << '0'; + } + builder << seconds60 << 's'; + } + } else { + auto hundredths = (durationMs % 1'000) / 10; + builder << seconds << '.'; + if (hundredths < 10) { + builder << '0'; + } + builder << hundredths << 's'; + } + } + + return builder; +} + +TString FormatDurationUs(ui64 durationUs) { + if (durationUs && durationUs < 1000) { + return TStringBuilder() << durationUs << "us"; + } + + return FormatDurationMs(durationUs / 1000); +} + +TString FormatUsage(ui64 usec) { + return FormatDurationUs(usec); +} + +TString FormatIntegerValue(ui64 i, ui32 scale = 1000, const TString& suffix = "") { + if (i < scale) { + return Sprintf("%lu%s", i, suffix.c_str()); + } + for (auto c : "KMGTP") { + auto pcs = (i % scale) * 100 / scale; + i /= scale; + if (i < scale || c == 'P') { + return Sprintf("%lu.%.2lu%c%s", i, pcs, c, suffix.c_str()); + } + } + return ""; +} + +TString FormatBytes(ui64 bytes) { + return FormatIntegerValue(bytes, 1024, "B"); +} + +TString FormatTimeMs(ui64 time, bool shortFormat) { + if (shortFormat) { + time /= 10; + return Sprintf("%lu.%.2lu", time / 100, time % 100); + } else { + time /= 1000; + return Sprintf("%lu:%.2lu", time / 60, time % 60); + } +} + +TString FormatTimeMs(ui64 time) { + return FormatTimeMs(time, time < 60000); +} + +TString FormatTimeAgg(const TAggregation& agg, bool shortFormat) { + TStringBuilder result; + result << FormatTimeMs(agg.Min, shortFormat) << " | " << FormatTimeMs(agg.Avg, shortFormat) << " | " << FormatTimeMs(agg.Max, shortFormat); + return result; +} + +TString FormatMCpu(ui64 mCpu) { + mCpu /= 10; + return Sprintf("%lu.%.2lu", mCpu / 100, mCpu % 100); +} + +TString GetEstimation(const NJson::TJsonValue& node) { + TStringBuilder ebuilder; + auto* eCostNode = node.GetValueByPath("E-SelfCost"); + if (!eCostNode) { + eCostNode = node.GetValueByPath("E-Cost"); + } + if (eCostNode) { + auto costString = eCostNode->GetStringSafe(); + if (costString != "No estimate") { + ebuilder << "Est:"; + double cost; + if (TryFromString(costString, cost)) { + if (cost >= 1e+18) { + ebuilder << Sprintf(" %.2e", cost); + } else { + ebuilder << ' ' << FormatIntegerValue(static_cast(cost)); + } + } + if (auto* eRowsNode = node.GetValueByPath("E-Rows")) { + double rows; + if (TryFromString(eRowsNode->GetStringSafe(), rows)) { + if (rows >= 1e+18) { + ebuilder << Sprintf(" Rows: %.2e", rows); + } else { + ebuilder << " Rows: " << FormatIntegerValue(static_cast(rows)); + } + } + } + if (auto* eSizeNode = node.GetValueByPath("E-Size")) { + double size; + if (TryFromString(eSizeNode->GetStringSafe(), size)) { + if (size >= 1e+18) { + ebuilder << Sprintf(" Size: %.2e", size); + } else { + ebuilder << " Size: " << FormatBytes(static_cast(size)); + } + } + } + } + } + return ebuilder; +} + +bool TAggregation::Load(const NJson::TJsonValue& node) { + if (auto* countNode = node.GetValueByPath("Count")) { + Count = countNode->GetIntegerSafe(); + + if (Count == 0) { + return false; + } + + if (auto* sumNode = node.GetValueByPath("Sum")) { + Sum = sumNode->GetIntegerSafe(); + } + Avg = Sum / Count; + if (auto* minNode = node.GetValueByPath("Min")) { + Min = minNode->GetIntegerSafe(); + } else { + Min = Avg; + } + if (auto* maxNode = node.GetValueByPath("Max")) { + Max = maxNode->GetIntegerSafe(); + } else { + Max = Avg; + } + + return true; + } + return false; +} + +void TMetricHistory::Load(const NJson::TJsonValue& node, ui64 explicitMinTime, ui64 explicitMaxTime) { + std::vector times; + std::vector values; + + bool even = true; + + for (const auto& subNode : node.GetArray()) { + ui64 i = subNode.GetIntegerSafe(); + if (even) times.push_back(i); + else values.push_back(i); + even = !even; + } + + if (times.size() > values.size()) { + times.resize(values.size()); + } + + Load(times, values, explicitMinTime, explicitMaxTime); +} + +void TMetricHistory::Load(std::vector& times, std::vector& values, ui64 explicitMinTime, ui64 explicitMaxTime) { + if (times.size() < 2) { + return; + } + auto itt = times.begin(); + auto itv = values.begin(); + + MinTime = explicitMinTime ? explicitMinTime : *itt; + MaxTime = explicitMaxTime ? explicitMaxTime : times.back(); + + ui64 prevValue = *itv++; + ui64 prevTime = *itt++; + + while (itt != times.end() && *itt <= MinTime) { + prevValue = *itv++; + prevTime = *itt++; + } + + Deriv.resize(TIME_SERIES_RANGES + 1); + Deriv[0].first = MinTime; + + ui64 timeLeft = MinTime; + for (ui32 i = 1; i <= TIME_SERIES_RANGES; i++) { + + ui64 timeRight = MinTime + (MaxTime - MinTime) * i / TIME_SERIES_RANGES; + Deriv[i].first = timeRight; + while (itt != times.end() && *itt <= timeRight) { + ui64 delta = (*itv - prevValue); + if (prevTime >= timeLeft) { + Deriv[i].second += delta; + } else { + ui64 currDelta = delta * (*itt - timeLeft) / (*itt - prevTime); + Deriv[i].second += currDelta; + if (i > 1) { + Deriv[i - 1].second += delta - currDelta; + } + } + prevTime = *itt++; + prevValue = *itv++; + } + + timeLeft = timeRight; + } + + if (itt != times.end()) { + Deriv[TIME_SERIES_RANGES].second += (*itv - prevValue) * (*itt - MaxTime) / (*itt - prevTime); + } + for (ui32 i = 1; i <= TIME_SERIES_RANGES; i++) { + MaxDeriv = std::max(MaxDeriv, Deriv[i].second); + } + bool missed = false; + for (ui32 i = 0; i < times.size(); i++) { + auto t = times[i]; + if (t < MinTime) { + missed = true; + continue; + } + if (missed && t > MinTime) { + Values.emplace_back(MinTime, values[i - 1]); + } + if (t > MaxTime) { + if (i && times[i - 1] < MaxTime) { + Values.emplace_back(MaxTime, values[i - 1]); + } + break; + } + Values.emplace_back(t, values[i]); + } + for (auto& p : Values) { + MaxValue = std::max(MaxValue, p.second); + } +} + +TSingleMetric::TSingleMetric(std::shared_ptr summary, const NJson::TJsonValue& node, + const NJson::TJsonValue* firstMessageNode, const NJson::TJsonValue* lastMessageNode, + const NJson::TJsonValue* waitTimeUsNode) + : Summary(summary) { + + if (!Details.Load(node)) { + return; + } + + Summary->Add(Details.Sum); + + if (firstMessageNode) { + FirstMessage.Load(*firstMessageNode); + } + + if (lastMessageNode) { + LastMessage.Load(*lastMessageNode); + } + + if (auto* historyNode = node.GetValueByPath("History")) { + History.Load(*historyNode, FirstMessage.Min, LastMessage.Max); + MinTime = History.MinTime; + MaxTime = History.MaxTime; + } + + if (waitTimeUsNode) { + WaitTime.Load(*waitTimeUsNode, FirstMessage.Min, LastMessage.Max); + MinTime = MinTime ? std::min(MinTime, WaitTime.MinTime) : WaitTime.MinTime; + MaxTime = MaxTime ? std::max(MaxTime, WaitTime.MaxTime) : WaitTime.MaxTime; + } else if (FirstMessage.Min && LastMessage.Max) { + MinTime = MinTime ? std::min(MinTime, FirstMessage.Min) : FirstMessage.Min; + MaxTime = MaxTime ? std::max(MaxTime, LastMessage.Max) : LastMessage.Max; + } +} + +void TPlan::Load(const NJson::TJsonValue& node) { + if (auto* subplanNameNode = node.GetValueByPath("Subplan Name")) { + auto subplanName = subplanNameNode->GetStringSafe(); + if (subplanName.StartsWith("CTE ")) { + if (auto* nodeTypeNode = node.GetValueByPath("Node Type")) { + CteSubPlans[subplanName] = nodeTypeNode->GetStringSafe(); + } + } + } + + if (auto* subNode = node.GetValueByPath("Plans")) { + for (auto& plan : subNode->GetArray()) { + TString nodeType; + if (auto* nodeTypeNode = plan.GetValueByPath("Node Type")) { + nodeType = nodeTypeNode->GetStringSafe(); + } + if (auto* planNodeTypeNode = plan.GetValueByPath("PlanNodeType")) { + auto planNodeType = planNodeTypeNode->GetStringSafe(); + ythrow yexception() << "Unexpected plan node type [" << planNodeType << "]"; + } else { + Stages.push_back(std::make_shared(nodeType)); + LoadStage(Stages.back(), plan, 0); + } + } + } + + if (!TotalCpuTimes.empty()) { + TotalCpuTime.Load(TotalCpuTimes, TotalCpuValues, TotalCpuTimes.front(), TotalCpuTimes.back()); + } +} + +void TPlan::ResolveCteRefs() { + for (auto& memberRef : MemberRefs) { + auto it = CteSubPlans.find(memberRef.first); + if (it == CteSubPlans.end()) { + ythrow yexception() << "Can not find CTE Ref " << memberRef.first; + } + memberRef.second.first->Info.at(memberRef.second.second) = "Reference: " + it->second; + } + for (auto& cteRef : CteRefs) { + auto it = CteStages.find(cteRef.first); + if (it == CteStages.end()) { + ythrow yexception() << "Can not find CTE Ref " << cteRef.first; + } + + cteRef.second->FromStage = it->second; + if (cteRef.second->StatsNode) { + if (auto* inputNode = cteRef.second->StatsNode->GetValueByPath("Input")) { + for (const auto& subNode : inputNode->GetArray()) { + if (auto* nameNode = subNode.GetValueByPath("Name")) { + if (ToString(it->second->PlanNodeId) == nameNode->GetStringSafe()) { + if (auto* pushNode = subNode.GetValueByPath("Push")) { + if (auto* bytesNode = pushNode->GetValueByPath("Bytes")) { + cteRef.second->InputBytes = std::make_shared(InputBytes, + *bytesNode, + pushNode->GetValueByPath("FirstMessageMs"), + pushNode->GetValueByPath("LastMessageMs"), + pushNode->GetValueByPath("WaitTimeUs.History") + ); + MaxTime = std::max(MaxTime, cteRef.second->InputBytes->MaxTime); + } + if (auto* rowsNode = pushNode->GetValueByPath("Rows")) { + cteRef.second->InputRows = std::make_shared(InputRows, *rowsNode); + } + } + } + } + } + } + } + if (cteRef.second->FromStage->StatsNode) { + if (auto* outputNode = cteRef.second->FromStage->StatsNode->GetValueByPath("Output")) { + for (const auto& subNode : outputNode->GetArray()) { + if (auto* nameNode = subNode.GetValueByPath("Name")) { + if (ToString(cteRef.second->StagePlanNodeId) == nameNode->GetStringSafe()) { + if (auto* popNode = subNode.GetValueByPath("Pop")) { + if (auto* bytesNode = popNode->GetValueByPath("Bytes")) { + cteRef.second->CteOutputBytes = std::make_shared(OutputBytes, + *bytesNode, + popNode->GetValueByPath("FirstMessageMs"), + popNode->GetValueByPath("LastMessageMs"), + popNode->GetValueByPath("WaitTimeUs.History") + ); + MaxTime = std::max(MaxTime, cteRef.second->CteOutputBytes->MaxTime); + } + if (auto* rowsNode = popNode->GetValueByPath("Rows")) { + cteRef.second->CteOutputRows = std::make_shared(OutputRows, *rowsNode); + } + } + } + } + } + } + } + } +} + +void TPlan::LoadStage(std::shared_ptr stage, const NJson::TJsonValue& node, ui32 parentPlanNodeId) { + + if (auto* planNodeIdNode = node.GetValueByPath("PlanNodeId")) { + stage->PlanNodeId = planNodeIdNode->GetIntegerSafe(); + } + + if (auto* subplanNameNode = node.GetValueByPath("Subplan Name")) { + auto subplanName = subplanNameNode->GetStringSafe(); + if (subplanName.StartsWith("CTE ")) { + CteStages[subplanName] = stage; + } + } + + auto operators = node.GetValueByPath("Operators"); + + if (operators) { + TString prevFilter; + std::set references; + for (const auto& subNode : operators->GetArray()) { + if (auto* nameNode = subNode.GetValueByPath("Name")) { + auto name = nameNode->GetStringSafe(); + + if (name == "Iterator" || name == "Member") { + if (auto* referenceNode = subNode.GetValueByPath(name)) { + auto referenceName = referenceNode->GetStringSafe(); + if (references.contains(referenceName)) { + continue; + } + if (name == "Iterator" && !referenceName.StartsWith("precompute_")) { + continue; + } + } + } + + if (name == "Filter" && prevFilter) { + if (auto* predicateNode = subNode.GetValueByPath("Predicate")) { + auto filter = predicateNode->GetStringSafe(); + if (filter == prevFilter) { + continue; + } + } + } + prevFilter = ""; + + TStringBuilder builder; + + if (name == "Iterator" || name == "Member") { + builder << "Reference"; + } else { + builder << name; + } + + if (name == "Limit") { + if (auto* limitNode = subNode.GetValueByPath("Limit")) { + builder << ": " << limitNode->GetStringSafe(); + } + } else if (name == "Filter") { + if (auto* predicateNode = subNode.GetValueByPath("Predicate")) { + auto filter = predicateNode->GetStringSafe(); + prevFilter = filter; + while(true) { + auto p = filter.find("item."); + if (p == filter.npos) { + break; + } + filter.erase(p, 5); + } + while(true) { + auto p = filter.find('<'); + if (p == filter.npos) { + break; + } + filter.erase(p, 1); + filter.insert(p, "<"); + } + builder << ": " << filter; + } + } else if (name == "TopSort") { + if (auto* limitNode = subNode.GetValueByPath("Limit")) { + builder << ", Limit: " << limitNode->GetStringSafe(); + } + if (auto* topSortByNode = subNode.GetValueByPath("TopSortBy")) { + builder << ", TopSortBy: " << topSortByNode->GetStringSafe(); + } + } else if (name == "Iterator" || name == "Member") { + if (auto* referenceNode = subNode.GetValueByPath(name)) { + auto referenceName = referenceNode->GetStringSafe(); + references.insert(referenceName); + builder << ": " << referenceName; + auto cteRef = "CTE " + referenceName; + auto stageCopy = stage; + MemberRefs.emplace_back(cteRef, std::make_pair, ui32>(std::move(stageCopy), stage->Info.size())); + } + } else if (name.Contains("Join")) { + if (auto* conditionNode = subNode.GetValueByPath("Condition")) { + builder << " on " << conditionNode->GetStringSafe(); + } + } + stage->Info.push_back(builder); + + auto est = GetEstimation(subNode); + if (est) { + stage->Info.push_back(est); + } + } + } + } + + stage->StatsNode = node.GetValueByPath("Stats"); + + const NJson::TJsonValue* inputNode = nullptr; + + if (stage->StatsNode) { + + if (auto* tasksNode = stage->StatsNode->GetValueByPath("Tasks")) { + stage->Tasks = tasksNode->GetIntegerSafe(); + Tasks += stage->Tasks; + } + + if (auto* physicalStageIdNode = stage->StatsNode->GetValueByPath("PhysicalStageId")) { + stage->PhysicalStageId = physicalStageIdNode->GetIntegerSafe(); + } + + if (auto* baseTimeNode = stage->StatsNode->GetValueByPath("BaseTimeMs")) { + stage->BaseTime = baseTimeNode->GetIntegerSafe(); + if (BaseTime == 0) { + BaseTime = stage->BaseTime; + } else { + BaseTime = std::min(BaseTime, stage->BaseTime); + } + } + + if (auto* cpuTimeNode = stage->StatsNode->GetValueByPath("CpuTimeUs")) { + stage->CpuTime = std::make_shared(CpuTime, *cpuTimeNode); + + std::vector updatedCpuTimes; + std::vector updatedCpuValues; + + auto itt = TotalCpuTimes.begin(); + auto itv = TotalCpuValues.begin(); + auto ith = stage->CpuTime->History.Values.begin(); + + ui64 v0 = 0; + ui64 v1 = 0; + ui64 t = 0; + + while (itt != TotalCpuTimes.end() || ith != stage->CpuTime->History.Values.end()) { + + if (itt == TotalCpuTimes.end()) { + t = ith->first; + v1 = ith->second; + ith++; + } else if (ith == stage->CpuTime->History.Values.end()) { + t = *itt++; + v0 = *itv++; + } else if (*itt == ith->first) { + t = *itt++; + v0 = *itv++; + v1 = ith->second; + ith++; + } else if (*itt > ith->first) { + t = ith->first; + v1 = ith->second; + ith++; + } else { + t = *itt++; + v0 = *itv++; + } + + updatedCpuTimes.push_back(t); + updatedCpuValues.push_back(v0 + v1); + } + + TotalCpuTimes.swap(updatedCpuTimes); + TotalCpuValues.swap(updatedCpuValues); + } + + if (auto* mmuNode = stage->StatsNode->GetValueByPath("MaxMemoryUsage")) { + stage->MaxMemoryUsage = std::make_shared(MaxMemoryUsage, *mmuNode); + } + + if (auto* spillingComputeBytesNode = stage->StatsNode->GetValueByPath("SpillingComputeBytes")) { + stage->SpillingComputeBytes = std::make_shared(SpillingComputeBytes, *spillingComputeBytesNode); + } + + if (auto* spillingComputeTimeNode = stage->StatsNode->GetValueByPath("SpillingComputeTimeUs")) { + stage->SpillingComputeTime = std::make_shared(SpillingComputeTime, *spillingComputeTimeNode); + } + + if (auto* spillingChannelBytesNode = stage->StatsNode->GetValueByPath("SpillingChannelBytes")) { + stage->SpillingChannelBytes = std::make_shared(SpillingChannelBytes, *spillingChannelBytesNode); + } + + if (auto* spillingChannelTimeNode = stage->StatsNode->GetValueByPath("SpillingChannelTimeUs")) { + stage->SpillingChannelTime = std::make_shared(SpillingChannelTime, *spillingChannelTimeNode); + } + + if (auto* outputNode = stage->StatsNode->GetValueByPath("Output")) { + for (const auto& subNode : outputNode->GetArray()) { + if (auto* nameNode = subNode.GetValueByPath("Name")) { + if (ToString(parentPlanNodeId) == nameNode->GetStringSafe()) { + if (auto* popNode = subNode.GetValueByPath("Pop")) { + if (auto* bytesNode = popNode->GetValueByPath("Bytes")) { + stage->OutputBytes = std::make_shared(OutputBytes, + *bytesNode, + popNode->GetValueByPath("FirstMessageMs"), + popNode->GetValueByPath("LastMessageMs"), + popNode->GetValueByPath("WaitTimeUs.History") + ); + MaxTime = std::max(MaxTime, stage->OutputBytes->MaxTime); + } + if (auto* rowsNode = popNode->GetValueByPath("Rows")) { + stage->OutputRows = std::make_shared(OutputRows, *rowsNode); + } + } + } + } + } + } + + inputNode = stage->StatsNode->GetValueByPath("Input"); + } + + if (auto* subNode = node.GetValueByPath("Plans")) { + for (auto& plan : subNode->GetArray()) { + TString subNodeType; + if (auto* nodeTypeNode = plan.GetValueByPath("Node Type")) { + subNodeType = nodeTypeNode->GetStringSafe(); + } + TString planNodeType; + if (auto* planNodeTypeNode = plan.GetValueByPath("PlanNodeType")) { + planNodeType = planNodeTypeNode->GetStringSafe(); + } + if (planNodeType == "Connection") { + auto* keyColumnsNode = plan.GetValueByPath("KeyColumns"); + if (auto* subNode = plan.GetValueByPath("Plans")) { + for (auto& plan : subNode->GetArray()) { + TString nodeType; + if (auto* nodeTypeNode = plan.GetValueByPath("Node Type")) { + nodeType = nodeTypeNode->GetStringSafe(); + } + if (auto* planNodeTypeNode = plan.GetValueByPath("PlanNodeType")) { + auto planNodeType = planNodeTypeNode->GetStringSafe(); + if (planNodeType) { + ythrow yexception() << "Unexpected plan node type [" << planNodeType << "]"; + } + } + auto connection = std::make_shared(subNodeType, stage->PlanNodeId); + stage->Connections.push_back(connection); + if (keyColumnsNode) { + for (auto& keyColumn : keyColumnsNode->GetArray()) { + stage->Connections.back()->KeyColumns.push_back(keyColumn.GetStringSafe()); + } + } + + if (auto* planNodeIdNode = plan.GetValueByPath("PlanNodeId")) { + auto planNodeId = planNodeIdNode->GetStringRobust(); + if (inputNode) { + for (const auto& subNode : inputNode->GetArray()) { + if (auto* nameNode = subNode.GetValueByPath("Name")) { + if (planNodeId == nameNode->GetStringSafe()) { + if (auto* pushNode = subNode.GetValueByPath("Push")) { + if (auto* bytesNode = pushNode->GetValueByPath("Bytes")) { + connection->InputBytes = std::make_shared(InputBytes, + *bytesNode, + pushNode->GetValueByPath("FirstMessageMs"), + pushNode->GetValueByPath("LastMessageMs"), + pushNode->GetValueByPath("WaitTimeUs.History") + ); + MaxTime = std::max(MaxTime, connection->InputBytes->MaxTime); + } + if (auto* rowsNode = pushNode->GetValueByPath("Rows")) { + connection->InputRows = std::make_shared(InputRows, *rowsNode); + } + } + } + } + } + } + } + + Stages.push_back(std::make_shared(nodeType)); + connection->FromStage = Stages.back(); + LoadStage(Stages.back(), plan, stage->PlanNodeId); + } + } else if (auto* cteNameNode = plan.GetValueByPath("CTE Name")) { + auto cteName = "CTE " + cteNameNode->GetStringSafe(); + auto connection = std::make_shared(subNodeType, stage->PlanNodeId); + connection->CteConnection = true; + stage->Connections.push_back(connection); + if (keyColumnsNode) { + for (auto& keyColumn : keyColumnsNode->GetArray()) { + stage->Connections.back()->KeyColumns.push_back(keyColumn.GetStringSafe()); + } + } + CteRefs.emplace_back(cteName, stage->Connections.back()); + stage->Connections.back()->StatsNode = stage->StatsNode; + } + } else if (planNodeType == "") { + if (subNodeType == "Source") { + if (stage->Source) { + ythrow yexception() << "Plan stage already has linked Source [" << stage->Source->NodeType << "]"; + } + stage->Source = std::make_shared(subNodeType); + LoadSource(stage->Source, plan); + if (!stage->Source->Info.empty()) { + stage->Info.insert(stage->Info.end(), stage->Source->Info.begin(), stage->Source->Info.end()); + } + + if (stage->StatsNode) { + if (auto* ingressTopNode = stage->StatsNode->GetValueByPath("Ingress")) { + if (auto* ingressNode = (*ingressTopNode)[0].GetValueByPath("Ingress")) { + if (auto* bytesNode = ingressNode->GetValueByPath("Bytes")) { + stage->Source->IngressBytes = std::make_shared(IngressBytes, + *bytesNode, + ingressNode->GetValueByPath("FirstMessageMs"), + ingressNode->GetValueByPath("LastMessageMs"), + ingressNode->GetValueByPath("WaitTimeUs.History") + ); + MaxTime = std::max(MaxTime, stage->Source->IngressBytes->MaxTime); + } + if (auto* rowsNode = ingressNode->GetValueByPath("Rows")) { + stage->Source->IngressRows = std::make_shared(IngressRows, *rowsNode); + } + } + } + } + + } else { + stage->Connections.push_back(std::make_shared("Implicit", stage->PlanNodeId)); + Stages.push_back(std::make_shared(subNodeType)); + stage->Connections.back()->FromStage = Stages.back(); + LoadStage(Stages.back(), plan, stage->PlanNodeId); + } + } else { + ythrow yexception() << "Unexpected plan node type [" << planNodeType << "]"; + } + } + } +} + +void TPlan::LoadSource(std::shared_ptr source, const NJson::TJsonValue& node) { + + auto operators = node.GetValueByPath("Operators"); + + if (operators) { + for (const auto& subNode : operators->GetArray()) { + TStringBuilder builder; + builder << "Source"; + if (auto* sourceTypeNode = subNode.GetValueByPath("SourceType")) { + builder << " " << sourceTypeNode->GetStringSafe(); + } + if (auto* nameNode = subNode.GetValueByPath("Name")) { + builder << " " << nameNode->GetStringSafe() << "("; + } + if (auto* readColumnsNode = subNode.GetValueByPath("ReadColumns")) { + bool firstColumn = true; + for (const auto& subNode : readColumnsNode->GetArray()) { + if (firstColumn) { + firstColumn = false; + } else { + builder << ", "; + } + builder << subNode.GetStringSafe(); + } + } + builder << ")"; + source->Info.push_back(builder); + + auto est = GetEstimation(subNode); + if (est) { + source->Info.push_back(est); + } + break; + } + } +} + +void TPlan::MarkStageIndent(ui32 indent, ui32& offsetY, std::shared_ptr stage) { + if (stage->IndentX < indent) { + stage->IndentX = indent; + } + + stage->OffsetY = offsetY; + ui32 height = std::max(stage->Connections.size() + (stage->Source ? 1 : 0) + 3, 4) * (INTERNAL_HEIGHT + INTERNAL_GAP_Y) + INTERNAL_GAP_Y; + stage->Height = height; + stage->IndentY = stage->OffsetY + height; + offsetY += (height + GAP_Y); + + if (stage->Connections.size() > 1) { + indent += (INDENT_X + GAP_X); + } + + for (auto c : stage->Connections) { + if (c->CteConnection) { + c->CteIndentX = indent; + c->CteOffsetY = offsetY; + offsetY += GAP_Y + INTERNAL_HEIGHT + INTERNAL_GAP_Y * 2; + stage->IndentY = std::max(stage->IndentY, offsetY); + } else { + MarkStageIndent(indent, offsetY, c->FromStage); + stage->IndentY = std::max(stage->IndentY, c->FromStage->IndentY); + } + } +} + +void TPlan::MarkLayout() { + ui32 offsetY = 0; + MarkStageIndent(0, offsetY, Stages.front()); + // Compress Reference(s) + for (auto& stage : Stages) { + auto& info = stage->Info; + ui32 i = 0; + while (i < info.size()) { + auto& s = info[i]; + if (s.starts_with("Reference: ")) { + auto next = i + 1; + if (next < info.size()) { + auto& sn = info[next]; + if (sn.starts_with("Reference: ")) { + s.insert(9, "s"); + while (next < info.size()) { + auto& sn = info[next]; + if (sn.starts_with("Reference: ")) { + s += ", " + sn.substr(11); + info.erase(info.begin() + next); + } else { + break; + } + } + } + } + } + i++; + } + } +} + +void TPlan::PrintTimeline(TStringBuilder& background, TStringBuilder& canvas, const TString& title, TAggregation& firstMessage, TAggregation& lastMessage, ui32 x, ui32 y, ui32 w, ui32 h, const TString& color) { + + auto firstMin = firstMessage.Min * w / MaxTime; + auto lastMax = lastMessage.Max * w / MaxTime; + + background + << "" << title << ", Duration: " << FormatTimeMs(lastMessage.Max - firstMessage.Min) << " (" << FormatTimeAgg(firstMessage, lastMessage.Max < 60000) << " - " << FormatTimeAgg(lastMessage, lastMessage.Max < 60000) << ")" + << "" << Endl; + + if (firstMessage.Min < firstMessage.Max) { + auto firstAvg = firstMessage.Avg * w / MaxTime; + auto firstMax = firstMessage.Max * w / MaxTime; + canvas + << " " << Endl + << " " << Endl; + } + + if (lastMessage.Min < lastMessage.Max) { + auto lastMin = lastMessage.Min * w / MaxTime; + auto lastAvg = lastMessage.Avg * w / MaxTime; + canvas + << " " << Endl + << " " << Endl; + } + + background + << "" << Endl; +} + +void TPlan::PrintWaitTime(TStringBuilder& background, std::shared_ptr metric, ui32 x, ui32 y, ui32 w, ui32 h, const TString& fillColor) { + + if (metric->WaitTime.MaxDeriv == 0) { + return; + } + + background + << "" << Endl; +} + +void TPlan::PrintDeriv(TStringBuilder& canvas, TMetricHistory& history, ui32 x, ui32 y, ui32 w, ui32 h, const TString& title, const TString& lineColor, const TString& fillColor) { + + if (history.MaxDeriv == 0) { + return; + } + + if (title) { + canvas << "" << title << "" << Endl; + } + + canvas + << (fillColor ? "(item.second * h / history.MaxDeriv, 1)) << " "; + } + + canvas + << x + history.Deriv.back().first * w / MaxTime << "," << y + (h - 1) << " " + << "' stroke-width='1' stroke='" << lineColor << "' fill='" << (fillColor ? fillColor : "none") << "' />" << Endl; + + if (title) { + canvas << "" << Endl; + } +} + +void TPlan::PrintValues(TStringBuilder& canvas, std::shared_ptr metric, ui32 x, ui32 y, ui32 w, ui32 h, const TString& title, const TString& lineColor, const TString& fillColor) { + + if (metric->History.MaxValue == 0) { + return; + } + + if (title) { + canvas << "" << title << "" << Endl; + } + + canvas + << (fillColor ? "History.Values.front().first * w / MaxTime << "," << y + (h - 1) << " "; + + for (auto& item : metric->History.Values) { + canvas << x + item.first * w / MaxTime << "," << y + (h - std::max(item.second * h / metric->History.MaxValue, 1)) << " "; + } + + canvas + << x + metric->History.Values.back().first * w / MaxTime << "," << y + (h - 1) << " " + << "' stroke-width='1' stroke='" << lineColor << "' fill='" << (fillColor ? fillColor : "none") << "' />" << Endl; + + if (title) { + canvas << "" << Endl; + } +} + +void TPlan::PrintStageSummary(TStringBuilder& background, TStringBuilder&, ui32 y0, std::shared_ptr metric, const TString& mediumColor, const TString& lightColor, const TString& textSum, const TString& tooltip) { + ui32 x0 = Config.HeaderWidth + GAP_X + INTERNAL_GAP_X; + ui32 width = Config.SummaryWidth - INTERNAL_GAP_X * 2; + if (metric->Summary && metric->Summary->Max) { + width = metric->Details.Sum * width / metric->Summary->Max; + } + background + << "" << tooltip << "" << Endl; + if (metric->Details.Max) { + auto wavg = width / 2; + if (metric->Details.Max > metric->Details.Min) { + wavg = (metric->Details.Avg - metric->Details.Min) * width / (metric->Details.Max - metric->Details.Min); + } + background + << " " + << " " << Endl; + } else { + background + << " " << Endl; + } + if (textSum) { + background + << "" << Endl + << "" << textSum << "" << Endl; + } + background + << "" << Endl; +} + +void TPlan::PrintSvg(ui64 maxTime, ui32& offsetY, TStringBuilder& background, TStringBuilder& canvas) { + OffsetY = offsetY; + ui32 planHeight = 0; + + for (auto& s : Stages) { + planHeight = std::max(planHeight, s->IndentY); + background + << "" << Endl; + auto x = Config.HeaderWidth + GAP_X; + background + << "" << Endl; + x += Config.SummaryWidth + GAP_X; + background + << "" << Endl; + if (s->Connections.size() > 1) { + ui32 y = s->OffsetY + s->Height; + background + << "" << Endl; + } + background + << "" << Endl + << "" << ToString(s->PhysicalStageId) << "" << Endl; + + { + ui32 y0 = s->OffsetY + INTERNAL_TEXT_HEIGHT + (INTERNAL_HEIGHT - INTERNAL_TEXT_HEIGHT) / 2 + offsetY; + if (!s->Info.empty()) { + for (auto text : s->Info) { + canvas + << "" << text << "" << Endl; + y0 += (INTERNAL_TEXT_HEIGHT + INTERNAL_GAP_Y); + } + } else { + canvas + << "" << s->NodeType << "" << Endl; + } + } + + + ui32 y0 = s->OffsetY + offsetY + INTERNAL_GAP_Y; + + auto tx0 = Config.HeaderWidth + GAP_X + Config.SummaryWidth + GAP_X + INTERNAL_GAP_X; + auto tx1 = Config.Width - INTERNAL_GAP_X; + auto tw = tx1 - tx0; + auto px = tx0 + TimeOffset * tw / maxTime; + auto pw = MaxTime * tw / maxTime; + + auto taskCount = s->CpuTime ? s->CpuTime->Details.Count : 0; + + if (s->OutputBytes) { + auto textSum = FormatBytes(s->OutputBytes->Details.Sum); + TStringBuilder tooltip; + tooltip + << "Output " + << s->OutputBytes->Details.Sum * 100 / s->OutputBytes->Summary->Value << "%, \u2211" + << textSum << ", " << FormatBytes(s->OutputBytes->Details.Min) << " | " + << FormatBytes(s->OutputBytes->Details.Avg) << " | " << FormatBytes(s->OutputBytes->Details.Max); + if (s->OutputRows && s->OutputRows->Details.Sum) { + tooltip + << ", Rows \u2211" + << FormatIntegerValue(s->OutputRows->Details.Sum) << ", " << FormatIntegerValue(s->OutputRows->Details.Min) << " | " + << FormatIntegerValue(s->OutputRows->Details.Avg) << " | " << FormatIntegerValue(s->OutputRows->Details.Max) + << ", Width " << FormatBytes(s->OutputBytes->Details.Sum / s->OutputRows->Details.Sum); + } + PrintStageSummary(background, canvas, y0, s->OutputBytes, Config.Palette.OutputMedium, Config.Palette.OutputLight, textSum, tooltip); + + if (s->SpillingChannelBytes && s->SpillingChannelBytes->Details.Sum) { + auto textSum = FormatBytes(s->SpillingChannelBytes->Details.Sum); + auto x1 = Config.HeaderWidth + GAP_X + Config.SummaryWidth + - INTERNAL_GAP_X; + auto x0 = x1 - textSum.size() * INTERNAL_TEXT_HEIGHT * 7 / 10; + background + << "" << "Channel Spilling \u2211" << textSum + << ", " << FormatBytes(s->SpillingChannelBytes->Details.Min) << " | " + << FormatBytes(s->SpillingChannelBytes->Details.Avg) << " | " << FormatBytes(s->SpillingChannelBytes->Details.Max) + << "" << Endl + << "" << Endl + << "" << textSum << "" << Endl + << "" << Endl; + } + + if (s->OutputBytes->Details.Count != taskCount) { + canvas + << "" << s->OutputBytes->Details.Count << "" << Endl; + } + + auto d = s->OutputBytes->MaxTime - s->OutputBytes->MinTime; + TStringBuilder title; + title << "Output"; + if (d) { + title << " " << FormatBytes(s->OutputBytes->Details.Sum * 1000 / d) << "/s"; + if (s->OutputRows) { + title << ", Rows " << FormatIntegerValue(s->OutputRows->Details.Sum * 1000 / d) << "/s"; + } + } + PrintTimeline(background, canvas, title, s->OutputBytes->FirstMessage, s->OutputBytes->LastMessage, px, y0, pw, INTERNAL_HEIGHT, Config.Palette.OutputMedium); + + if (!s->OutputBytes->WaitTime.Deriv.empty()) { + PrintWaitTime(background, s->OutputBytes, px, y0, pw, INTERNAL_HEIGHT, Config.Palette.OutputLight); + } + + if (!s->OutputBytes->History.Deriv.empty()) { + PrintDeriv(canvas, s->OutputBytes->History, px, y0, pw, INTERNAL_HEIGHT, "", Config.Palette.OutputDark); + } + } + + // Output is mandatory metric + y0 += INTERNAL_HEIGHT + INTERNAL_GAP_Y; + + if (s->MaxMemoryUsage) { + auto textSum = FormatBytes(s->MaxMemoryUsage->Details.Sum); + TStringBuilder tooltip; + tooltip + << "Memory " + << s->MaxMemoryUsage->Details.Sum * 100 / s->MaxMemoryUsage->Summary->Value << "%, \u2211" + << textSum << ", " << FormatBytes(s->MaxMemoryUsage->Details.Min) << " | " + << FormatBytes(s->MaxMemoryUsage->Details.Avg) << " | " << FormatBytes(s->MaxMemoryUsage->Details.Max); + PrintStageSummary(background, canvas, y0, s->MaxMemoryUsage, Config.Palette.MemMedium, Config.Palette.MemLight, textSum, tooltip); + + if (s->SpillingComputeBytes && s->SpillingComputeBytes->Details.Sum) { + auto textSum = FormatBytes(s->SpillingComputeBytes->Details.Sum); + auto x1 = Config.HeaderWidth + GAP_X + Config.SummaryWidth + - INTERNAL_GAP_X; + auto x0 = x1 - textSum.size() * INTERNAL_TEXT_HEIGHT * 7 / 10; + background + << "" << "Compute Spilling \u2211" << textSum + << ", " << FormatBytes(s->SpillingComputeBytes->Details.Min) << " | " + << FormatBytes(s->SpillingComputeBytes->Details.Avg) << " | " << FormatBytes(s->SpillingComputeBytes->Details.Max) + << "" << Endl + << "" << Endl + << "" << textSum << "" << Endl + << "" << Endl; + } + + if (s->MaxMemoryUsage->Details.Count != taskCount) { + canvas + << "" << s->MaxMemoryUsage->Details.Count << "" << Endl; + } + + if (!s->MaxMemoryUsage->History.Values.empty()) { + PrintValues(canvas, s->MaxMemoryUsage, px, y0, pw, INTERNAL_HEIGHT, "Max MEM " + FormatBytes(s->MaxMemoryUsage->History.MaxValue), Config.Palette.MemMedium, Config.Palette.MemMedium); + } + + if (s->SpillingComputeBytes && !s->SpillingComputeBytes->History.Deriv.empty()) { + PrintDeriv(canvas, s->SpillingComputeBytes->History, px, y0, pw, INTERNAL_HEIGHT, "Spilling Compute", Config.Palette.SpillingBytesMedium, Config.Palette.SpillingBytesLight); + } + } + + y0 += INTERNAL_HEIGHT + INTERNAL_GAP_Y; + + if (s->CpuTime) { + auto textSum = FormatUsage(s->CpuTime->Details.Sum); + TStringBuilder tooltip; + tooltip + << "CPU Usage " + << s->CpuTime->Details.Sum * 100 / s->CpuTime->Summary->Value << "%, \u2211" + << textSum << ", " << FormatUsage(s->CpuTime->Details.Min) << " | " + << FormatUsage(s->CpuTime->Details.Avg) << " | " << FormatUsage(s->CpuTime->Details.Max); + PrintStageSummary(background, canvas, y0, s->CpuTime, Config.Palette.CpuMedium, Config.Palette.CpuLight, textSum, tooltip); + + if (taskCount) { + canvas + << "" << taskCount << "" << Endl; + } + + if (!s->CpuTime->History.Deriv.empty()) { + auto maxCpu = s->CpuTime->History.MaxDeriv * TIME_SERIES_RANGES / (s->CpuTime->History.MaxTime - s->CpuTime->History.MinTime); + PrintDeriv(canvas, s->CpuTime->History, px, y0, pw, INTERNAL_HEIGHT, "Max CPU " + FormatMCpu(maxCpu), Config.Palette.CpuMedium, Config.Palette.CpuLight); + } + + if (s->SpillingComputeTime && !s->SpillingComputeTime->History.Deriv.empty()) { + PrintDeriv(canvas, s->SpillingComputeTime->History, px, y0, pw, INTERNAL_HEIGHT, "Spilling Compute", Config.Palette.SpillingTimeMedium); + } + } + + y0 += INTERNAL_HEIGHT + INTERNAL_GAP_Y; + + for (auto& c : s->Connections) { + + auto x = c->CteConnection ? c->CteIndentX : c->FromStage->IndentX; + auto y = (c->CteConnection ? c->CteOffsetY : c->FromStage->OffsetY) + offsetY; + + if (c->CteConnection) { + auto xx = x; + background + << "" << Endl; + xx = Config.HeaderWidth + GAP_X; + background + << "" << Endl; + xx += Config.SummaryWidth + GAP_X; + background + << "" << Endl; + background + << "" << Endl + << "" << ToString(c->FromStage->PhysicalStageId) << "" << Endl; + + auto s = c->FromStage->Info.empty() ? c->FromStage->NodeType : c->FromStage->Info[0]; + canvas + << "" << s << "" << Endl; + + if (c->CteOutputBytes) { + auto textSum = FormatBytes(c->CteOutputBytes->Details.Sum); + TStringBuilder tooltip; + tooltip + << "Output " + << c->CteOutputBytes->Details.Sum * 100 / c->CteOutputBytes->Summary->Value << "%, \u2211" + << textSum << ", " << FormatBytes(c->CteOutputBytes->Details.Min) << " | " + << FormatBytes(c->CteOutputBytes->Details.Avg) << " | " << FormatBytes(c->CteOutputBytes->Details.Max); + if (c->CteOutputRows && c->CteOutputRows->Details.Sum) { + tooltip + << ", Rows \u2211" + << FormatIntegerValue(c->CteOutputRows->Details.Sum) << ", " << FormatIntegerValue(c->CteOutputRows->Details.Min) << " | " + << FormatIntegerValue(c->CteOutputRows->Details.Avg) << " | " << FormatIntegerValue(c->CteOutputRows->Details.Max) + << ", Width " << FormatBytes(c->CteOutputBytes->Details.Sum / c->CteOutputRows->Details.Sum); + } + PrintStageSummary(background, canvas, y + INTERNAL_GAP_Y, c->CteOutputBytes, Config.Palette.OutputMedium, Config.Palette.OutputLight, textSum, tooltip); + + canvas + << "" << c->CteOutputBytes->Details.Count << "" << Endl; + + auto d = c->CteOutputBytes->MaxTime - c->CteOutputBytes->MinTime; + TStringBuilder title; + title << "Output"; + if (d) { + title << " " << FormatBytes(c->CteOutputBytes->Details.Sum * 1000 / d) << "/s"; + if (c->CteOutputRows) { + title << ", Rows " << FormatIntegerValue(c->CteOutputRows->Details.Sum * 1000 / d) << "/s"; + } + } + PrintTimeline(background, canvas, title, c->CteOutputBytes->FirstMessage, c->CteOutputBytes->LastMessage, px, y + INTERNAL_GAP_Y, pw, INTERNAL_HEIGHT, Config.Palette.OutputMedium); + + if (!c->CteOutputBytes->WaitTime.Deriv.empty()) { + PrintWaitTime(background, c->CteOutputBytes, px, y + INTERNAL_GAP_Y, pw, INTERNAL_HEIGHT, Config.Palette.OutputLight); + } + + if (!c->CteOutputBytes->History.Deriv.empty()) { + PrintDeriv(canvas, c->CteOutputBytes->History, px, y + INTERNAL_GAP_Y, pw, INTERNAL_HEIGHT, "", Config.Palette.OutputDark); + } + } + } + + TString mark; + if (c->NodeType == "HashShuffle") mark = "H"; + else if (c->NodeType == "Merge") mark = "Me"; + else if (c->NodeType == "Map") mark = "Ma"; + else if (c->NodeType == "UnionAll") mark = "U"; + else if (c->NodeType == "Broadcast") mark = "B"; + else mark = "?"; + + canvas + << "Connection: " << c->NodeType; + if (!c->KeyColumns.empty()) { + canvas << " KeyColumns: "; + bool first = true; + for (auto k : c->KeyColumns) { + if (first) { + first = false; + } else { + canvas << ", "; + } + canvas << k; + } + } + canvas + << "" << Endl + << " " << Endl + << " " << mark << "" << Endl + << "" << Endl; + + if (c->InputBytes) { + auto textSum = FormatBytes(c->InputBytes->Details.Sum); + TStringBuilder tooltip; + tooltip + << "Input " + << c->InputBytes->Details.Sum * 100 / c->InputBytes->Summary->Value << "%, \u2211" + << textSum << ", " << FormatBytes(c->InputBytes->Details.Min) << " | " + << FormatBytes(c->InputBytes->Details.Avg) << " | " << FormatBytes(c->InputBytes->Details.Max); + if (c->InputRows && c->InputRows->Details.Sum) { + tooltip + << ", Rows \u2211" + << FormatIntegerValue(c->InputRows->Details.Sum) << ", " << FormatIntegerValue(c->InputRows->Details.Min) << " | " + << FormatIntegerValue(c->InputRows->Details.Avg) << " | " << FormatIntegerValue(c->InputRows->Details.Max) + << ", Width " << FormatBytes(c->InputBytes->Details.Sum / c->InputRows->Details.Sum); + } + PrintStageSummary(background, canvas, y0, c->InputBytes, Config.Palette.InputMedium, Config.Palette.InputLight, textSum, tooltip); + + if (c->InputBytes->Details.Count != taskCount) { + canvas + << "" << c->InputBytes->Details.Count << "" << Endl; + } + + auto d = c->InputBytes->MaxTime - c->InputBytes->MinTime; + TStringBuilder title; + title << "Input"; + if (d) { + title << " " << FormatBytes(c->InputBytes->Details.Sum * 1000 / d) << "/s"; + if (c->InputRows) { + title << ", Rows " << FormatIntegerValue(c->InputRows->Details.Sum * 1000 / d) << "/s"; + } + } + PrintTimeline(background, canvas, title, c->InputBytes->FirstMessage, c->InputBytes->LastMessage, px, y0, pw, INTERNAL_HEIGHT, Config.Palette.InputMedium); + + if (!c->InputBytes->WaitTime.Deriv.empty()) { + PrintWaitTime(background, c->InputBytes, px, y0, pw, INTERNAL_HEIGHT, Config.Palette.InputLight); + } + + if (!c->InputBytes->History.Deriv.empty()) { + PrintDeriv(canvas, c->InputBytes->History, px, y0, pw, INTERNAL_HEIGHT, "", Config.Palette.InputDark); + } + + y0 += INTERNAL_HEIGHT + INTERNAL_GAP_Y; + } + } + + if (s->Source && s->Source->IngressBytes) { + auto textSum = FormatBytes(s->Source->IngressBytes->Details.Sum); + TStringBuilder tooltip; + tooltip + << "Ingress " + << s->Source->IngressBytes->Details.Sum * 100 / s->Source->IngressBytes->Summary->Value << "%, \u2211" + << textSum << ", " << FormatBytes(s->Source->IngressBytes->Details.Min) << " | " + << FormatBytes(s->Source->IngressBytes->Details.Avg) << " | " << FormatBytes(s->Source->IngressBytes->Details.Max); + if (s->Source->IngressRows && s->Source->IngressRows->Details.Sum) { + tooltip + << ", Rows \u2211" + << FormatIntegerValue(s->Source->IngressRows->Details.Sum) << ", " << FormatIntegerValue(s->Source->IngressRows->Details.Min) << " | " + << FormatIntegerValue(s->Source->IngressRows->Details.Avg) << " | " << FormatIntegerValue(s->Source->IngressRows->Details.Max) + << ", Width " << FormatBytes(s->Source->IngressBytes->Details.Sum / s->Source->IngressRows->Details.Sum); + } + PrintStageSummary(background, canvas, y0, s->Source->IngressBytes, Config.Palette.IngressMedium, Config.Palette.IngressLight, textSum, tooltip); + + if (s->Source->IngressBytes->Details.Count != taskCount) { + canvas + << "" << s->Source->IngressBytes->Details.Count << "" << Endl; + } + + auto d = s->Source->IngressBytes->MaxTime - s->Source->IngressBytes->MinTime; + TStringBuilder title; + title << "Ingress"; + if (d) { + title << " " << FormatBytes(s->Source->IngressBytes->Details.Sum * 1000 / d) << "/s"; + if (s->Source->IngressRows) { + title << ", Rows " << FormatIntegerValue(s->Source->IngressRows->Details.Sum / d) << "/s"; + } + } + PrintTimeline(background, canvas, title, s->Source->IngressBytes->FirstMessage, s->Source->IngressBytes->LastMessage, px, y0, pw, INTERNAL_HEIGHT, Config.Palette.IngressMedium); + + if (!s->Source->IngressBytes->WaitTime.Deriv.empty()) { + PrintWaitTime(background, s->Source->IngressBytes, px, y0, pw, INTERNAL_HEIGHT, Config.Palette.IngressLight); + } + + if (!s->Source->IngressBytes->History.Deriv.empty()) { + PrintDeriv(canvas, s->Source->IngressBytes->History, px, y0, pw, INTERNAL_HEIGHT, "", Config.Palette.IngressDark); + } + + y0 += INTERNAL_HEIGHT + INTERNAL_GAP_Y; + } + } + + offsetY += planHeight; +} + +TColorPalette::TColorPalette() { + StageMain = "var(--stage-main, #F2F2F2)"; + StageClone = "var(--stage-clone, #D9D9D9"; + StageText = "var(--stage-text, #262626)"; + StageTextHighlight = "var(--stage-texthl, #EA0703)"; + StageGrid = "var(--stage-grid, #B2B2B2"; + IngressDark = "var(--ingress-dark, #574F38)"; + IngressMedium = "var(--ingress-medium, #82723C)"; + IngressLight = "var(--ingress-light, #C0A645)"; + InputDark = "var(--input-dark, #315B34)"; + InputMedium = "var(--input-medium, #379A33)"; + InputLight = "var(--input-light, #3AC936)"; + OutputDark = "var(--output-dark, #3F5799)"; + OutputMedium = "var(--output-medium, #4E79EB)"; + OutputLight = "var(--output-light, #86A8FF)"; + MemMedium = "var(--mem-medium, #543B70)"; + MemLight = "var(--mem-light, #854EBD)"; + CpuMedium = "var(--cpu-medium, #EA0703)"; + CpuLight = "var(--cpu-light, #FF6866)"; + ConnectionFill= "var(--conn-fill, #BFBFBF)"; + ConnectionLine= "var(--conn-line, #BFBFBF)"; + ConnectionText= "var(--conn-text, #393939)"; + MinMaxLine = "var(--minmax-line, #FFDB4D)"; + TextLight = "var(--text-light, #FFFFFF)"; + TextInverted = "var(--text-inv, #FFFFFF)"; + TextSummary = "var(--text-summary, #262626)"; + SpillingBytesDark = "var(--spill-dark, #406B61)"; + SpillingBytesMedium = "var(--spill-medium, #599587)"; + SpillingBytesLight = "var(--spill-light, #72C0AE)"; + SpillingTimeDark = "var(--spill-dark, #406B61)"; + SpillingTimeMedium = "var(--spill-medium, #599587)"; + SpillingTimeLight = "var(--spill-light, #72C0AE)"; +} + +TPlanViewConfig::TPlanViewConfig() { + HeaderWidth = 300; + SummaryWidth = 128; + Width = 1024; +} + + +void TPlanVisualizer::LoadPlans(const TString& plans) { + NJson::TJsonReaderConfig jsonConfig; + NJson::TJsonValue jsonNode; + if (NJson::ReadJsonTree(plans, &jsonConfig, &jsonNode)) { + if (auto* topNode = jsonNode.GetValueByPath("Plan")) { + if (auto* subNode = topNode->GetValueByPath("Plans")) { + for (auto& plan : subNode->GetArray()) { + if (auto* typeNode = plan.GetValueByPath("Node Type")) { + auto nodeType = typeNode->GetStringSafe(); + LoadPlan(nodeType, plan); + } + } + } + } + } + PostProcessPlans(); +} + +void TPlanVisualizer::LoadPlan(const TString& nodeType, const NJson::TJsonValue& node) { + Plans.emplace_back(nodeType, Config, CteStages, CteSubPlans); + Plans.back().Load(node); +} + +void TPlanVisualizer::PostProcessPlans() { + // Fix CTE Refs + for (auto& p : Plans) { + p.ResolveCteRefs(); + } + // Fix Layouts + for (auto& p : Plans) { + p.MarkLayout(); + if (BaseTime == 0) { + BaseTime = p.BaseTime; + } else { + BaseTime = std::min(BaseTime, p.BaseTime); + } + } + // Fix time Offsets + for (auto& p : Plans) { + p.TimeOffset = p.BaseTime - BaseTime; + MaxTime = std::max(MaxTime, p.TimeOffset + p.MaxTime); + } +} + +TString TPlanVisualizer::PrintSvgSafe() { + try { + return PrintSvg(); + } catch (std::exception& e) { + return Sprintf("%s", e.what()); + } +} + +TString TPlanVisualizer::PrintSvg() { + TStringBuilder background; + TStringBuilder canvas; + TStringBuilder svg; + + ui32 offsetY = 0; + + ui32 summary3 = (Config.SummaryWidth - INTERNAL_GAP_X * 2) / 3; + for (auto& p : Plans) { + offsetY += GAP_Y; + canvas + << "" + << p.NodeType << "" << Endl; + + canvas + << "" << ToString(p.Tasks) << "" << Endl; + + canvas + << "Ingress " + << FormatBytes(p.IngressBytes->Value) << ", Rows " << FormatIntegerValue(p.IngressRows->Value); + if (p.IngressRows->Value) { + canvas + << ", Width " << p.IngressBytes->Value / p.IngressRows->Value << "B"; + } + if (p.MaxTime) { + canvas + << ", Avg " << FormatBytes(p.IngressBytes->Value * 1000 / p.MaxTime) << "/s"; + } + canvas + << "" << Endl + << " " << Endl + << " " << FormatBytes(p.IngressBytes->Value) << "" << Endl + << "" << Endl; + + canvas + << "CPU Usage " << FormatUsage(p.CpuTime->Value); + if (p.MaxTime) { + auto usagePS = p.CpuTime->Value / p.MaxTime; + usagePS /= 10; + canvas + << ", Avg " << Sprintf("%lu.%.2lu", usagePS / 100, usagePS % 100) << " CPU/s"; + } + canvas + << "" << Endl + << " " << Endl + << " " << FormatUsage(p.CpuTime->Value) << "" << Endl + << "" << Endl; + + canvas + << "Memory " << FormatBytes(p.MaxMemoryUsage->Value) << "" << Endl + << " " << Endl + << " " << FormatBytes(p.MaxMemoryUsage->Value) << "" << Endl + << "" << Endl; + + auto w = Config.Width - (Config.HeaderWidth + GAP_X + Config.SummaryWidth + GAP_X); + auto x = (Config.HeaderWidth + GAP_X + Config.SummaryWidth + GAP_X) + w * (p.MaxTime + p.TimeOffset) / MaxTime; + canvas + << "" << "Duration: " << FormatTimeMs(p.MaxTime) << ", Total " << FormatTimeMs(p.MaxTime + p.TimeOffset) << "" << Endl + << " " << Endl + << " " << FormatTimeMs(p.MaxTime + p.TimeOffset) << "" << Endl + << "" << Endl; + + offsetY += TIME_HEIGHT; + if (!p.TotalCpuTime.Deriv.empty()) { + + auto tx0 = Config.HeaderWidth + GAP_X + Config.SummaryWidth + GAP_X + INTERNAL_GAP_X; + auto tx1 = Config.Width - INTERNAL_GAP_X; + auto tw = tx1 - tx0; + auto maxCpu = p.TotalCpuTime.MaxDeriv * TIME_SERIES_RANGES / (p.TotalCpuTime.MaxTime - p.TotalCpuTime.MinTime); + p.PrintDeriv(canvas, p.TotalCpuTime, tx0, offsetY, tw, INTERNAL_HEIGHT, "Max CPU " + FormatMCpu(maxCpu), Config.Palette.CpuMedium, Config.Palette.CpuLight); + } + offsetY += INTERNAL_HEIGHT; + p.PrintSvg(MaxTime, offsetY, background, canvas); + } + + svg << "" << Endl; + svg << "" + << "" << Endl; + svg << TString(background) << Endl; + + { + ui64 maxSec = MaxTime / 1000; + ui64 deltaSec = 0; + + if (maxSec <= 10) deltaSec = 1; + else if (maxSec <= 20) deltaSec = 2; + else if (maxSec <= 30) deltaSec = 3; + else if (maxSec <= 40) deltaSec = 4; + else if (maxSec <= 50) deltaSec = 5; + else if (maxSec <= 60) deltaSec = 6; + else if (maxSec <= 100) deltaSec = 10; + else if (maxSec <= 150) deltaSec = 15; + else if (maxSec <= 200) deltaSec = 20; + else if (maxSec <= 300) deltaSec = 30; + else if (maxSec <= 600) deltaSec = 60; + else if (maxSec <= 1200) deltaSec = 120; + else if (maxSec <= 1800) deltaSec = 180; + else if (maxSec <= 3600) deltaSec = 360; + else { + ui64 stepSec = maxSec / 10; + deltaSec = stepSec - (stepSec % 60); + } + + auto x = Config.HeaderWidth + GAP_X + Config.SummaryWidth + GAP_X; + auto w = Config.Width - x - INTERNAL_GAP_X * 2; + + for (ui64 t = 0; t < maxSec; t += deltaSec) { + ui64 x1 = t * w / maxSec; + svg + << "" << Endl; + auto timeLabel = Sprintf("%lu:%.2lu", t / 60, t % 60); + for (auto& p : Plans) { + svg + << "" + << timeLabel << "" << Endl; + } + } + } + + svg << TString(canvas) << Endl; + svg << "" << Endl; + + return svg; +} diff --git a/ydb/public/lib/ydb_cli/common/plan2svg.h b/ydb/public/lib/ydb_cli/common/plan2svg.h new file mode 100644 index 000000000000..c4115a05f355 --- /dev/null +++ b/ydb/public/lib/ydb_cli/common/plan2svg.h @@ -0,0 +1,264 @@ +#pragma once + +#include +#include +#include + +#include +#include +#include + +#include +#include + +class TStage; + +class TSummaryMetric { + +public: + ui64 Value = 0; + ui32 Count = 0; + ui64 Min = 0; + ui64 Max = 0; + + void Add(ui64 value) { + if (Count) { + Min = std::min(Min, value); + Max = std::max(Max, value); + } else { + Min = value; + Max = value; + } + Value += value; + Count++; + } + + ui64 Average() { + return Count ? (Value / Count) : 0; + } +}; + +struct TAggregation { + ui64 Min = 0; + ui64 Max = 0; + ui64 Avg = 0; + ui64 Sum = 0; + ui32 Count = 0; + + bool Load(const NJson::TJsonValue& node); +}; + +struct TMetricHistory { + std::vector> Deriv; + ui64 MaxDeriv = 0; + std::vector> Values; + ui64 MaxValue = 0; + ui64 MinTime = 0; + ui64 MaxTime = 0; + + void Load(const NJson::TJsonValue& node, ui64 explicitMinTime, ui64 explicitMaxTime); + void Load(std::vector& times, std::vector& values, ui64 explicitMinTime, ui64 explicitMaxTime); +}; + +class TSingleMetric { + +public: + TSingleMetric(std::shared_ptr summary, const NJson::TJsonValue& node, + const NJson::TJsonValue* firstMessageNode = nullptr, + const NJson::TJsonValue* lastMessageNode = nullptr, + const NJson::TJsonValue* waitTimeUsNode = nullptr); + + std::shared_ptr Summary; + TAggregation Details; + + TMetricHistory History; + TMetricHistory WaitTime; + ui64 MinTime = 0; + ui64 MaxTime = 0; + TAggregation FirstMessage; + TAggregation LastMessage; +}; + +class TConnection { + +public: + TConnection(const TString& nodeType, ui32 stagePlanNodeId) : NodeType(nodeType), StagePlanNodeId(stagePlanNodeId) { + } + + TString NodeType; + std::shared_ptr FromStage; + std::shared_ptr InputBytes; + std::shared_ptr InputRows; + std::vector KeyColumns; + bool CteConnection = false; + ui32 CteIndentX = 0; + ui32 CteOffsetY = 0; + std::shared_ptr CteOutputBytes; + std::shared_ptr CteOutputRows; + const NJson::TJsonValue* StatsNode = nullptr; + const ui32 StagePlanNodeId; +}; + +class TSource { + +public: + TSource(const TString& nodeType) : NodeType(nodeType) { + } + + TString NodeType; + std::shared_ptr IngressBytes; + std::shared_ptr IngressRows; + std::vector Info; +}; + +class TStage { + +public: + TStage(const TString& nodeType) : NodeType(nodeType) { + } + + TString NodeType; + std::shared_ptr Source; + std::shared_ptr IngressBytes; + std::vector> Connections; + ui32 IndentX = 0; + ui32 IndentY = 0; + ui32 OffsetY = 0; + ui32 Height = 0; + std::shared_ptr CpuTime; + std::shared_ptr MaxMemoryUsage; + std::shared_ptr OutputBytes; + std::shared_ptr OutputRows; + std::shared_ptr SpillingComputeTime; + std::shared_ptr SpillingComputeBytes; + std::shared_ptr SpillingChannelTime; + std::shared_ptr SpillingChannelBytes; + std::vector Info; + ui64 BaseTime = 0; + ui32 PlanNodeId = 0; + ui32 PhysicalStageId = 0; + ui32 Tasks = 0; + const NJson::TJsonValue* StatsNode = nullptr; +}; + +struct TColorPalette { + TColorPalette(); + TString StageMain; + TString StageClone; + TString StageText; + TString StageTextHighlight; + TString StageGrid; + TString IngressDark; + TString IngressMedium; + TString IngressLight; + TString InputDark; + TString InputMedium; + TString InputLight; + TString OutputDark; + TString OutputMedium; + TString OutputLight; + TString MemMedium; + TString MemLight; + TString CpuMedium; + TString CpuLight; + TString ConnectionFill; + TString ConnectionLine; + TString ConnectionText; + TString MinMaxLine; + TString TextLight; + TString TextInverted; + TString TextSummary; + TString SpillingBytesDark; + TString SpillingBytesMedium; + TString SpillingBytesLight; + TString SpillingTimeDark; + TString SpillingTimeMedium; + TString SpillingTimeLight; +}; + +struct TPlanViewConfig { + TPlanViewConfig(); + ui32 HeaderWidth; + ui32 SummaryWidth; + ui32 Width; + TColorPalette Palette; +}; + +class TPlan { + +public: + TPlan(const TString& nodeType, TPlanViewConfig& config, std::map>& cteStages, + std::map& cteSubPlans) + : NodeType(nodeType), Config(config), CteStages(cteStages), CteSubPlans(cteSubPlans) { + CpuTime = std::make_shared(); + MaxMemoryUsage = std::make_shared(); + OutputBytes = std::make_shared(); + OutputRows = std::make_shared(); + InputBytes = std::make_shared(); + InputRows = std::make_shared(); + IngressBytes = std::make_shared(); + IngressRows = std::make_shared(); + SpillingComputeTime = std::make_shared(); + SpillingComputeBytes = std::make_shared(); + SpillingChannelTime = std::make_shared(); + SpillingChannelBytes = std::make_shared(); + } + + void Load(const NJson::TJsonValue& node); + void LoadStage(std::shared_ptr stage, const NJson::TJsonValue& node, ui32 parentPlanNodeId); + void LoadSource(std::shared_ptr source, const NJson::TJsonValue& node); + void MarkStageIndent(ui32 indentX, ui32& offsetY, std::shared_ptr stage); + void MarkLayout(); + void ResolveCteRefs(); + void PrintTimeline(TStringBuilder& background, TStringBuilder& canvas, const TString& title, TAggregation& firstMessage, TAggregation& lastMessage, ui32 x, ui32 y, ui32 w, ui32 h, const TString& color); + void PrintWaitTime(TStringBuilder& canvas, std::shared_ptr metric, ui32 x, ui32 y, ui32 w, ui32 h, const TString& fillColor); + void PrintDeriv(TStringBuilder& canvas, TMetricHistory& history, ui32 x, ui32 y, ui32 w, ui32 h, const TString& title, const TString& lineColor, const TString& fillColor = ""); + void PrintValues(TStringBuilder& canvas, std::shared_ptr metric, ui32 x, ui32 y, ui32 w, ui32 h, const TString& title, const TString& lineColor, const TString& fillColor = ""); + void PrintStageSummary(TStringBuilder& background, TStringBuilder&, ui32 y0, std::shared_ptr metric, const TString& mediumColor, const TString& lightColor, const TString& textSum, const TString& tooltip); + void PrintSvg(ui64 maxTime, ui32& offsetY, TStringBuilder& background, TStringBuilder& canvas); + TString NodeType; + std::vector> Stages; + std::shared_ptr CpuTime; + std::shared_ptr MaxMemoryUsage; + std::shared_ptr OutputBytes; + std::shared_ptr OutputRows; + std::shared_ptr InputBytes; + std::shared_ptr InputRows; + std::shared_ptr IngressBytes; + std::shared_ptr IngressRows; + std::shared_ptr SpillingComputeTime; + std::shared_ptr SpillingComputeBytes; + std::shared_ptr SpillingChannelTime; + std::shared_ptr SpillingChannelBytes; + std::vector TotalCpuTimes; + std::vector TotalCpuValues; + TMetricHistory TotalCpuTime; + ui64 MaxTime = 1000; + ui64 BaseTime = 0; + ui64 TimeOffset = 0; + ui32 OffsetY = 0; + ui32 Tasks = 0; + std::vector>> CteRefs; + std::vector, ui32>>> MemberRefs; + TPlanViewConfig& Config; + std::map>& CteStages; + std::map& CteSubPlans; +}; + +class TPlanVisualizer { + +public: + + void LoadPlans(const TString& plans); + void LoadPlan(const TString& planNodeType, const NJson::TJsonValue& root); + void PostProcessPlans(); + TString PrintSvg(); + TString PrintSvgSafe(); + + std::vector Plans; + ui64 MaxTime = 1000; + ui64 BaseTime = 0; + TPlanViewConfig Config; + std::map> CteStages; + std::map CteSubPlans; +}; diff --git a/ydb/public/lib/ydb_cli/common/ya.make b/ydb/public/lib/ydb_cli/common/ya.make index 533d768660f6..16c9c4835ee4 100644 --- a/ydb/public/lib/ydb_cli/common/ya.make +++ b/ydb/public/lib/ydb_cli/common/ya.make @@ -13,6 +13,7 @@ SRCS( parameter_stream.cpp parameters.cpp pg_dump_parser.cpp + plan2svg.cpp pretty_table.cpp print_operation.cpp print_utils.cpp diff --git a/ydb/tests/fq/plans/test_stats_mode.py b/ydb/tests/fq/plans/test_stats_mode.py index f8a7a62d3d85..7679b3ac6133 100644 --- a/ydb/tests/fq/plans/test_stats_mode.py +++ b/ydb/tests/fq/plans/test_stats_mode.py @@ -14,7 +14,7 @@ class TestStatsMode: @pytest.mark.parametrize( "stats_mode", ["STATS_MODE_NONE", "STATS_MODE_BASIC", "STATS_MODE_FULL", "STATS_MODE_PROFILE"] ) - def test_mode(self, kikimr, s3, client, stats_mode): + def test_mode(self, kikimr, s3, client, stats_mode, yq_version): resource = boto3.resource( "s3", endpoint_url=s3.s3_url, aws_access_key_id="key", aws_secret_access_key="secret_key" ) @@ -50,6 +50,10 @@ def test_mode(self, kikimr, s3, client, stats_mode): query_id = client.create_query("simple", sql, type=fq.QueryContent.QueryType.ANALYTICS).result.query_id client.wait_query_status(query_id, fq.QueryMeta.COMPLETED) + if yq_version == "v2": + result = client.describe_query(query_id).result + assert " list[str]: return [cli] class QueuePlan: - def __init__(self, plan: dict | None = None, table: str | None = None, ast: str | None = None) -> None: + def __init__(self, plan: dict | None = None, table: str | None = None, ast: str | None = None, svg: str | None = None) -> None: self.plan = plan self.table = table self.ast = ast + self.svg = svg class WorkloadRunResult: def __init__( @@ -92,6 +93,9 @@ def workload_run(type: WorkloadType, path: str, query_num: int, iterations: int if (os.path.exists(plan_path + '.ast')): with open(plan_path + '.ast') as f: plan.ast = f.read() + if (os.path.exists(plan_path + '.svg')): + with open(plan_path + '.svg') as f: + plan.svg = f.read() return YdbCliHelper.WorkloadRunResult( stats=stats, diff --git a/ydb/tests/olap/load/conftest.py b/ydb/tests/olap/load/conftest.py index f119887bd365..e2139f65638f 100644 --- a/ydb/tests/olap/load/conftest.py +++ b/ydb/tests/olap/load/conftest.py @@ -46,6 +46,8 @@ def _get_duraton(stats, field): allure.attach(result.plan.table, 'Plan table', attachment_type=allure.attachment_type.TEXT) if result.plan.ast is not None: allure.attach(result.plan.ast, 'Plan ast', attachment_type=allure.attachment_type.TEXT) + if result.plan.svg is not None: + allure.attach(result.plan.svg, 'Plan svg', attachment_type=allure.attachment_type.SVG) if result.stdout is not None: allure.attach(result.stdout, 'Stdout', attachment_type=allure.attachment_type.TEXT) diff --git a/ydb/tests/tools/fq_runner/kikimr_utils.py b/ydb/tests/tools/fq_runner/kikimr_utils.py index b3f86b84a3d8..9deb83e14262 100644 --- a/ydb/tests/tools/fq_runner/kikimr_utils.py +++ b/ydb/tests/tools/fq_runner/kikimr_utils.py @@ -114,6 +114,7 @@ def apply_to_kikimr(self, request, kikimr): solomon_endpoint = os.environ.get('SOLOMON_URL') if solomon_endpoint is not None: kikimr.compute_plane.fq_config['common']['monitoring_endpoint'] = solomon_endpoint + kikimr.control_plane.fq_config['common']['show_query_timeline'] = True class YQv2Extension(ExtensionPoint): diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 399025dcee25..fdcaefc4901f 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -322,7 +322,7 @@ class TKqpRunner::TImpl { try { double cpuUsage = 0.0; - auto fullStat = StatProcessor_->GetQueryStat(convertedPlan, cpuUsage); + auto fullStat = StatProcessor_->GetQueryStat(convertedPlan, cpuUsage, nullptr, 0); auto flatStat = StatProcessor_->GetFlatStat(convertedPlan); auto publicStat = StatProcessor_->GetPublicStat(fullStat);