Skip to content

Commit

Permalink
add different metrics to graph service
Browse files Browse the repository at this point in the history
  • Loading branch information
adameat committed Jan 8, 2024
1 parent 37ab7bf commit e0898f8
Show file tree
Hide file tree
Showing 13 changed files with 184 additions and 17 deletions.
4 changes: 0 additions & 4 deletions ydb/core/base/pool_stats_collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,15 +47,11 @@ class TStatsCollectingActor : public NActors::TStatsCollectingActor {
MiniKQLPoolStats.Update();

TVector<std::tuple<TString, double, ui32>> pools;
double cpuUsage = 0;
for (const auto& pool : PoolCounters) {
pools.emplace_back(pool.Name, pool.Usage, pool.Threads);
cpuUsage += pool.Usage;
}

ctx.Send(NNodeWhiteboard::MakeNodeWhiteboardServiceId(ctx.SelfID.NodeId()), new NNodeWhiteboard::TEvWhiteboard::TEvSystemStateUpdate(pools));

ctx.Send(NGraph::MakeGraphServiceId(), new NGraph::TEvGraph::TEvSendMetrics("cpu_usage", cpuUsage));
}

private:
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/graph/api/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ struct TEvGraph {
TEvSendMetrics() = default;

TEvSendMetrics(const TString& name, double value) {
AddMetric(name, value);
}

void AddMetric(const TString& name, double value) {
NKikimrGraph::TMetric* metric = Record.AddMetrics();
metric->SetName(name);
metric->SetValue(value);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/graph/api/service.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ inline TActorId MakeGraphServiceId(ui32 node = 0) {

IActor* CreateGraphService(const TString& database);

double GetTimingForPercentile(double percentile, const TVector<ui64>& values, const TVector<ui64>& /*upper*/bounds, ui64 total);

} // NGraph
} // NKikimr
28 changes: 28 additions & 0 deletions ydb/core/graph/service/service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ class TGraphService : public TActor<TGraphService> {
hFunc(TEvGraph::TEvMetricsResult, Handle);
hFunc(TEvTxProxySchemeCache::TEvNavigateKeySetResult, Handle);
hFunc(TEvTabletPipe::TEvClientConnected, Handle);
hFunc(TEvTabletPipe::TEvClientDestroyed, Handle);
cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
}
}
Expand All @@ -247,5 +248,32 @@ IActor* CreateGraphService(const TString& database) {
return new TGraphService(database);
}

double GetTimingForPercentile(double percentile, const TVector<ui64>& values, const TVector<ui64>& /*upper*/bounds, ui64 total) {
ui64 ppMark = total * percentile / 100;
ui64 accm = 0;
ui32 n = 0;
while (n < bounds.size() && accm < ppMark) {
if (accm + values[n] >= ppMark) {
ui64 lowerBound = 0;
if (n > 0) {
lowerBound = bounds[n - 1];
}
ui64 upperBound = bounds[n];
if (upperBound == std::numeric_limits<ui64>::max()) {
return lowerBound; // workaround for INF bucket
}
ui64 currentValue = values[n];
ui64 ppValue = ppMark - accm;
if (currentValue == 0) {
return NAN;
}
return (static_cast<double>(ppValue) / currentValue) * (upperBound - lowerBound) + lowerBound;
}
accm += values[n];
n++;
}
return NAN;
}

} // NGraph
} // NKikimr
10 changes: 7 additions & 3 deletions ydb/core/graph/shard/backends.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ bool TLocalBackend::GetMetrics(NTabletFlatExecutor::TTransactionContext& txc, co
}
}
TMetricsValues metricValues;
BLOG_D("Querying from " << minTime << " to " << maxTime);
auto rowset = db.Table<Schema::MetricsValues>().GreaterOrEqual(minTime).LessOrEqual(maxTime).Select();
if (!rowset.IsReady()) {
return false;
Expand All @@ -220,7 +221,7 @@ bool TLocalBackend::GetMetrics(NTabletFlatExecutor::TTransactionContext& txc, co
ui64 id = rowset.GetValue<Schema::MetricsValues::Id>();
auto itIdx = metricIdx.find(id);
if (itIdx != metricIdx.end()) {
metricValues.Values.back()[itIdx->second] = rowset.GetValue<Schema::MetricsValues::Value>();
metricValues.Values[itIdx->second].back() = rowset.GetValue<Schema::MetricsValues::Value>();
}
if (!rowset.Next()) {
return false;
Expand All @@ -237,18 +238,21 @@ bool TLocalBackend::ClearData(NTabletFlatExecutor::TTransactionContext& txc, TIn
if (!rowset.IsReady()) {
return false;
}
ui64 prevTimestamp = 0;
while (!rowset.EndOfSet()) {
ui64 timestamp = rowset.GetValue<Schema::MetricsValues::Timestamp>();
ui64 id = rowset.GetValue<Schema::MetricsValues::Id>();
db.Table<Schema::MetricsValues>().Key(timestamp, id).Delete();
newStartTimestamp = TInstant::Seconds(timestamp);
if (++rows >= MAX_ROWS_TO_DELETE) {
break;
if (timestamp != prevTimestamp && ++rows >= MAX_ROWS_TO_DELETE) { // we count as a logical row every unique timestamp
break; // so for database it will be MAX_ROWS * NUM_OF_METRICS rows
}
prevTimestamp = timestamp;
if (!rowset.Next()) {
return false;
}
}
BLOG_D("Cleared " << rows << " logical rows");
return true;
}

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/graph/shard/shard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ void TGraphShard::Handle(TEvGraph::TEvSendMetrics::TPtr& ev) {
MetricsData.Timestamp = now;
MetricsData.Values.clear();
}
if ((now - StartTimestamp) > DURATION_CLEAR_TRIGGER && (now - ClearTimestamp) < DURATION_CLEAR_PERIOD) {
if ((now - StartTimestamp) > DURATION_CLEAR_TRIGGER && (now - ClearTimestamp) > DURATION_CLEAR_PERIOD) {
ClearTimestamp = now;
BLOG_TRACE("Executing TxClearData");
ExecuteTxClearData();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/graph/shard/shard_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class TGraphShard : public TActor<TGraphShard>, public NTabletFlatExecutor::TTab
STATEFN(StateWork);

// how often we could issue a clear operation
static constexpr TDuration DURATION_CLEAR_PERIOD = TDuration::Minutes(10);
static constexpr TDuration DURATION_CLEAR_PERIOD = TDuration::Minutes(1);
// after what size of metrics data we issue a clear operation
static constexpr TDuration DURATION_CLEAR_TRIGGER = TDuration::Hours(25);
// the maximum size of metrics data to keep
Expand Down
1 change: 0 additions & 1 deletion ydb/core/graph/shard/tx_clear_data.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#include "shard_impl.h"
#include "log.h"
#include "schema.h"

namespace NKikimr {
namespace NGraph {
Expand Down
38 changes: 34 additions & 4 deletions ydb/core/graph/shard/tx_monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,24 @@ class TTxMonitoring : public TTransactionBase<TGraphShard> {
return true;
}

static TString DumpMetricsIndex(const std::unordered_map<TString, ui64>& metricsIndex) {
TStringBuilder str;
str << metricsIndex.size();
if (!metricsIndex.empty()) {
str << " (";
bool wasItem = false;
for (const auto& [name, idx] : metricsIndex) {
if (wasItem) {
str << ", ";
}
str << name;
wasItem = true;
}
str << ")";
}
return str;
}

void Complete(const TActorContext& ctx) override {
BLOG_D("TTxMonitoring::Complete");
TStringBuilder html;
Expand All @@ -47,12 +65,24 @@ class TTxMonitoring : public TTransactionBase<TGraphShard> {
}
html << "</td></tr>";

html << "<tr><td>Memory.MetricsSize</td><td>" << Self->MemoryBackend.MetricsIndex.size() << "</td></tr>";
html << "<tr><td>Memory.MetricsSize</td><td>" << DumpMetricsIndex(Self->MemoryBackend.MetricsIndex) << "</td></tr>";
html << "<tr><td>Memory.RecordsSize</td><td>" << Self->MemoryBackend.MetricsValues.size() << "</td></tr>";

html << "<tr><td>Local.MetricsSize</td><td>" << Self->LocalBackend.MetricsIndex.size() << "</td></tr>";
html << "<tr><td>StartTimestamp</td><td>" << Self->StartTimestamp << "</td></tr>";
html << "<tr><td>ClearTimestamp</td><td>" << Self->ClearTimestamp << "</td></tr>";
html << "<tr><td>Local.MetricsSize</td><td>" << DumpMetricsIndex(Self->LocalBackend.MetricsIndex) << "</td></tr>";
html << "<tr><td>StartTimestamp</td><td>" << Self->StartTimestamp.ToIsoStringLocalUpToSeconds() << "</td></tr>";
html << "<tr><td>ClearTimestamp</td><td>" << Self->ClearTimestamp.ToIsoStringLocalUpToSeconds() << "</td></tr>";
html << "<tr><td>CurrentTimestamp</td><td>" << Self->MetricsData.Timestamp.ToIsoStringLocalUpToSeconds() << "</td></tr>";

html << "<tr><td style='vertical-align:top'>CurrentMetricsData</td><td>";
bool wasLine = false;
for (const auto& [name, value] : Self->MetricsData.Values) {
if (wasLine) {
html << "<br>";
}
html << name << "=" << value;
wasLine = true;
}
html << "</td></tr>";

html << "</table>";
html << "</html>";
Expand Down
24 changes: 24 additions & 0 deletions ydb/core/graph/shard/ut/shard_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <ydb/core/tx/schemeshard/ut_helpers/helpers.h>
#include <util/stream/output.h>
#include <ydb/core/graph/shard/backends.h>
#include <ydb/core/graph/api/service.h>

#ifdef NDEBUG
#define Ctest Cnull
Expand Down Expand Up @@ -257,6 +258,29 @@ Y_UNIT_TEST_SUITE(GraphShard) {
}
}

Y_UNIT_TEST(CheckHistogramToPercentileConversions) {
TVector<ui64> bounds = {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, std::numeric_limits<ui64>::max()};
TVector<ui64> values = {10, 10, 10, 10, 10, 10, 10, 10, 10, 10, 0};
ui64 total = std::accumulate(values.begin(), values.end(), 0);
UNIT_ASSERT(total == 100);
auto p50 = NGraph::GetTimingForPercentile(50, values, bounds, total);
Ctest << "p50=" << p50 << Endl;
UNIT_ASSERT(!isnan(p50));
UNIT_ASSERT(abs(p50 - 32) < 0.01); // 32ms
auto p75 = NGraph::GetTimingForPercentile(75, values, bounds, total);
Ctest << "p75=" << p75 << Endl;
UNIT_ASSERT(!isnan(p75));
UNIT_ASSERT(abs(p75 - 192) < 0.01); // 192ms
auto p90 = NGraph::GetTimingForPercentile(90, values, bounds, total);
Ctest << "p90=" << p90 << Endl;
UNIT_ASSERT(!isnan(p90));
UNIT_ASSERT(abs(p90 - 512) < 0.01); // 512ms
auto p99 = NGraph::GetTimingForPercentile(99, values, bounds, total);
Ctest << "p99=" << p99 << Endl;
UNIT_ASSERT(!isnan(p99));
UNIT_ASSERT(abs(p99 - 972.8) < 0.01); // 972.8ms
}

TTenantTestConfig GetTenantTestConfig() {
return {
.Domains = {
Expand Down
73 changes: 71 additions & 2 deletions ydb/core/sys_view/service/ext_counters.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <ydb/core/base/appdata.h>
#include <ydb/core/base/counters.h>
#include <ydb/core/graph/api/events.h>
#include <ydb/core/graph/api/service.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>

Expand All @@ -13,18 +15,26 @@ class TExtCountersUpdaterActor
: public TActorBootstrapped<TExtCountersUpdaterActor>
{
using TCounterPtr = ::NMonitoring::TDynamicCounters::TCounterPtr;
using THistogramPtr = ::NMonitoring::THistogramPtr;
using THistogramSnapshotPtr = ::NMonitoring::IHistogramSnapshotPtr;

const TExtCountersConfig Config;

TCounterPtr MemoryUsedBytes;
TCounterPtr MemoryLimitBytes;
TCounterPtr StorageUsedBytes;
TVector<TCounterPtr> CpuUsedCorePercents;
TVector<TCounterPtr> CpuLimitCorePercents;
THistogramPtr ExecuteLatencyMs;

TCounterPtr AnonRssSize;
TCounterPtr CGroupMemLimit;
TVector<TCounterPtr> PoolElapsedMicrosec;
TVector<TCounterPtr> PoolCurrentThreadCount;
TVector<ui64> PoolElapsedMicrosecPrevValue;
TVector<ui64> ExecuteLatencyMsValues;
TVector<ui64> ExecuteLatencyMsPrevValues;
TVector<ui64> ExecuteLatencyMsBounds;

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand All @@ -42,6 +52,8 @@ class TExtCountersUpdaterActor
"resources.memory.used_bytes", false);
MemoryLimitBytes = ydbGroup->GetNamedCounter("name",
"resources.memory.limit_bytes", false);
StorageUsedBytes = ydbGroup->GetNamedCounter("name",
"resources.storage.used_bytes", false);

auto poolCount = Config.Pools.size();
CpuUsedCorePercents.resize(poolCount);
Expand All @@ -55,6 +67,8 @@ class TExtCountersUpdaterActor
"resources.cpu.limit_core_percents", false);
}

ExecuteLatencyMs = ydbGroup->FindNamedHistogram("name", "table.query.execution.latency_milliseconds");

Schedule(TDuration::Seconds(1), new TEvents::TEvWakeup);
Become(&TThis::StateWork);
}
Expand All @@ -69,29 +83,43 @@ class TExtCountersUpdaterActor

PoolElapsedMicrosec.resize(Config.Pools.size());
PoolCurrentThreadCount.resize(Config.Pools.size());
PoolElapsedMicrosecPrevValue.resize(Config.Pools.size());
for (size_t i = 0; i < Config.Pools.size(); ++i) {
auto poolGroup = utilsGroup->FindSubgroup("execpool", Config.Pools[i].Name);
if (poolGroup) {
PoolElapsedMicrosec[i] = poolGroup->FindCounter("ElapsedMicrosec");
PoolCurrentThreadCount[i] = poolGroup->FindCounter("CurrentThreadCount");
if (PoolElapsedMicrosec[i]) {
PoolElapsedMicrosecPrevValue[i] = PoolElapsedMicrosec[i]->Val();
}
}
}
}
}

void Transform() {
Initialize();

auto metrics(MakeHolder<NGraph::TEvGraph::TEvSendMetrics>());
if (AnonRssSize) {
MemoryUsedBytes->Set(AnonRssSize->Val());
metrics->AddMetric("resources.memory.used_bytes", AnonRssSize->Val());
}
if (CGroupMemLimit) {
MemoryLimitBytes->Set(CGroupMemLimit->Val());
}
if (StorageUsedBytes->Val() != 0) {
metrics->AddMetric("resources.storage.used_bytes", StorageUsedBytes->Val());
}
double cpuUsage = 0;
for (size_t i = 0; i < Config.Pools.size(); ++i) {
if (PoolElapsedMicrosec[i]) {
double usedCore = PoolElapsedMicrosec[i]->Val() / 10000.;
auto elapsedMs = PoolElapsedMicrosec[i]->Val();
double usedCore = elapsedMs / 10000.;
CpuUsedCorePercents[i]->Set(usedCore);
if (PoolElapsedMicrosecPrevValue[i] != 0) {
cpuUsage += (elapsedMs - PoolElapsedMicrosecPrevValue[i]) / 1000000.;
}
PoolElapsedMicrosecPrevValue[i] = elapsedMs;
}
if (PoolCurrentThreadCount[i] && PoolCurrentThreadCount[i]->Val()) {
double limitCore = PoolCurrentThreadCount[i]->Val() * 100;
Expand All @@ -101,6 +129,47 @@ class TExtCountersUpdaterActor
CpuLimitCorePercents[i]->Set(limitCore);
}
}
metrics->AddMetric("resources.cpu.usage", cpuUsage);
if (ExecuteLatencyMs) {
THistogramSnapshotPtr snapshot = ExecuteLatencyMs->Snapshot();
ui32 count = snapshot->Count();
if (ExecuteLatencyMsValues.empty()) {
ExecuteLatencyMsValues.resize(count);
ExecuteLatencyMsPrevValues.resize(count);
ExecuteLatencyMsBounds.resize(count);
}
ui64 total = 0;
for (ui32 n = 0; n < count; ++n) {
ui64 value = snapshot->Value(n);;
ui64 diff = value - ExecuteLatencyMsPrevValues[n];
total += diff;
ExecuteLatencyMsValues[n] = diff;
ExecuteLatencyMsPrevValues[n] = value;
if (ExecuteLatencyMsBounds[n] == 0) {
ExecuteLatencyMsBounds[n] = snapshot->UpperBound(n);
}
}
metrics->AddMetric("queries.requests", total);
if (total != 0) {
double p50 = NGraph::GetTimingForPercentile(50, ExecuteLatencyMsValues, ExecuteLatencyMsBounds, total);
if (!isnan(p50)) {
metrics->AddMetric("queries.latencies.p50", p50);
}
double p75 = NGraph::GetTimingForPercentile(75, ExecuteLatencyMsValues, ExecuteLatencyMsBounds, total);
if (!isnan(p75)) {
metrics->AddMetric("queries.latencies.p75", p75);
}
double p90 = NGraph::GetTimingForPercentile(90, ExecuteLatencyMsValues, ExecuteLatencyMsBounds, total);
if (!isnan(p90)) {
metrics->AddMetric("queries.latencies.p90", p90);
}
double p99 = NGraph::GetTimingForPercentile(99, ExecuteLatencyMsValues, ExecuteLatencyMsBounds, total);
if (!isnan(p99)) {
metrics->AddMetric("queries.latencies.p99", p99);
}
}
}
Send(NGraph::MakeGraphServiceId(), metrics.Release());
}

void HandleWakeup() {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/sys_view/service/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ PEERDIR(
ydb/library/actors/core
ydb/core/base
ydb/core/protos
ydb/core/graph/api
ydb/library/aclib/protos
)

Expand Down
Loading

0 comments on commit e0898f8

Please sign in to comment.