Skip to content

Commit

Permalink
improve base stats propagation logic (#1741)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexd65536 authored Feb 12, 2024
1 parent 12610a7 commit 00dcc23
Show file tree
Hide file tree
Showing 6 changed files with 211 additions and 49 deletions.
7 changes: 6 additions & 1 deletion ydb/core/protos/statistics.proto
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,10 @@ message TEvConnectNode {
message TEvRequestStats {
optional uint32 NodeId = 1;
repeated fixed64 NeedSchemeShards = 2;
optional bool Urgent = 3;
}

// SA -> nodes
// SA -> nodes, node -> nodes
message TEvPropagateStatistics {
repeated uint32 NodeIds = 1; // hierarchical propagation
message TStatsEntry {
Expand All @@ -57,6 +58,10 @@ message TEvPropagateStatistics {
repeated TStatsEntry Entries = 2;
}

// node -> SA, node -> node
message TEvPropagateStatisticsResponse {
}

// SA -> nodes
message TEvStatisticsIsDisabled {
}
142 changes: 104 additions & 38 deletions ydb/core/statistics/aggregator/aggregator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ TStatisticsAggregator::TStatisticsAggregator(const NActors::TActorId& tablet, TT
, TTabletExecutedFlat(info, tablet, new NMiniKQL::TMiniKQLFactory)
{
PropagateInterval = forTests ? TDuration::Seconds(5) : TDuration::Minutes(3);
PropagateTimeout = forTests ? TDuration::Seconds(3) : TDuration::Minutes(2);

auto seed = std::random_device{}();
RandomGenerator.seed(seed);
Expand Down Expand Up @@ -124,11 +125,6 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvConnectNode::TPtr& ev) {
return;
}

if (!IsPropagateInFlight) {
Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
IsPropagateInFlight = true;
}

if (!record.NeedSchemeShardsSize()) {
return;
}
Expand All @@ -149,14 +145,28 @@ void TStatisticsAggregator::Handle(TEvStatistics::TEvRequestStats::TPtr& ev) {

SA_LOG_D("[" << TabletID() << "] EvRequestStats"
<< ", node id = " << nodeId
<< ", schemeshard count = " << record.NeedSchemeShardsSize());
<< ", schemeshard count = " << record.NeedSchemeShardsSize()
<< ", urgent = " << record.GetUrgent());

if (!EnableStatistics) {
auto disabled = std::make_unique<TEvStatistics::TEvStatisticsIsDisabled>();
Send(NStat::MakeStatServiceID(nodeId), disabled.release());
return;
}

for (const auto& ssId : record.GetNeedSchemeShards()) {
RequestedSchemeShards.insert(ssId);
}

if (record.GetUrgent()) {
PendingRequests.push(std::move(ev));
if (!ProcessUrgentInFlight) {
Send(SelfId(), new TEvPrivate::TEvProcessUrgent());
ProcessUrgentInFlight = true;
}
return;
}

std::vector<TSSId> ssIds;
ssIds.reserve(record.NeedSchemeShardsSize());
for (const auto& ssId : record.GetNeedSchemeShards()) {
Expand Down Expand Up @@ -206,6 +216,60 @@ void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagate::TPtr&) {
Schedule(PropagateInterval, new TEvPrivate::TEvPropagate());
}

void TStatisticsAggregator::Handle(TEvStatistics::TEvPropagateStatisticsResponse::TPtr&) {
if (!PropagationInFlight) {
return;
}
if (LastSSIndex < PropagationSchemeShards.size()) {
LastSSIndex = PropagatePart(PropagationNodes, PropagationSchemeShards, LastSSIndex, true);
} else {
PropagationInFlight = false;
PropagationNodes.clear();
PropagationSchemeShards.clear();
LastSSIndex = 0;
}
}

void TStatisticsAggregator::Handle(TEvPrivate::TEvProcessUrgent::TPtr&) {
SA_LOG_D("[" << TabletID() << "] EvProcessUrgent");

ProcessUrgentInFlight = false;

if (PendingRequests.empty()) {
return;
}

TEvStatistics::TEvRequestStats::TPtr ev = std::move(PendingRequests.front());
PendingRequests.pop();

if (!PendingRequests.empty()) {
Send(SelfId(), new TEvPrivate::TEvProcessUrgent());
ProcessUrgentInFlight = true;
}

auto record = ev->Get()->Record;
const auto nodeId = record.GetNodeId();

std::vector<TSSId> ssIds;
ssIds.reserve(record.NeedSchemeShardsSize());
for (const auto& ssId : record.GetNeedSchemeShards()) {
ssIds.push_back(ssId);
}

SendStatisticsToNode(nodeId, ssIds);
}

void TStatisticsAggregator::Handle(TEvPrivate::TEvPropagateTimeout::TPtr&) {
SA_LOG_D("[" << TabletID() << "] EvPropagateTimeout");

if (PropagationInFlight) {
PropagationInFlight = false;
PropagationNodes.clear();
PropagationSchemeShards.clear();
LastSSIndex = 0;
}
}

void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector<TSSId>& ssIds) {
if (FastCounter > 0) {
--FastCounter;
Expand All @@ -217,7 +281,7 @@ void TStatisticsAggregator::ProcessRequests(TNodeId nodeId, const std::vector<TS
}
}
if (!FastCheckInFlight) {
Schedule(TDuration::MilliSeconds(100), new TEvPrivate::TEvFastPropagateCheck());
Schedule(FastCheckInterval, new TEvPrivate::TEvFastPropagateCheck());
FastCheckInFlight = true;
}
}
Expand All @@ -230,7 +294,7 @@ void TStatisticsAggregator::SendStatisticsToNode(TNodeId nodeId, const std::vect
std::vector<TNodeId> nodeIds;
nodeIds.push_back(nodeId);

PropagateStatisticsImpl(nodeIds, ssIds);
PropagatePart(nodeIds, ssIds, 0, false);
}

void TStatisticsAggregator::PropagateStatistics() {
Expand All @@ -255,7 +319,13 @@ void TStatisticsAggregator::PropagateStatistics() {
ssIds.push_back(ssId);
}

PropagateStatisticsImpl(nodeIds, ssIds);
Schedule(PropagateTimeout, new TEvPrivate::TEvPropagateTimeout);

PropagationInFlight = true;
PropagationNodes = std::move(nodeIds);
PropagationSchemeShards = std::move(ssIds);

LastSSIndex = PropagatePart(PropagationNodes, PropagationSchemeShards, 0, true);
}

void TStatisticsAggregator::PropagateFastStatistics() {
Expand All @@ -280,43 +350,39 @@ void TStatisticsAggregator::PropagateFastStatistics() {
ssIds.push_back(ssId);
}

PropagateStatisticsImpl(nodeIds, ssIds);
PropagatePart(nodeIds, ssIds, 0, false);
}

void TStatisticsAggregator::PropagateStatisticsImpl(
const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds)
size_t TStatisticsAggregator::PropagatePart(const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds,
size_t lastSSIndex, bool useSizeLimit)
{
if (nodeIds.empty() || ssIds.empty()) {
return;
}
auto propagate = std::make_unique<TEvStatistics::TEvPropagateStatistics>();
auto* record = propagate->MutableRecord();

TNodeId leadingNodeId = nodeIds[0];
record->MutableNodeIds()->Reserve(nodeIds.size() - 1);
for (size_t i = 1; i < nodeIds.size(); ++i) {
record->AddNodeIds(nodeIds[i]);
}

for (size_t index = 0; index < ssIds.size(); ) {
auto propagate = std::make_unique<TEvStatistics::TEvPropagateStatistics>();
auto* record = propagate->MutableRecord();
record->MutableNodeIds()->Reserve(nodeIds.size() - 1);
for (size_t i = 1; i < nodeIds.size(); ++i) {
record->AddNodeIds(nodeIds[i]);
}
for (size_t size = 0; index < ssIds.size(); ++index) {
auto ssId = ssIds[index];
auto* entry = record->AddEntries();
entry->SetSchemeShardId(ssId);
auto itStats = BaseStats.find(ssId);
if (itStats != BaseStats.end()) {
entry->SetStats(itStats->second);
size += itStats->second.size();
} else {
entry->SetStats(TString()); // stats are not sent from SA yet
}
if (size >= StatsSizeLimitBytes) {
++index;
break;
}
size_t sizeLimit = useSizeLimit ? StatsSizeLimitBytes : std::numeric_limits<size_t>::max();
size_t index = lastSSIndex;
for (size_t size = 0; index < ssIds.size() && size < sizeLimit; ++index) {
auto ssId = ssIds[index];
auto* entry = record->AddEntries();
entry->SetSchemeShardId(ssId);
auto itStats = BaseStats.find(ssId);
if (itStats != BaseStats.end()) {
entry->SetStats(itStats->second);
size += itStats->second.size();
} else {
entry->SetStats(TString()); // stats are not sent from SS yet
}
Send(NStat::MakeStatServiceID(leadingNodeId), propagate.release());
}

Send(NStat::MakeStatServiceID(leadingNodeId), propagate.release());

return index;
}

void TStatisticsAggregator::PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value) {
Expand Down
24 changes: 22 additions & 2 deletions ydb/core/statistics/aggregator/aggregator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,16 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
enum EEv {
EvPropagate = EventSpaceBegin(TEvents::ES_PRIVATE),
EvFastPropagateCheck,
EvProcessUrgent,
EvPropagateTimeout,

EvEnd
};

struct TEvPropagate : public TEventLocal<TEvPropagate, EvPropagate> {};
struct TEvFastPropagateCheck : public TEventLocal<TEvFastPropagateCheck, EvFastPropagateCheck> {};
struct TEvProcessUrgent : public TEventLocal<TEvProcessUrgent, EvProcessUrgent> {};
struct TEvPropagateTimeout : public TEventLocal<TEvPropagateTimeout, EvPropagateTimeout> {};
};

private:
Expand All @@ -73,12 +77,16 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
void Handle(TEvTabletPipe::TEvServerConnected::TPtr& ev);
void Handle(TEvTabletPipe::TEvServerDisconnected::TPtr& ev);
void Handle(TEvPrivate::TEvFastPropagateCheck::TPtr& ev);
void Handle(TEvStatistics::TEvPropagateStatisticsResponse::TPtr& ev);
void Handle(TEvPrivate::TEvProcessUrgent::TPtr& ev);
void Handle(TEvPrivate::TEvPropagateTimeout::TPtr& ev);

void ProcessRequests(TNodeId nodeId, const std::vector<TSSId>& ssIds);
void SendStatisticsToNode(TNodeId nodeId, const std::vector<TSSId>& ssIds);
void PropagateStatistics();
void PropagateFastStatistics();
void PropagateStatisticsImpl(const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds);
size_t PropagatePart(const std::vector<TNodeId>& nodeIds, const std::vector<TSSId>& ssIds,
size_t lastSSIndex, bool useSizeLimit);

void PersistSysParam(NIceDb::TNiceDb& db, ui64 id, const TString& value);

Expand All @@ -99,6 +107,9 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
hFunc(TEvTabletPipe::TEvServerConnected, Handle);
hFunc(TEvTabletPipe::TEvServerDisconnected, Handle);
hFunc(TEvPrivate::TEvFastPropagateCheck, Handle);
hFunc(TEvStatistics::TEvPropagateStatisticsResponse, Handle);
hFunc(TEvPrivate::TEvProcessUrgent, Handle);
hFunc(TEvPrivate::TEvPropagateTimeout, Handle);
default:
if (!HandleDefaultEvents(ev, SelfId())) {
LOG_CRIT(TlsActivationContext->AsActorContext(), NKikimrServices::STATISTICS,
Expand All @@ -118,7 +129,8 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
static constexpr size_t StatsSizeLimitBytes = 2 << 20; // limit for stats size in one message

TDuration PropagateInterval;
bool IsPropagateInFlight = false; // is slow propagation started
TDuration PropagateTimeout;
static constexpr TDuration FastCheckInterval = TDuration::MilliSeconds(50);

std::unordered_map<TSSId, TString> BaseStats; // schemeshard id -> serialized stats for all paths

Expand All @@ -134,6 +146,14 @@ class TStatisticsAggregator : public TActor<TStatisticsAggregator>, public NTabl
bool FastCheckInFlight = false;
std::unordered_set<TNodeId> FastNodes; // nodes for fast propagation
std::unordered_set<TSSId> FastSchemeShards; // schemeshards for fast propagation

bool PropagationInFlight = false;
std::vector<TNodeId> PropagationNodes;
std::vector<TSSId> PropagationSchemeShards;
size_t LastSSIndex = 0;

std::queue<TEvStatistics::TEvRequestStats::TPtr> PendingRequests;
bool ProcessUrgentInFlight = false;
};

} // NKikimr::NStat
2 changes: 2 additions & 0 deletions ydb/core/statistics/aggregator/tx_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ struct TStatisticsAggregator::TTxInit : public TTxBase {
Self->EnableStatistics = AppData(ctx)->FeatureFlags.GetEnableStatistics();
Self->SubscribeForConfigChanges(ctx);

Self->Schedule(Self->PropagateInterval, new TEvPrivate::TEvPropagate());

Self->Become(&TThis::StateWork);
}
};
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/statistics/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ struct TEvStatistics {
EvRequestStats,
EvPropagateStatistics,
EvStatisticsIsDisabled,
EvPropagateStatisticsResponse,

EvEnd
};
Expand Down Expand Up @@ -115,6 +116,12 @@ struct TEvStatistics {
NKikimrStat::TEvStatisticsIsDisabled,
EvStatisticsIsDisabled>
{};

struct TEvPropagateStatisticsResponse : public TEventPB<
TEvPropagateStatisticsResponse,
NKikimrStat::TEvPropagateStatisticsResponse,
EvPropagateStatisticsResponse>
{};
};

} // NStat
Expand Down
Loading

0 comments on commit 00dcc23

Please sign in to comment.