Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add pumpkin mode and improve downsampling #827

Merged
merged 1 commit into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 18 additions & 1 deletion ydb/core/graph/service/service_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,12 @@ class TGraphService : public TActor<TGraphService> {
BLOG_D("Database " << Database << " resolved to shard " << GraphShardId);
ConnectShard();
return;
} else {
BLOG_D("Error resolving database " << Database << " - no graph shard (switching to pumpkin mode)");
return Become(&TGraphService::StatePumpkin);
}
}
BLOG_W("Error resolving database " << Database << " incomplete response / no graph shard");
BLOG_W("Error resolving database " << Database << " incomplete response");
} else {
if (!request->ResultSet.empty()) {
BLOG_W("Error resolving database " << Database << " error " << request->ResultSet.front().Status);
Expand Down Expand Up @@ -213,6 +216,13 @@ class TGraphService : public TActor<TGraphService> {
}
}

// Pumpkin-mode handler for metric reads: this database resolved to no
// graph shard, so every TEvGetMetrics is answered immediately with an
// error instead of real data.
void HandlePumpkin(TEvGraph::TEvGetMetrics::TPtr& ev) {
    BLOG_TRACE("TEvGetMetrics(Pumpkin)");
    auto* reply = new TEvGraph::TEvMetricsResult();
    reply->Record.SetError("GraphShard is not enabled on the database");
    Send(ev->Sender, reply, 0, ev->Cookie);
}

STATEFN(StateWork) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvGraph::TEvSendMetrics, Handle);
Expand All @@ -223,6 +233,13 @@ class TGraphService : public TActor<TGraphService> {
cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
}
}

// Degraded ("pumpkin") state, entered when database resolution finds no
// graph shard: metric reads are rejected with an error by HandlePumpkin,
// while the periodic Wakeup is still routed to HandleTimeout as in
// StateWork.
STATEFN(StatePumpkin) {
    switch (ev->GetTypeRewrite()) {
        hFunc(TEvGraph::TEvGetMetrics, HandlePumpkin);
        cFunc(TEvents::TSystem::Wakeup, HandleTimeout);
    }
}
};


Expand Down
185 changes: 98 additions & 87 deletions ydb/core/graph/shard/backends.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,54 +5,99 @@
namespace NKikimr {
namespace NGraph {

template<>
std::vector<TInstant> TMemoryBackend::Downsample<TInstant>(const std::vector<TInstant>& data, size_t maxPoints) {
if (data.size() <= maxPoints) {
return data;
}
std::vector<TInstant> result;
double coeff = (double)maxPoints / data.size();
result.resize(maxPoints);
size_t ltrg = maxPoints;
for (size_t src = 0; src < data.size(); ++src) {
size_t trg = floor(coeff * src);
if (trg != ltrg) {
result[trg] = data[src]; // we expect sorted data so we practically use min() here
ltrg = trg;
}
// Reduces a raw metrics series in place to at most maxPoints samples.
//
// Two passes:
//   1. Build a normalized timestamp grid of maxPoints entries, ideally
//      evenly spread between the first and last source timestamp. Whenever
//      a source sample arrives later than the expected grid slot, the grid
//      is re-anchored at that sample and the remaining slots are re-spread
//      over the remaining time range.
//   2. Aggregate source values into the grid buckets by averaging all
//      non-NaN samples up to each grid timestamp; a bucket with no valid
//      samples stays NaN.
//
// Assumes values.Timestamps is sorted ascending and each row of
// values.Values is parallel to Timestamps — TODO confirm with callers.
// Series that already fit within maxPoints are left untouched.
void TBaseBackend::NormalizeAndDownsample(TMetricsValues& values, size_t maxPoints) {
    if (values.Timestamps.size() <= maxPoints) {
        return;
    }

    TMetricsValues result;
    result.Timestamps.resize(maxPoints);
    result.Values.resize(values.Values.size());
    for (auto& row : result.Values) { // renamed from `values` to avoid shadowing the parameter
        row.resize(maxPoints);
    }

    size_t srcSize = values.Timestamps.size();
    size_t trgSize = result.Timestamps.size();
    TInstant frontTs = values.Timestamps.front();
    TInstant backTs = values.Timestamps.back();
    TDuration distanceTs = backTs - frontTs;
    // normalize timestamps
    size_t src = 0;
    size_t trg = 0;
    size_t trgSrc = 0;            // grid slot at which the grid was last re-anchored
    size_t trgRest = trgSize - 1; // grid slots remaining after the anchor
    while (src < srcSize && trg < trgSize) {
        TInstant expected = frontTs + TDuration::Seconds((trg - trgSrc) * distanceTs.Seconds() / trgRest);
        if (expected < values.Timestamps[src]) {
            // source runs later than the grid: re-anchor at this sample and
            // re-spread the remaining slots over the remaining range
            result.Timestamps[trg] = values.Timestamps[src];
            frontTs = values.Timestamps[src];
            distanceTs = backTs - frontTs;
            trgSrc = trg;
            trgRest = trgSize - trg - 1;
            ++src;
            ++trg;
        } else if (expected == values.Timestamps[src]) {
            result.Timestamps[trg] = values.Timestamps[src];
            ++src;
            ++trg;
        } else if (expected > values.Timestamps[src]) {
            // several source samples fall into one grid slot: skip them here,
            // the aggregation pass below averages their values
            result.Timestamps[trg] = expected;
            ++trg;
            do {
                ++src;
                if (src >= srcSize) {
                    break;
                }
            } while (values.Timestamps[src] < expected);
        }
    }
    // aggregate values
    for (size_t numVal = 0; numVal < result.Values.size(); ++numVal) {
        double accm = NAN; // avg impl
        long cnt = 0;
        const std::vector<double>& srcValues(values.Values[numVal]);
        std::vector<double>& trgValues(result.Values[numVal]);
        size_t trgPos = 0;
        for (size_t srcPos = 0; srcPos < srcValues.size(); ++srcPos) {
            double srcValue = srcValues[srcPos];
            if (!isnan(srcValue)) {
                if (isnan(accm)) { // avg impl
                    accm = srcValue;
                    cnt = 1;
                } else {
                    accm += srcValue;
                    cnt += 1;
                }
            }
            if (values.Timestamps[srcPos] >= result.Timestamps[trgPos]) {
                // close the current bucket: emit the average (or NaN) and reset
                if (isnan(accm)) { // avg impl
                    trgValues[trgPos] = NAN;
                } else {
                    trgValues[trgPos] = accm / cnt;
                }
                ++trgPos;
                accm = NAN;
                cnt = 0;
                if (trgPos >= trgSize) {
                    // all grid buckets filled — guards against reading past
                    // result.Timestamps if trailing samples remain
                    break;
                }
            }
        }
    }
    values = std::move(result);
}

// Serializes the collected metric series into the TEvMetricsResult
// protobuf, downsampling first when the request caps the number of points.
// Clears any previous content of `result`; `values` may be modified
// (downsampled) in the process.
void TBaseBackend::FillResult(TMetricsValues& values, const NKikimrGraph::TEvGetMetrics& get, NKikimrGraph::TEvMetricsResult& result) {
    if (get.HasMaxPoints() && values.Timestamps.size() > get.GetMaxPoints()) {
        NormalizeAndDownsample(values, get.GetMaxPoints());
    }
    result.Clear();
    auto time = result.MutableTime();
    time->Reserve(values.Timestamps.size());
    for (const TInstant t : values.Timestamps) {
        time->Add(t.Seconds()); // timestamps travel as unix seconds
    }
    // one Data entry per metric row; renamed loop variable from `values`
    // to avoid shadowing the parameter
    for (std::vector<double>& row : values.Values) {
        result.AddData()->MutableValues()->Add(row.begin(), row.end());
    }
    // note: no return value — the function is void; the stray
    // `return result;` from the pre-refactoring code was removed
}

void TMemoryBackend::StoreMetrics(TMetricsData&& data) {
Expand Down Expand Up @@ -93,37 +138,20 @@ void TMemoryBackend::GetMetrics(const NKikimrGraph::TEvGetMetrics& get, NKikimrG
}
indexes.push_back(idx);
}
std::vector<TInstant> timestamps;
std::vector<std::vector<double>> values;
values.resize(indexes.size());
TMetricsValues metricValues;
metricValues.Values.resize(indexes.size());
for (auto it = itLeft; it != itRight; ++it) {
timestamps.push_back(it->Timestamp);
metricValues.Timestamps.push_back(it->Timestamp);
for (size_t num = 0; num < indexes.size(); ++num) {
size_t idx = indexes[num];
if (idx < it->Values.size()) {
values[num].push_back(it->Values[idx]);
metricValues.Values[num].push_back(it->Values[idx]);
} else {
values[num].push_back(NAN);
metricValues.Values[num].push_back(NAN);
}
}
}
if (get.HasMaxPoints() && timestamps.size() > get.GetMaxPoints()) {
timestamps = Downsample(timestamps, get.GetMaxPoints());
BLOG_TRACE("GetMetrics timestamps=" << timestamps.size());
for (std::vector<double>& values : values) {
values = Downsample(values, get.GetMaxPoints());
BLOG_TRACE("GetMetrics values=" << values.size());
}
}
result.Clear();
auto time = result.MutableTime();
time->Reserve(timestamps.size());
for (const TInstant t : timestamps) {
time->Add(t.Seconds());
}
for (std::vector<double>& values : values) {
result.AddData()->MutableValues()->Add(values.begin(), values.end());
}
FillResult(metricValues, get, result);
}

void TMemoryBackend::ClearData(TInstant cutline, TInstant& newStartTimestamp) {
Expand Down Expand Up @@ -173,49 +201,32 @@ bool TLocalBackend::GetMetrics(NTabletFlatExecutor::TTransactionContext& txc, co
metricIdx[itMetricIdx->second] = nMetric;
}
}
std::vector<TInstant> timestamps;
std::vector<std::vector<double>> values;
TMetricsValues metricValues;
auto rowset = db.Table<Schema::MetricsValues>().GreaterOrEqual(minTime).LessOrEqual(maxTime).Select();
if (!rowset.IsReady()) {
return false;
}
ui64 lastTime = 0;
values.resize(get.MetricsSize());
metricValues.Values.resize(get.MetricsSize());
while (!rowset.EndOfSet()) {
ui64 time = rowset.GetValue<Schema::MetricsValues::Timestamp>();
if (time != lastTime) {
lastTime = time;
timestamps.push_back(TInstant::Seconds(time));
for (auto& vals : values) {
metricValues.Timestamps.push_back(TInstant::Seconds(time));
for (auto& vals : metricValues.Values) {
vals.emplace_back(NAN);
}
}
ui64 id = rowset.GetValue<Schema::MetricsValues::Id>();
auto itIdx = metricIdx.find(id);
if (itIdx != metricIdx.end()) {
values.back()[itIdx->second] = rowset.GetValue<Schema::MetricsValues::Value>();
metricValues.Values.back()[itIdx->second] = rowset.GetValue<Schema::MetricsValues::Value>();
}
if (!rowset.Next()) {
return false;
}
}
if (get.HasMaxPoints() && timestamps.size() > get.GetMaxPoints()) {
timestamps = TMemoryBackend::Downsample(timestamps, get.GetMaxPoints());
BLOG_TRACE("GetMetrics timestamps=" << timestamps.size());
for (std::vector<double>& values : values) {
values = TMemoryBackend::Downsample(values, get.GetMaxPoints());
BLOG_TRACE("GetMetrics values=" << values.size());
}
}
result.Clear();
auto time = result.MutableTime();
time->Reserve(timestamps.size());
for (const TInstant t : timestamps) {
time->Add(t.Seconds());
}
for (std::vector<double>& values : values) {
result.AddData()->MutableValues()->Add(values.begin(), values.end());
}
FillResult(metricValues, get, result);
return true;
}

Expand Down
23 changes: 15 additions & 8 deletions ydb/core/graph/shard/backends.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,25 @@ struct TMetricsData {
std::unordered_map<TString, double> Values;
};

class TMemoryBackend {
// Shared base for the graph-shard metric storage backends: owns the
// name -> id metric index and the downsampling/serialization helpers
// common to the in-memory and local-db implementations.
class TBaseBackend {
public:
    // Column-oriented query result: one timestamp vector plus one parallel
    // value row per requested metric (NAN marks a missing sample).
    struct TMetricsValues {
        std::vector<TInstant> Timestamps;
        std::vector<std::vector<double>> Values;
    };

    // Reduces `values` in place to at most maxPoints samples
    // (no-op when the series already fits).
    static void NormalizeAndDownsample(TMetricsValues& values, size_t maxPoints);
    // Serializes `values` into `result`, downsampling first when the
    // request sets MaxPoints.
    static void FillResult(TMetricsValues& values, const NKikimrGraph::TEvGetMetrics& get, NKikimrGraph::TEvMetricsResult& result);

    std::unordered_map<TString, ui64> MetricsIndex; // mapping name -> id
};

class TMemoryBackend : public TBaseBackend {
public:
void StoreMetrics(TMetricsData&& data);
void GetMetrics(const NKikimrGraph::TEvGetMetrics& get, NKikimrGraph::TEvMetricsResult& result) const;
void ClearData(TInstant cutline, TInstant& newStartTimestamp);

template<typename ValueType>
static std::vector<ValueType> Downsample(const std::vector<ValueType>& data, size_t maxPoints);

TString GetLogPrefix() const;

struct TMetricsRecord {
Expand All @@ -41,11 +51,10 @@ class TMemoryBackend {
}
};

std::unordered_map<TString, size_t> MetricsIndex; // mapping name -> id
std::deque<TMetricsRecord> MetricsValues;
};

class TLocalBackend {
class TLocalBackend : public TBaseBackend {
public:
static constexpr ui64 MAX_ROWS_TO_DELETE = 1000;

Expand All @@ -54,8 +63,6 @@ class TLocalBackend {
bool ClearData(NTabletFlatExecutor::TTransactionContext& txc, TInstant cutline, TInstant& newStartTimestamp);

TString GetLogPrefix() const;

std::unordered_map<TString, ui64> MetricsIndex; // mapping name -> id
};

} // NGraph
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/graph/shard/tx_monitoring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ class TTxMonitoring : public TTransactionBase<TGraphShard> {
html << "<tr><td>Memory.RecordsSize</td><td>" << Self->MemoryBackend.MetricsValues.size() << "</td></tr>";

html << "<tr><td>Local.MetricsSize</td><td>" << Self->LocalBackend.MetricsIndex.size() << "</td></tr>";
html << "<tr><td>Local.StartTimestamp</td><td>" << Self->StartTimestamp << "</td></tr>";
html << "<tr><td>StartTimestamp</td><td>" << Self->StartTimestamp << "</td></tr>";
html << "<tr><td>ClearTimestamp</td><td>" << Self->ClearTimestamp << "</td></tr>";

html << "</table>";
html << "</html>";
Expand Down
Loading
Loading