diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index 9beecd762383..9964facc940c 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -186,8 +186,6 @@ namespace NActors { TWorkerId workerId = wctx.WorkerId; Y_DEBUG_ABORT_UNLESS(workerId < MaxFullThreadCount); - TlsThreadContext->Timers.Reset(); - if (workerId >= 0) { Threads[workerId].UnsetWork(); } else { @@ -197,7 +195,7 @@ namespace NActors { if (Harmonizer) { LWPROBE(TryToHarmonize, PoolId, PoolName); - Harmonizer->Harmonize(TlsThreadContext->Timers.HPStart); + Harmonizer->Harmonize(TlsThreadContext->StartOfElapsingTime.load(std::memory_order_relaxed)); } TAtomic x = AtomicGet(Semaphore); @@ -205,8 +203,6 @@ namespace NActors { while (!StopFlag.load(std::memory_order_acquire)) { if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) { if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) { - TlsThreadContext->Timers.HPNow = GetCycleCountFast(); - wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart); return 0; } @@ -227,13 +223,6 @@ namespace NActors { wctx.SharedThread->SetWork(); } AtomicDecrement(Semaphore); - TlsThreadContext->Timers.HPNow = GetCycleCountFast(); - TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart; - wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.Elapsed); - if (TlsThreadContext->Timers.Parked > 0) { - wctx.AddParkedCycles(TlsThreadContext->Timers.Parked); - } - return activation; } } diff --git a/ydb/library/actors/core/executor_pool_io.cpp b/ydb/library/actors/core/executor_pool_io.cpp index ba5919edaebc..7eef8f372139 100644 --- a/ydb/library/actors/core/executor_pool_io.cpp +++ b/ydb/library/actors/core/executor_pool_io.cpp @@ -31,31 +31,27 @@ namespace NActors { i16 workerId = wctx.WorkerId; Y_DEBUG_ABORT_UNLESS(workerId < PoolThreads); - NHPTimer::STime elapsed = 0; - NHPTimer::STime parked = 0; - NHPTimer::STime hpstart = GetCycleCountFast(); - NHPTimer::STime hpnow; - const TAtomic x = AtomicDecrement(Semaphore); if (x < 0) { TExecutorThreadCtx& threadCtx = Threads[workerId]; ThreadQueue.Push(workerId + 1, revolvingCounter); - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; + + NHPTimer::STime hpnow = GetCycleCountFast(); + NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + TlsThreadContext->ElapsingActorActivity.store(Max(), std::memory_order_release); + wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev); + if (threadCtx.WaitingPad.Park()) return 0; - hpstart = GetCycleCountFast(); - parked += hpstart - hpnow; + + hpnow = GetCycleCountFast(); + hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release); + wctx.AddParkedCycles(hpnow - hpprev); } while (!StopFlag.load(std::memory_order_acquire)) { if (const ui32 activation = Activations.Pop(++revolvingCounter)) { - hpnow = GetCycleCountFast(); - elapsed += hpnow - hpstart; - wctx.AddElapsedCycles(ActorSystemIndex, elapsed); - if (parked > 0) { - wctx.AddParkedCycles(parked); - } return activation; } SpinLockPause(); diff --git a/ydb/library/actors/core/executor_pool_shared.cpp b/ydb/library/actors/core/executor_pool_shared.cpp index 3db627faaf39..fc4d272ac797 100644 --- a/ydb/library/actors/core/executor_pool_shared.cpp +++ b/ydb/library/actors/core/executor_pool_shared.cpp @@ -18,7 +18,6 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config , PoolCount(poolCount) , SharedThreadCount(poolsWithThreads.size()) , Threads(new TSharedExecutorThreadCtx[SharedThreadCount]) - , Timers(new TTimers[SharedThreadCount]) , TimePerMailbox(config.TimePerMailbox) , EventsPerMailbox(config.EventsPerMailbox) , SoftProcessingDurationTs(config.SoftProcessingDurationTs) diff --git a/ydb/library/actors/core/executor_pool_shared.h b/ydb/library/actors/core/executor_pool_shared.h index 2e184f56ba49..b215a5d4e547 100644 --- a/ydb/library/actors/core/executor_pool_shared.h +++ b/ydb/library/actors/core/executor_pool_shared.h @@ -56,7 +56,6 @@ namespace NActors { i16 PoolCount; i16 SharedThreadCount; std::unique_ptr Threads; - std::unique_ptr Timers; std::unique_ptr ScheduleReaders; std::unique_ptr ScheduleWriters; diff --git a/ydb/library/actors/core/executor_thread.cpp b/ydb/library/actors/core/executor_thread.cpp index 4edaa6ba8efe..5f7679d5cdfd 100644 --- a/ydb/library/actors/core/executor_thread.cpp +++ b/ydb/library/actors/core/executor_thread.cpp @@ -48,6 +48,7 @@ namespace NActors { , ThreadName(threadName) , TimePerMailbox(timePerMailbox) , EventsPerMailbox(eventsPerMailbox) + , ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex()) { Ctx.Switch( ExecutorPool, @@ -75,6 +76,7 @@ namespace NActors { , EventsPerMailbox(eventsPerMailbox) , SoftProcessingDurationTs(softProcessingDurationTs) , SharedStats(poolCount) + , ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex()) { Ctx.Switch( ExecutorPool, @@ -241,7 +243,13 @@ namespace NActors { NProfiling::TMemoryTagScope::Reset(activityType); } + TlsThreadContext->ElapsingActorActivity.store(activityType, std::memory_order_release); actor->Receive(ev); + + hpnow = GetCycleCountFast(); + hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release); + mailbox->ProcessEvents(mailbox); actor->OnDequeueEvent(); @@ -256,7 +264,6 @@ namespace NActors { if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox reclaimAsFree = true; - hpnow = GetCycleCountFast(); NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter); if (elapsed > 1000000) { LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0); @@ -277,10 +284,10 @@ namespace NActors { Ctx.IncrementNonDeliveredEvents(); } hpnow = GetCycleCountFast(); + hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev); } - hpprev = hpnow; - if (TlsThreadContext->CapturedType == ESendingType::Tail) { AtomicStore(&mailbox->ScheduleMoment, hpnow); Ctx.IncrementMailboxPushedOutByTailSending(); @@ -495,8 +502,9 @@ namespace NActors { ThreadDisableBalloc(); #endif - TThreadContext threadCtx; - TlsThreadContext = &threadCtx; + TlsThreadCtx.WorkerCtx = &Ctx; + TlsThreadCtx.ActorSystemIndex = ActorSystemIndex; + TlsThreadContext = &TlsThreadCtx; if (ThreadName) { ::SetCurrentThreadName(ThreadName); } @@ -528,8 +536,9 @@ namespace NActors { ThreadDisableBalloc(); #endif - TThreadContext threadCtx; - TlsThreadContext = &threadCtx; + TlsThreadCtx.WorkerCtx = &Ctx; + TlsThreadCtx.ActorSystemIndex = ActorSystemIndex; + TlsThreadContext = &TlsThreadCtx; if (ThreadName) { ::SetCurrentThreadName(ThreadName); } @@ -550,7 +559,6 @@ namespace NActors { } if (!wasWorking && !StopFlag.load(std::memory_order_relaxed)) { - TlsThreadContext->Timers.Reset(); ThreadCtx->UnsetWork(); ThreadCtx->Wait(0, &StopFlag); } @@ -760,10 +768,26 @@ namespace NActors { } void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const { + NHPTimer::STime hpnow = GetCycleCountFast(); + ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire); + NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + if (activityType == Max()) { + Ctx.AddParkedCycles(hpnow - hpprev); + } else { + Ctx.AddElapsedCycles(activityType, hpnow - hpprev); + } Ctx.GetCurrentStats(statsCopy); } void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const { + NHPTimer::STime hpnow = GetCycleCountFast(); + ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire); + NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + if (activityType == Max()) { + Ctx.AddParkedCycles(hpnow - hpprev); + } else { + Ctx.AddElapsedCycles(activityType, hpnow - hpprev); + } statsCopy = TExecutorThreadStats(); statsCopy.Aggregate(SharedStats[poolId]); } diff --git a/ydb/library/actors/core/executor_thread.h b/ydb/library/actors/core/executor_thread.h index b9a339648b6f..d8ef032f4d3f 100644 --- a/ydb/library/actors/core/executor_thread.h +++ b/ydb/library/actors/core/executor_thread.h @@ -4,6 +4,7 @@ #include "event.h" #include "callstack.h" #include "probes.h" +#include "thread_context.h" #include "worker_context.h" #include "log_settings.h" @@ -92,7 +93,8 @@ namespace NActors { ui64 CurrentActorScheduledEventsCounter = 0; // Thread-specific - TWorkerContext Ctx; + mutable TThreadContext TlsThreadCtx; + mutable TWorkerContext Ctx; ui64 RevolvingReadCounter = 0; ui64 RevolvingWriteCounter = 0; const TString ThreadName; @@ -104,6 +106,7 @@ namespace NActors { ui64 SoftProcessingDurationTs; std::vector SharedStats; + const ui32 ActorSystemIndex; }; class TExecutorThread: public TGenericExecutorThread { diff --git a/ydb/library/actors/core/executor_thread_ctx.h b/ydb/library/actors/core/executor_thread_ctx.h index ec88e4bfaa82..d88b2de00d35 100644 --- a/ydb/library/actors/core/executor_thread_ctx.h +++ b/ydb/library/actors/core/executor_thread_ctx.h @@ -96,16 +96,19 @@ namespace NActors { return false; } + NHPTimer::STime hpnow = GetCycleCountFast(); + NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + TlsThreadContext->ElapsingActorActivity.store(Max(), std::memory_order_release); + TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev); do { - TlsThreadContext->Timers.HPNow = GetCycleCountFast(); - TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart; if (WaitingPad.Park()) // interrupted return true; - TlsThreadContext->Timers.HPStart = GetCycleCountFast(); - TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow; + hpnow = GetCycleCountFast(); + hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel); + TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev); state = GetState(); } while (static_cast(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed)); - + TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release); static_cast(this)->AfterWakeUp(state); return false; } diff --git a/ydb/library/actors/core/thread_context.h b/ydb/library/actors/core/thread_context.h index e9782362f1c9..9bfa8dd45284 100644 --- a/ydb/library/actors/core/thread_context.h +++ b/ydb/library/actors/core/thread_context.h @@ -11,26 +11,11 @@ namespace NActors { class IExecutorPool; + struct TWorkerContext; template struct TWaitingStats; - struct TTimers { - NHPTimer::STime Elapsed = 0; - NHPTimer::STime Parked = 0; - NHPTimer::STime Blocked = 0; - NHPTimer::STime HPStart = GetCycleCountFast(); - NHPTimer::STime HPNow; - - void Reset() { - Elapsed = 0; - Parked = 0; - Blocked = 0; - HPStart = GetCycleCountFast(); - HPNow = HPStart; - } - }; - struct TThreadContext { IExecutorPool *Pool = nullptr; ui32 CapturedActivation = 0; @@ -42,8 +27,12 @@ namespace NActors { ui16 LocalQueueSize = 0; TWaitingStats *WaitingStats = nullptr; bool IsCurrentRecipientAService = false; - TTimers Timers; TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow; + + std::atomic StartOfElapsingTime = 0; + std::atomic ElapsingActorActivity = 0; + TWorkerContext *WorkerCtx = nullptr; + ui32 ActorSystemIndex = 0; }; extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp