From e0898f8af5aea15752b12548213f6abf83396a79 Mon Sep 17 00:00:00 2001 From: Alexey Efimov Date: Mon, 8 Jan 2024 18:00:59 +0000 Subject: [PATCH] add different metrics to graph service --- ydb/core/base/pool_stats_collector.cpp | 4 -- ydb/core/graph/api/events.h | 4 ++ ydb/core/graph/api/service.h | 2 + ydb/core/graph/service/service_impl.cpp | 28 +++++++++ ydb/core/graph/shard/backends.cpp | 10 ++- ydb/core/graph/shard/shard_impl.cpp | 2 +- ydb/core/graph/shard/shard_impl.h | 2 +- ydb/core/graph/shard/tx_clear_data.cpp | 1 - ydb/core/graph/shard/tx_monitoring.cpp | 38 +++++++++-- ydb/core/graph/shard/ut/shard_ut.cpp | 24 +++++++ ydb/core/sys_view/service/ext_counters.cpp | 73 +++++++++++++++++++++- ydb/core/sys_view/service/ya.make | 1 + ydb/core/viewer/json_render.h | 12 +++- 13 files changed, 184 insertions(+), 17 deletions(-) diff --git a/ydb/core/base/pool_stats_collector.cpp b/ydb/core/base/pool_stats_collector.cpp index fe3041d40249..0ddce4c08d37 100644 --- a/ydb/core/base/pool_stats_collector.cpp +++ b/ydb/core/base/pool_stats_collector.cpp @@ -47,15 +47,11 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor { MiniKQLPoolStats.Update(); TVector> pools; - double cpuUsage = 0; for (const auto& pool : PoolCounters) { pools.emplace_back(pool.Name, pool.Usage, pool.Threads); - cpuUsage += pool.Usage; } ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools)); - - ctx.Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvSendMetrics("cpu_usage", cpuUsage)); } private: diff --git a/ydb/core/graph/api/events.h b/ydb/core/graph/api/events.h index b05350630cf2..f7c496668f75 100644 --- a/ydb/core/graph/api/events.h +++ b/ydb/core/graph/api/events.h @@ -21,6 +21,10 @@ struct TEvGraph { TEvSendMetrics() = default; TEvSendMetrics(const TString& name, double value) { + AddMetric(name, value); + } + + void AddMetric(const TString& name, double value) { 
NKikimrGraph::TMetric* metric = Record.AddMetrics(); metric->SetName(name); metric->SetValue(value); diff --git a/ydb/core/graph/api/service.h b/ydb/core/graph/api/service.h index 88c66eaa8e50..4483a73839e7 100644 --- a/ydb/core/graph/api/service.h +++ b/ydb/core/graph/api/service.h @@ -14,5 +14,7 @@ inline TActorId MakeGraphServiceId(ui32 node = 0) { IActor* CreateGraphService(const TString& database); +double GetTimingForPercentile(double percentile, const TVector<ui64>& values, const TVector<ui64>& /*upper*/bounds, ui64 total); + } // NGraph } // NKikimr diff --git a/ydb/core/graph/service/service_impl.cpp b/ydb/core/graph/service/service_impl.cpp index 89746613f508..9eb834201cd7 100644 --- a/ydb/core/graph/service/service_impl.cpp +++ b/ydb/core/graph/service/service_impl.cpp @@ -230,6 +230,7 @@ class TGraphService : public TActor<TGraphService> { hFunc(TEvGraph::TEvMetricsResult, Handle); hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle); hFunc(TEvTabletPipe::TEvClientConnected, Handle); + hFunc(TEvTabletPipe::TEvClientDestroyed, Handle); cFunc(TEvents::TSystem::Wakeup, HandleTimeout); } } @@ -247,5 +248,32 @@ IActor* CreateGraphService(const TString& database) { return new TGraphService(database); } +double GetTimingForPercentile(double percentile, const TVector<ui64>& values, const TVector<ui64>& /*upper*/bounds, ui64 total) { + ui64 ppMark = total * percentile / 100; + ui64 accm = 0; + ui32 n = 0; + while (n < bounds.size() && accm < ppMark) { + if (accm + values[n] >= ppMark) { + ui64 lowerBound = 0; + if (n > 0) { + lowerBound = bounds[n - 1]; + } + ui64 upperBound = bounds[n]; + if (upperBound == std::numeric_limits<ui64>::max()) { + return lowerBound; // workaround for INF bucket + } + ui64 currentValue = values[n]; + ui64 ppValue = ppMark - accm; + if (currentValue == 0) { + return NAN; + } + return (static_cast<double>(ppValue) / currentValue) * (upperBound - lowerBound) + lowerBound; + } + accm += values[n]; + n++; + } + return NAN; +} + } // NGraph } // NKikimr diff --git 
a/ydb/core/graph/shard/backends.cpp b/ydb/core/graph/shard/backends.cpp index e7393bac050e..55c9705d2558 100644 --- a/ydb/core/graph/shard/backends.cpp +++ b/ydb/core/graph/shard/backends.cpp @@ -202,6 +202,7 @@ bool TLocalBackend::GetMetrics(NTabletFlatExecutor::TTransactionContext& txc, co } } TMetricsValues metricValues; + BLOG_D("Querying from " << minTime << " to " << maxTime); auto rowset = db.Table().GreaterOrEqual(minTime).LessOrEqual(maxTime).Select(); if (!rowset.IsReady()) { return false; @@ -220,7 +221,7 @@ bool TLocalBackend::GetMetrics(NTabletFlatExecutor::TTransactionContext& txc, co ui64 id = rowset.GetValue(); auto itIdx = metricIdx.find(id); if (itIdx != metricIdx.end()) { - metricValues.Values.back()[itIdx->second] = rowset.GetValue(); + metricValues.Values[itIdx->second].back() = rowset.GetValue(); } if (!rowset.Next()) { return false; @@ -237,18 +238,21 @@ bool TLocalBackend::ClearData(NTabletFlatExecutor::TTransactionContext& txc, TIn if (!rowset.IsReady()) { return false; } + ui64 prevTimestamp = 0; while (!rowset.EndOfSet()) { ui64 timestamp = rowset.GetValue(); ui64 id = rowset.GetValue(); db.Table().Key(timestamp, id).Delete(); newStartTimestamp = TInstant::Seconds(timestamp); - if (++rows >= MAX_ROWS_TO_DELETE) { - break; + if (timestamp != prevTimestamp && ++rows >= MAX_ROWS_TO_DELETE) { // we count as a logical row every unique timestamp + break; // so for database it will be MAX_ROWS * NUM_OF_METRICS rows } + prevTimestamp = timestamp; if (!rowset.Next()) { return false; } } + BLOG_D("Cleared " << rows << " logical rows"); return true; } diff --git a/ydb/core/graph/shard/shard_impl.cpp b/ydb/core/graph/shard/shard_impl.cpp index 1c50c8b82eea..2942f69549b3 100644 --- a/ydb/core/graph/shard/shard_impl.cpp +++ b/ydb/core/graph/shard/shard_impl.cpp @@ -91,7 +91,7 @@ void TGraphShard::Handle(TEvGraph::TEvSendMetrics::TPtr& ev) { MetricsData.Timestamp = now; MetricsData.Values.clear(); } - if ((now - StartTimestamp) > DURATION_CLEAR_TRIGGER 
&& (now - ClearTimestamp) < DURATION_CLEAR_PERIOD) { + if ((now - StartTimestamp) > DURATION_CLEAR_TRIGGER && (now - ClearTimestamp) > DURATION_CLEAR_PERIOD) { ClearTimestamp = now; BLOG_TRACE("Executing TxClearData"); ExecuteTxClearData(); diff --git a/ydb/core/graph/shard/shard_impl.h b/ydb/core/graph/shard/shard_impl.h index fca4eafbc014..15e655c9e6d1 100644 --- a/ydb/core/graph/shard/shard_impl.h +++ b/ydb/core/graph/shard/shard_impl.h @@ -43,7 +43,7 @@ class TGraphShard : public TActor, public NTabletFlatExecutor::TTab STATEFN(StateWork); // how often we could issue a clear operation - static constexpr TDuration DURATION_CLEAR_PERIOD = TDuration::Minutes(10); + static constexpr TDuration DURATION_CLEAR_PERIOD = TDuration::Minutes(1); // after what size of metrics data we issue a clear operation static constexpr TDuration DURATION_CLEAR_TRIGGER = TDuration::Hours(25); // the maximum size of metrics data to keep diff --git a/ydb/core/graph/shard/tx_clear_data.cpp b/ydb/core/graph/shard/tx_clear_data.cpp index f5808e54f9d7..c0ef2fafe294 100644 --- a/ydb/core/graph/shard/tx_clear_data.cpp +++ b/ydb/core/graph/shard/tx_clear_data.cpp @@ -1,6 +1,5 @@ #include "shard_impl.h" #include "log.h" -#include "schema.h" namespace NKikimr { namespace NGraph { diff --git a/ydb/core/graph/shard/tx_monitoring.cpp b/ydb/core/graph/shard/tx_monitoring.cpp index 1ecc8c111c6e..e7758dd93ac1 100644 --- a/ydb/core/graph/shard/tx_monitoring.cpp +++ b/ydb/core/graph/shard/tx_monitoring.cpp @@ -21,6 +21,24 @@ class TTxMonitoring : public TTransactionBase<TGraphShard> { return true; } + static TString DumpMetricsIndex(const std::unordered_map<TString, ui64>& metricsIndex) { + TStringBuilder str; + str << metricsIndex.size(); + if (!metricsIndex.empty()) { + str << " ("; + bool wasItem = false; + for (const auto& [name, idx] : metricsIndex) { + if (wasItem) { + str << ", "; + } + str << name; + wasItem = true; + } + str << ")"; + } + return str; + } + void Complete(const TActorContext& ctx) override { 
BLOG_D("TTxMonitoring::Complete"); TStringBuilder html; @@ -47,12 +65,24 @@ class TTxMonitoring : public TTransactionBase { } html << ""; - html << "Memory.MetricsSize" << Self->MemoryBackend.MetricsIndex.size() << ""; + html << "Memory.MetricsSize" << DumpMetricsIndex(Self->MemoryBackend.MetricsIndex) << ""; html << "Memory.RecordsSize" << Self->MemoryBackend.MetricsValues.size() << ""; - html << "Local.MetricsSize" << Self->LocalBackend.MetricsIndex.size() << ""; - html << "StartTimestamp" << Self->StartTimestamp << ""; - html << "ClearTimestamp" << Self->ClearTimestamp << ""; + html << "Local.MetricsSize" << DumpMetricsIndex(Self->LocalBackend.MetricsIndex) << ""; + html << "StartTimestamp" << Self->StartTimestamp.ToIsoStringLocalUpToSeconds() << ""; + html << "ClearTimestamp" << Self->ClearTimestamp.ToIsoStringLocalUpToSeconds() << ""; + html << "CurrentTimestamp" << Self->MetricsData.Timestamp.ToIsoStringLocalUpToSeconds() << ""; + + html << "CurrentMetricsData"; + bool wasLine = false; + for (const auto& [name, value] : Self->MetricsData.Values) { + if (wasLine) { + html << "
"; + } + html << name << "=" << value; + wasLine = true; + } + html << ""; html << ""; html << ""; diff --git a/ydb/core/graph/shard/ut/shard_ut.cpp b/ydb/core/graph/shard/ut/shard_ut.cpp index a4e86ba894c2..71ff72f136cc 100644 --- a/ydb/core/graph/shard/ut/shard_ut.cpp +++ b/ydb/core/graph/shard/ut/shard_ut.cpp @@ -5,6 +5,7 @@ #include #include #include +#include <numeric> #ifdef NDEBUG #define Ctest Cnull @@ -257,6 +258,29 @@ Y_UNIT_TEST_SUITE(GraphShard) { } } + Y_UNIT_TEST(CheckHistogramToPercentileConversions) { + TVector<ui64> bounds = {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, std::numeric_limits<ui64>::max()}; + TVector<ui64> values = {10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 0}; + ui64 total = std::accumulate(values.begin(), values.end(), 0); + UNIT_ASSERT(total == 100); + auto p50 = NGraph::GetTimingForPercentile(50, values, bounds, total); + Ctest << "p50=" << p50 << Endl; + UNIT_ASSERT(!isnan(p50)); + UNIT_ASSERT(abs(p50 - 32) < 0.01); // 32ms + auto p75 = NGraph::GetTimingForPercentile(75, values, bounds, total); + Ctest << "p75=" << p75 << Endl; + UNIT_ASSERT(!isnan(p75)); + UNIT_ASSERT(abs(p75 - 192) < 0.01); // 192ms + auto p90 = NGraph::GetTimingForPercentile(90, values, bounds, total); + Ctest << "p90=" << p90 << Endl; + UNIT_ASSERT(!isnan(p90)); + UNIT_ASSERT(abs(p90 - 512) < 0.01); // 512ms + auto p99 = NGraph::GetTimingForPercentile(99, values, bounds, total); + Ctest << "p99=" << p99 << Endl; + UNIT_ASSERT(!isnan(p99)); + UNIT_ASSERT(abs(p99 - 972.8) < 0.01); // 972.8ms + } + TTenantTestConfig GetTenantTestConfig() { return { .Domains = { diff --git a/ydb/core/sys_view/service/ext_counters.cpp b/ydb/core/sys_view/service/ext_counters.cpp index d35de38e7a20..73f424cd2b15 100644 --- a/ydb/core/sys_view/service/ext_counters.cpp +++ b/ydb/core/sys_view/service/ext_counters.cpp @@ -2,6 +2,8 @@ #include #include +#include <ydb/core/graph/api/events.h> +#include <ydb/core/graph/api/service.h> #include #include @@ -13,18 +15,26 @@ class TExtCountersUpdaterActor { using TCounterPtr = 
::NMonitoring::TDynamicCounters::TCounterPtr; + using THistogramPtr = ::NMonitoring::THistogramPtr; + using THistogramSnapshotPtr = ::NMonitoring::IHistogramSnapshotPtr; const TExtCountersConfig Config; TCounterPtr MemoryUsedBytes; TCounterPtr MemoryLimitBytes; + TCounterPtr StorageUsedBytes; TVector CpuUsedCorePercents; TVector CpuLimitCorePercents; + THistogramPtr ExecuteLatencyMs; TCounterPtr AnonRssSize; TCounterPtr CGroupMemLimit; TVector PoolElapsedMicrosec; TVector PoolCurrentThreadCount; + TVector PoolElapsedMicrosecPrevValue; + TVector ExecuteLatencyMsValues; + TVector ExecuteLatencyMsPrevValues; + TVector ExecuteLatencyMsBounds; public: static constexpr NKikimrServices::TActivity::EType ActorActivityType() { @@ -42,6 +52,8 @@ class TExtCountersUpdaterActor "resources.memory.used_bytes", false); MemoryLimitBytes = ydbGroup->GetNamedCounter("name", "resources.memory.limit_bytes", false); + StorageUsedBytes = ydbGroup->GetNamedCounter("name", + "resources.storage.used_bytes", false); auto poolCount = Config.Pools.size(); CpuUsedCorePercents.resize(poolCount); @@ -55,6 +67,8 @@ class TExtCountersUpdaterActor "resources.cpu.limit_core_percents", false); } + ExecuteLatencyMs = ydbGroup->FindNamedHistogram("name", "table.query.execution.latency_milliseconds"); + Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup); Become(&TThis::StateWork); } @@ -69,11 +83,15 @@ class TExtCountersUpdaterActor PoolElapsedMicrosec.resize(Config.Pools.size()); PoolCurrentThreadCount.resize(Config.Pools.size()); + PoolElapsedMicrosecPrevValue.resize(Config.Pools.size()); for (size_t i = 0; i < Config.Pools.size(); ++i) { auto poolGroup = utilsGroup->FindSubgroup("execpool", Config.Pools[i].Name); if (poolGroup) { PoolElapsedMicrosec[i] = poolGroup->FindCounter("ElapsedMicrosec"); PoolCurrentThreadCount[i] = poolGroup->FindCounter("CurrentThreadCount"); + if (PoolElapsedMicrosec[i]) { + PoolElapsedMicrosecPrevValue[i] = PoolElapsedMicrosec[i]->Val(); + } } } } @@ -81,17 +99,27 @@ 
class TExtCountersUpdaterActor void Transform() { Initialize(); - + auto metrics(MakeHolder()); if (AnonRssSize) { MemoryUsedBytes->Set(AnonRssSize->Val()); + metrics->AddMetric("resources.memory.used_bytes", AnonRssSize->Val()); } if (CGroupMemLimit) { MemoryLimitBytes->Set(CGroupMemLimit->Val()); } + if (StorageUsedBytes->Val() != 0) { + metrics->AddMetric("resources.storage.used_bytes", StorageUsedBytes->Val()); + } + double cpuUsage = 0; for (size_t i = 0; i < Config.Pools.size(); ++i) { if (PoolElapsedMicrosec[i]) { - double usedCore = PoolElapsedMicrosec[i]->Val() / 10000.; + auto elapsedMs = PoolElapsedMicrosec[i]->Val(); + double usedCore = elapsedMs / 10000.; CpuUsedCorePercents[i]->Set(usedCore); + if (PoolElapsedMicrosecPrevValue[i] != 0) { + cpuUsage += (elapsedMs - PoolElapsedMicrosecPrevValue[i]) / 1000000.; + } + PoolElapsedMicrosecPrevValue[i] = elapsedMs; } if (PoolCurrentThreadCount[i] && PoolCurrentThreadCount[i]->Val()) { double limitCore = PoolCurrentThreadCount[i]->Val() * 100; @@ -101,6 +129,47 @@ class TExtCountersUpdaterActor CpuLimitCorePercents[i]->Set(limitCore); } } + metrics->AddMetric("resources.cpu.usage", cpuUsage); + if (ExecuteLatencyMs) { + THistogramSnapshotPtr snapshot = ExecuteLatencyMs->Snapshot(); + ui32 count = snapshot->Count(); + if (ExecuteLatencyMsValues.empty()) { + ExecuteLatencyMsValues.resize(count); + ExecuteLatencyMsPrevValues.resize(count); + ExecuteLatencyMsBounds.resize(count); + } + ui64 total = 0; + for (ui32 n = 0; n < count; ++n) { + ui64 value = snapshot->Value(n);; + ui64 diff = value - ExecuteLatencyMsPrevValues[n]; + total += diff; + ExecuteLatencyMsValues[n] = diff; + ExecuteLatencyMsPrevValues[n] = value; + if (ExecuteLatencyMsBounds[n] == 0) { + ExecuteLatencyMsBounds[n] = snapshot->UpperBound(n); + } + } + metrics->AddMetric("queries.requests", total); + if (total != 0) { + double p50 = NGraph::GetTimingForPercentile(50, ExecuteLatencyMsValues, ExecuteLatencyMsBounds, total); + if (!isnan(p50)) { + 
metrics->AddMetric("queries.latencies.p50", p50); + } + double p75 = NGraph::GetTimingForPercentile(75, ExecuteLatencyMsValues, ExecuteLatencyMsBounds, total); + if (!isnan(p75)) { + metrics->AddMetric("queries.latencies.p75", p75); + } + double p90 = NGraph::GetTimingForPercentile(90, ExecuteLatencyMsValues, ExecuteLatencyMsBounds, total); + if (!isnan(p90)) { + metrics->AddMetric("queries.latencies.p90", p90); + } + double p99 = NGraph::GetTimingForPercentile(99, ExecuteLatencyMsValues, ExecuteLatencyMsBounds, total); + if (!isnan(p99)) { + metrics->AddMetric("queries.latencies.p99", p99); + } + } + } + Send(NGraph::MakeGraphServiceId(), metrics.Release()); } void HandleWakeup() { diff --git a/ydb/core/sys_view/service/ya.make b/ydb/core/sys_view/service/ya.make index a77c27b2b3fe..cfe00ba24246 100644 --- a/ydb/core/sys_view/service/ya.make +++ b/ydb/core/sys_view/service/ya.make @@ -16,6 +16,7 @@ PEERDIR( ydb/library/actors/core ydb/core/base ydb/core/protos + ydb/core/graph/api ydb/library/aclib/protos ) diff --git a/ydb/core/viewer/json_render.h b/ydb/core/viewer/json_render.h index 83fdb356a35d..e8e0451153a7 100644 --- a/ydb/core/viewer/json_render.h +++ b/ydb/core/viewer/json_render.h @@ -34,7 +34,17 @@ class TJsonRender : public TActorBootstrapped { if (postData) { TCgiParameters params(postData); if (params.Has("target")) { - StringSplitter(params.Get("target")).Split(',').SkipEmpty().Collect(&Metrics); + TString metric; + size_t num = 0; + for (;;) { + metric = params.Get("target", num); + if (metric.empty()) { + break; + } + Metrics.push_back(metric); + ++num; + } + //StringSplitter(params.Get("target")).Split(',').SkipEmpty().Collect(&Metrics); for (const auto& metric : Metrics) { getRequest.AddMetrics(metric); }