Skip to content

Commit

Permalink
fix walking back start of elapsing time (ydb-platform#4578)
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored and zverevgeny committed Jun 12, 2024
1 parent a0edd99 commit 05dc81b
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 20 deletions.
4 changes: 2 additions & 2 deletions ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ namespace NActors {
ThreadQueue.Push(workerId + 1, revolvingCounter);

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);

if (threadCtx.WaitingPad.Park())
return 0;

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
wctx.AddParkedCycles(hpnow - hpprev);
}
Expand Down
22 changes: 12 additions & 10 deletions ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -199,16 +199,16 @@ namespace NActors {
bool firstEvent = true;
bool preempted = false;
bool wasWorking = false;
NHPTimer::STime hpnow = Ctx.HPStart;
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
NHPTimer::STime hpnow = Ctx.HPStart;
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
hpprev = Ctx.HPStart;
NHPTimer::STime eventStart = Ctx.HPStart;

for (; Ctx.ExecutedEvents < Ctx.EventsPerMailbox; ++Ctx.ExecutedEvents) {
if (TAutoPtr<IEventHandle> evExt = mailbox->Pop()) {
mailbox->ProcessEvents(mailbox);
recipient = evExt->GetRecipientRewrite();
TActorContext ctx(*mailbox, *this, hpprev, recipient);
TActorContext ctx(*mailbox, *this, eventStart, recipient);
TlsActivationContext = &ctx; // ensure dtor (if any) is called within actor system
// move for destruct before ctx;
auto ev = std::move(evExt);
Expand Down Expand Up @@ -250,7 +250,7 @@ namespace NActors {
actor->Receive(ev);

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);

mailbox->ProcessEvents(mailbox);
actor->OnDequeueEvent();
Expand All @@ -265,8 +265,9 @@ namespace NActors {

if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
reclaimAsFree = true;

NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);

Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(eventStart, hpnow, activityType, CurrentActorScheduledEventsCounter);
if (elapsed > 1000000) {
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
}
Expand All @@ -286,9 +287,10 @@ namespace NActors {
Ctx.IncrementNonDeliveredEvents();
}
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
}
eventStart = hpnow;

if (TlsThreadContext->CapturedType == ESendingType::Tail) {
AtomicStore(&mailbox->ScheduleMoment, hpnow);
Expand Down Expand Up @@ -778,7 +780,7 @@ namespace NActors {
void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Expand All @@ -790,7 +792,7 @@ namespace NActors {
void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
NHPTimer::STime hpprev = TlsThreadCtx.UpdateStartOfElapsingTime(hpnow);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/actors/core/executor_thread_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,14 +97,14 @@ namespace NActors {
}

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
do {
if (WaitingPad.Park()) // interrupted
return true;
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
hpprev = TlsThreadContext->UpdateStartOfElapsingTime(hpnow);
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
state = GetState<TWaitState>();
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));
Expand Down
16 changes: 15 additions & 1 deletion ydb/library/actors/core/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "defs.h"

#include <atomic>
#include <ydb/library/actors/util/datetime.h>
#include <ydb/library/actors/util/mpmc_ring_queue.h>

Expand Down Expand Up @@ -29,10 +30,23 @@ namespace NActors {
bool IsCurrentRecipientAService = false;
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;

std::atomic<ui64> StartOfElapsingTime = 0;
std::atomic<i64> StartOfElapsingTime = 0;
std::atomic<ui64> ElapsingActorActivity = 0;
TWorkerContext *WorkerCtx = nullptr;
ui32 ActorSystemIndex = 0;

ui64 UpdateStartOfElapsingTime(i64 newValue) {
i64 oldValue = StartOfElapsingTime.load(std::memory_order_acquire);
for (;;) {
if (newValue - oldValue <= 0) {
break;
}
if (StartOfElapsingTime.compare_exchange_strong(oldValue, newValue, std::memory_order_acq_rel)) {
break;
}
}
return oldValue;
}
};

extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp
Expand Down
13 changes: 8 additions & 5 deletions ydb/library/actors/core/worker_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,17 @@ namespace NActors {
}

void AddElapsedCycles(ui32 activityType, i64 elapsed) {
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed);
RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed);
if (Y_LIKELY(elapsed > 0)) {
Y_DEBUG_ABORT_UNLESS(activityType < Stats->MaxActivityType());
RelaxedStore(&Stats->ElapsedTicks, RelaxedLoad(&Stats->ElapsedTicks) + elapsed);
RelaxedStore(&Stats->ElapsedTicksByActivity[activityType], RelaxedLoad(&Stats->ElapsedTicksByActivity[activityType]) + elapsed);
}
}

void AddParkedCycles(i64 elapsed) {
RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed);
if (Y_LIKELY(elapsed > 0)) {
RelaxedStore(&Stats->ParkedTicks, RelaxedLoad(&Stats->ParkedTicks) + elapsed);
}
}

void AddBlockedCycles(i64 elapsed) {
Expand Down Expand Up @@ -126,7 +130,6 @@ namespace NActors {
RelaxedStore(&Stats->ReceivedEvents, RelaxedLoad(&Stats->ReceivedEvents) + 1);
RelaxedStore(&Stats->ReceivedEventsByActivity[activityType], RelaxedLoad(&Stats->ReceivedEventsByActivity[activityType]) + 1);
RelaxedStore(&Stats->ScheduledEventsByActivity[activityType], RelaxedLoad(&Stats->ScheduledEventsByActivity[activityType]) + scheduled);
AddElapsedCycles(activityType, elapsed);
return elapsed;
}

Expand Down

0 comments on commit 05dc81b

Please sign in to comment.