diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index 4c6db5d609a4..be76655b6bc9 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -162,13 +162,6 @@ namespace NActors { TWorkerId workerId = wctx.WorkerId; Y_DEBUG_ABORT_UNLESS(workerId < PoolThreads); - TlsThreadContext->Timers.Reset(); - - if (Harmonizer) { - LWPROBE(TryToHarmonize, PoolId, PoolName); - Harmonizer->Harmonize(TlsThreadContext->Timers.HPStart); - } - if (workerId >= 0) { Threads[workerId].UnsetWork(); } else { @@ -176,13 +169,16 @@ namespace NActors { wctx.SharedThread->UnsetWork(); } + if (Harmonizer) { + LWPROBE(TryToHarmonize, PoolId, PoolName); + Harmonizer->Harmonize(TlsThreadContext->StartOfElapsingTime.load(std::memory_order_relaxed)); + } + TAtomic x = AtomicGet(Semaphore); TSemaphore semaphore = TSemaphore::GetSemaphore(x); while (!StopFlag.load(std::memory_order_acquire)) { if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) { if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) { - TlsThreadContext->Timers.HPNow = GetCycleCountFast(); - wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart); return 0; } @@ -203,13 +199,6 @@ namespace NActors { wctx.SharedThread->SetWork(); } AtomicDecrement(Semaphore); - TlsThreadContext->Timers.HPNow = GetCycleCountFast(); - TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart; - wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.Elapsed); - if (TlsThreadContext->Timers.Parked > 0) { - wctx.AddParkedCycles(TlsThreadContext->Timers.Parked); - } - return activation; } semaphore.CurrentSleepThreadCount++; diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp index 1046d6ea66cd..2acced884827 100644 --- a/ydb/library/actors/core/executor_pool_io.cpp +++ b/ydb/library/actors/core/executor_pool_io.cpp @@ -30,31 +30,27 @@ namespace NActors { i16 workerId = wctx.WorkerId; Y_DEBUG_ABORT_UNLESS(workerId < PoolThreads); - NHPTimer::STime elapsed = 0; - NHPTimer::STime parked = 0; - NHPTimer::STime hpstart = GetCycleCountFast(); - NHPTimer::STime hpnow; - const TAtomic x = AtomicDecrement(Semaphore); if (x < 0) { TExecutorThreadCtx& threadCtx = Threads[workerId]; ThreadQueue.Push(workerId + 1, revolvingCounter); - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; + + NHPTimer::STime hpnow = GetCycleCountFast(); + NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + TlsThreadContext->ElapsingActorActivity.store(Max(), std::memory_order_release); + wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev); + if (threadCtx.WaitingPad.Park()) return 0; - hpstart = GetCycleCountFast(); - parked += hpstart - hpnow; + + hpnow = GetCycleCountFast(); + hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release); + wctx.AddParkedCycles(hpnow - hpprev); } while (!StopFlag.load(std::memory_order_acquire)) { if (const ui32 activation = Activations.Pop(++revolvingCounter)) { - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - wctx.AddElapsedCycles(ActorSystemIndex, elapsed); - if (parked > 0) { - wctx.AddParkedCycles(parked); - } return activation; } SpinLockPause(); diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp index f9b769d40494..990c7d7ca52c 100644 --- a/ydb/library/actors/core/executor_pool_shared.cpp +++ b/ydb/library/actors/core/executor_pool_shared.cpp @@ -20,7 +20,6 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config , PoolCount(poolCount) , SharedThreadCount(poolsWithThreads.size()) , Threads(new TSharedExecutorThreadCtx[SharedThreadCount]) - , Timers(new TTimers[SharedThreadCount]) , TimePerMailbox(config.TimePerMailbox) , EventsPerMailbox(config.EventsPerMailbox) , SoftProcessingDurationTs(config.SoftProcessingDurationTs) diff --git a/ydb/library/actors/core/executor_pool_shared.h b/ydb/library/actors/core/executor_pool_shared.h index 2f0eb6097323..c8a8594012d0 100644 --- a/ydb/library/actors/core/executor_pool_shared.h +++ b/ydb/library/actors/core/executor_pool_shared.h @@ -43,7 +43,6 @@ namespace NActors { i16 PoolCount; i16 SharedThreadCount; std::unique_ptr Threads; - std::unique_ptr Timers; std::unique_ptr ScheduleReaders; std::unique_ptr ScheduleWriters; diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index 7262b2b4cf14..b1dacedd49e3 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -48,6 +48,7 @@ namespace NActors { , ThreadName(threadName) , TimePerMailbox(timePerMailbox) , EventsPerMailbox(eventsPerMailbox) + , ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex()) { Ctx.Switch( ExecutorPool, @@ -75,6 +76,7 @@ namespace NActors { , EventsPerMailbox(eventsPerMailbox) , SoftProcessingDurationTs(softProcessingDurationTs) , SharedStats(poolCount) + , ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex()) { Ctx.Switch( ExecutorPool, @@ -239,9 +241,14 @@ namespace NActors { if (activityType != prevActivityType) { prevActivityType = activityType; NProfiling::TMemoryTagScope::Reset(activityType); + TlsThreadContext->ElapsingActorActivity.store(activityType, std::memory_order_release); } actor->Receive(ev); + + hpnow = GetCycleCountFast(); + hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + mailbox->ProcessEvents(mailbox); actor->OnDequeueEvent(); @@ -256,7 +263,6 @@ namespace NActors { if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox reclaimAsFree = true; - hpnow = GetCycleCountFast(); NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter); if (elapsed > 1000000) { LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0); @@ -277,10 +283,10 @@ namespace NActors { Ctx.IncrementNonDeliveredEvents(); } hpnow = GetCycleCountFast(); + hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev); } - hpprev = hpnow; - if (TlsThreadContext->CapturedType == ESendingType::Tail) { AtomicStore(&mailbox->ScheduleMoment, hpnow); Ctx.IncrementMailboxPushedOutByTailSending(); @@ -360,6 +366,7 @@ namespace NActors { break; // empty queue, leave } } + TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release); NProfiling::TMemoryTagScope::Reset(0); TlsActivationContext = nullptr; @@ -495,8 +502,11 @@ namespace NActors { ThreadDisableBalloc(); #endif - TThreadContext threadCtx; - TlsThreadContext = &threadCtx; + TlsThreadCtx.WorkerCtx = &Ctx; + TlsThreadCtx.ActorSystemIndex = ActorSystemIndex; + TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex; + TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast(); + TlsThreadContext = &TlsThreadCtx; if (ThreadName) { ::SetCurrentThreadName(ThreadName); } @@ -529,8 +539,11 @@ namespace NActors { ThreadDisableBalloc(); #endif - TThreadContext threadCtx; - TlsThreadContext = &threadCtx; + TlsThreadCtx.WorkerCtx = &Ctx; + TlsThreadCtx.ActorSystemIndex = ActorSystemIndex; + TlsThreadCtx.ElapsingActorActivity = ActorSystemIndex; + TlsThreadCtx.StartOfElapsingTime = GetCycleCountFast(); + TlsThreadContext = &TlsThreadCtx; if (ThreadName) { ::SetCurrentThreadName(ThreadName); } @@ -551,7 +564,7 @@ namespace NActors { } if (!wasWorking && !StopFlag.load(std::memory_order_relaxed)) { - TlsThreadContext->Timers.Reset(); + ThreadCtx->UnsetWork(); ThreadCtx->Wait(0, &StopFlag); } @@ -760,10 +773,26 @@ namespace NActors { } void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const { + NHPTimer::STime hpnow = GetCycleCountFast(); + ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire); + NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + if (activityType == Max()) { + Ctx.AddParkedCycles(hpnow - hpprev); + } else { + Ctx.AddElapsedCycles(activityType, hpnow - hpprev); + } Ctx.GetCurrentStats(statsCopy); } void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const { + NHPTimer::STime hpnow = GetCycleCountFast(); + ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire); + NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + if (activityType == Max()) { + Ctx.AddParkedCycles(hpnow - hpprev); + } else { + Ctx.AddElapsedCycles(activityType, hpnow - hpprev); + } statsCopy = TExecutorThreadStats(); statsCopy.Aggregate(SharedStats[poolId]); } diff --git a/ydb/library/actors/core/executor_thread.h b/ydb/library/actors/core/executor_thread.h index b9a339648b6f..d8ef032f4d3f 100644 --- a/ydb/library/actors/core/executor_thread.h +++ b/ydb/library/actors/core/executor_thread.h @@ -4,6 +4,7 @@ #include "event.h" #include "callstack.h" #include "probes.h" +#include "thread_context.h" #include "worker_context.h" #include "log_settings.h" @@ -92,7 +93,8 @@ namespace NActors { ui64 CurrentActorScheduledEventsCounter = 0; // Thread-specific - TWorkerContext Ctx; + mutable TThreadContext TlsThreadCtx; + mutable TWorkerContext Ctx; ui64 RevolvingReadCounter = 0; ui64 RevolvingWriteCounter = 0; const TString ThreadName; @@ -104,6 +106,7 @@ namespace NActors { ui64 SoftProcessingDurationTs; std::vector SharedStats; + const ui32 ActorSystemIndex; }; class TExecutorThread: public TGenericExecutorThread { diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h index 7af5a085dd1d..dd0d1b1c7193 100644 --- a/ydb/library/actors/core/executor_thread_ctx.h +++ b/ydb/library/actors/core/executor_thread_ctx.h @@ -2,6 +2,7 @@ #include "defs.h" #include "thread_context.h" +#include "worker_context.h" #include #include @@ -95,16 +96,19 @@ namespace NActors { return false; } + NHPTimer::STime hpnow = GetCycleCountFast(); + NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + TlsThreadContext->ElapsingActorActivity.store(Max(), std::memory_order_release); + TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev); do { - TlsThreadContext->Timers.HPNow = GetCycleCountFast(); - TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart; if (WaitingPad.Park()) // interrupted return true; - TlsThreadContext->Timers.HPStart = GetCycleCountFast(); - TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow; + hpnow = GetCycleCountFast(); + hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev); state = GetState(); } while (static_cast(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed)); - + TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release); static_cast(this)->AfterWakeUp(state); return false; } diff --git a/ydb/library/actors/core/thread_context.h b/ydb/library/actors/core/thread_context.h index 0ddaffd1c5e3..d19ae3cd1e08 100644 --- a/ydb/library/actors/core/thread_context.h +++ b/ydb/library/actors/core/thread_context.h @@ -11,26 +11,11 @@ namespace NActors { class IExecutorPool; + struct TWorkerContext; template struct TWaitingStats; - struct TTimers { - NHPTimer::STime Elapsed = 0; - NHPTimer::STime Parked = 0; - NHPTimer::STime Blocked = 0; - NHPTimer::STime HPStart = GetCycleCountFast(); - NHPTimer::STime HPNow; - - void Reset() { - Elapsed = 0; - Parked = 0; - Blocked = 0; - HPStart = GetCycleCountFast(); - HPNow = HPStart; - } - }; - struct TThreadContext { IExecutorPool *Pool = nullptr; ui32 CapturedActivation = 0; @@ -42,8 +27,12 @@ namespace NActors { ui16 LocalQueueSize = 0; TWaitingStats *WaitingStats = nullptr; bool IsCurrentRecipientAService = false; - TTimers Timers; TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow; + + std::atomic StartOfElapsingTime = 0; + std::atomic ElapsingActorActivity = 0; + TWorkerContext *WorkerCtx = nullptr; + ui32 ActorSystemIndex = 0; }; extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp