From a90e18d56a77a918fb2f609576f10a789cdd5c18 Mon Sep 17 00:00:00 2001 From: Aleksandr Kriukov Date: Tue, 4 Jun 2024 11:29:39 +0000 Subject: [PATCH 1/2] Fix autospin --- ydb/library/actors/core/activity_guard.h | 15 +-- ydb/library/actors/core/actorsystem.cpp | 2 +- .../actors/core/executor_pool_base.cpp | 4 +- .../actors/core/executor_pool_basic.cpp | 114 ++++++++++-------- ydb/library/actors/core/executor_pool_basic.h | 10 +- ydb/library/actors/core/harmonizer.cpp | 20 +-- 6 files changed, 89 insertions(+), 76 deletions(-) diff --git a/ydb/library/actors/core/activity_guard.h b/ydb/library/actors/core/activity_guard.h index 65f6eb2f21c6..604c607ecaaf 100644 --- a/ydb/library/actors/core/activity_guard.h +++ b/ydb/library/actors/core/activity_guard.h @@ -10,11 +10,11 @@ namespace NActors { -template +template class TInternalActorTypeGuard { public: TInternalActorTypeGuard() { - if (Enabled && TlsThreadContext) { + if (Allowed && TlsThreadContext) { NHPTimer::STime hpnow = GetCycleCountFast(); NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow); NextIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel); @@ -25,7 +25,7 @@ class TInternalActorTypeGuard { TInternalActorTypeGuard(ui32 nextIndex) : NextIndex(nextIndex) { - if (Enabled && TlsThreadContext) { + if (Allowed && TlsThreadContext) { NHPTimer::STime hpnow = GetCycleCountFast(); NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow); ui32 prevIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel); @@ -34,7 +34,7 @@ class TInternalActorTypeGuard { } TInternalActorTypeGuard(NHPTimer::STime hpnow) { - if (Enabled && TlsThreadContext) { + if (Allowed && TlsThreadContext) { NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow); NextIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel); TlsThreadContext->WorkerCtx->AddElapsedCycles(NextIndex, hpnow - hpprev); @@ -44,7 +44,7 @@ class TInternalActorTypeGuard { TInternalActorTypeGuard(NHPTimer::STime hpnow, ui32 nextIndex) : NextIndex(nextIndex) { - if (Enabled && TlsThreadContext) { + if (Allowed && TlsThreadContext) { NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow); ui32 prevIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel); TlsThreadContext->WorkerCtx->AddElapsedCycles(prevIndex, hpnow - hpprev); @@ -52,7 +52,7 @@ class TInternalActorTypeGuard { } ~TInternalActorTypeGuard() { - if (Enabled && TlsThreadContext) { + if (Allowed && TlsThreadContext) { NHPTimer::STime hpnow = GetCycleCountFast(); NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow); TlsThreadContext->ElapsingActorActivity.store(NextIndex, std::memory_order_release); @@ -62,7 +62,8 @@ class TInternalActorTypeGuard { private: - static constexpr bool Enabled = false; + static constexpr bool ExtraActivitiesIsAllowed = false; + static constexpr bool Allowed = ExtraActivitiesIsAllowed || IsMainActivity; static ui32 Index; ui32 NextIndex = 0; }; diff --git a/ydb/library/actors/core/actorsystem.cpp b/ydb/library/actors/core/actorsystem.cpp index d63568583f9f..21ad7a77a14d 100644 --- a/ydb/library/actors/core/actorsystem.cpp +++ b/ydb/library/actors/core/actorsystem.cpp @@ -97,7 +97,7 @@ namespace NActors { if (Y_UNLIKELY(!ev)) return false; - TInternalActorTypeGuard activityGuard; + TInternalActorTypeGuard activityGuard; #ifdef USE_ACTOR_CALLSTACK ev->Callstack.TraceIfEmpty(); #endif diff --git a/ydb/library/actors/core/executor_pool_base.cpp b/ydb/library/actors/core/executor_pool_base.cpp index 1c216d50623d..2bc87a17bf89 100644 --- a/ydb/library/actors/core/executor_pool_base.cpp +++ b/ydb/library/actors/core/executor_pool_base.cpp @@ -149,7 +149,7 @@ namespace NActors { TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) { NHPTimer::STime hpstart = GetCycleCountFast(); - TInternalActorTypeGuard activityGuard(hpstart); + TInternalActorTypeGuard activityGuard(hpstart); #ifdef ACTORSLIB_COLLECT_EXEC_STATS ui32 at = actor->GetActivityType(); Y_DEBUG_ABORT_UNLESS(at < Stats.ActorsAliveByActivity.size()); @@ -237,7 +237,7 @@ namespace NActors { TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) { NHPTimer::STime hpstart = GetCycleCountFast(); - TInternalActorTypeGuard activityGuard(hpstart); + TInternalActorTypeGuard activityGuard(hpstart); #ifdef ACTORSLIB_COLLECT_EXEC_STATS ui32 at = actor->GetActivityType(); if (at >= Stats.MaxActivityType()) diff --git a/ydb/library/actors/core/executor_pool_basic.cpp b/ydb/library/actors/core/executor_pool_basic.cpp index 28e73d6aba9a..cdcb2420a86d 100644 --- a/ydb/library/actors/core/executor_pool_basic.cpp +++ b/ydb/library/actors/core/executor_pool_basic.cpp @@ -24,6 +24,8 @@ namespace NActors { constexpr bool DebugMode = false; #endif + + LWTRACE_USING(ACTORLIB_PROVIDER); @@ -49,41 +51,63 @@ namespace NActors { TDuration timePerMailbox, ui32 eventsPerMailbox, int realtimePriority, - ui32 maxActivityType, + ui32 /*maxActivityType*/, i16 minThreadCount, i16 maxThreadCount, i16 defaultThreadCount, i16 priority, bool hasOwnSharedThread, - TExecutorPoolJail *jail) - : TExecutorPoolBase(poolId, threads, affinity) - , DefaultSpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles + TExecutorPoolJail *jail + ) + : TBasicExecutorPool(TBasicExecutorPoolConfig{ + .PoolId = poolId, + .PoolName = poolName, + .Threads = threads, + .SpinThreshold = spinThreshold, + .Affinity = (affinity ? static_cast(*affinity) : TCpuMask{}), + .TimePerMailbox = timePerMailbox, + .EventsPerMailbox = eventsPerMailbox, + .RealtimePriority = realtimePriority, + .MinThreadCount = minThreadCount, + .MaxThreadCount = maxThreadCount, + .DefaultThreadCount = defaultThreadCount, + .Priority = priority, + .HasSharedThread = hasOwnSharedThread, + }, harmonizer, jail) + { + if (affinity != nullptr) { + delete affinity; + } + } + + TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer, TExecutorPoolJail *jail) + : TExecutorPoolBase(cfg.PoolId, cfg.Threads, new TAffinity(cfg.Affinity)) + , DefaultSpinThresholdCycles(cfg.SpinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles , SpinThresholdCycles(DefaultSpinThresholdCycles) - , SpinThresholdCyclesPerThread(new NThreading::TPadded>[threads]) - , Threads(new NThreading::TPadded[threads]) - , WaitingStats(new TWaitingStats[threads]) - , PoolName(poolName) - , TimePerMailbox(timePerMailbox) - , EventsPerMailbox(eventsPerMailbox) - , RealtimePriority(realtimePriority) - , ThreadUtilization(0) - , MaxUtilizationCounter(0) - , MaxUtilizationAccumulator(0) - , WrongWakenedThreadCount(0) - , ThreadCount(threads) - , MinFullThreadCount(minThreadCount) - , MaxFullThreadCount(maxThreadCount) - , DefaultFullThreadCount(defaultThreadCount) + , SpinThresholdCyclesPerThread(new NThreading::TPadded>[cfg.Threads]) + , Threads(new NThreading::TPadded[cfg.Threads]) + , WaitingStats(new TWaitingStats[cfg.Threads]) + , PoolName(cfg.PoolName) + , TimePerMailbox(cfg.TimePerMailbox) + , EventsPerMailbox(cfg.EventsPerMailbox) + , RealtimePriority(cfg.RealtimePriority) + , ThreadCount(cfg.Threads) + , MinFullThreadCount(cfg.MinThreadCount) + , MaxFullThreadCount(cfg.MaxThreadCount) + , DefaultFullThreadCount(cfg.DefaultThreadCount) , Harmonizer(harmonizer) - , HasOwnSharedThread(hasOwnSharedThread) - , Priority(priority) + , SoftProcessingDurationTs(cfg.SoftProcessingDurationTs) + , HasOwnSharedThread(cfg.HasSharedThread) + , Priority(cfg.Priority) , Jail(jail) + , ActorSystemProfile(cfg.ActorSystemProfile) { - Y_UNUSED(Jail); + Y_UNUSED(Jail, SoftProcessingDurationTs); for (ui32 idx = 0; idx < MaxSharedThreadsForPool; ++idx) { SharedThreads[idx].store(nullptr, std::memory_order_release); } + ui32 threads = ThreadCount; if (HasOwnSharedThread && threads) { threads = threads - 1; } @@ -105,7 +129,6 @@ namespace NActors { MovingWaitingStats.Reset(new TWaitingStats[threads]); } - Y_UNUSED(maxActivityType); i16 limit = Min(threads, (ui32)Max()); if (DefaultFullThreadCount) { DefaultFullThreadCount = Min(DefaultFullThreadCount - HasOwnSharedThread, limit); @@ -135,30 +158,6 @@ namespace NActors { } } - TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer, TExecutorPoolJail *jail) - : TBasicExecutorPool( - cfg.PoolId, - cfg.Threads, - cfg.SpinThreshold, - cfg.PoolName, - harmonizer, - new TAffinity(cfg.Affinity), - cfg.TimePerMailbox, - cfg.EventsPerMailbox, - cfg.RealtimePriority, - 0, - cfg.MinThreadCount, - cfg.MaxThreadCount, - cfg.DefaultThreadCount, - cfg.Priority, - cfg.HasSharedThread, - jail - ) - { - SoftProcessingDurationTs = cfg.SoftProcessingDurationTs; - ActorSystemProfile = cfg.ActorSystemProfile; - } - TBasicExecutorPool::~TBasicExecutorPool() { Threads.Destroy(); } @@ -201,7 +200,7 @@ namespace NActors { ui32 TBasicExecutorPool::GetReadyActivationCommon(TWorkerContext& wctx, ui64 revolvingCounter) { NHPTimer::STime hpnow = GetCycleCountFast(); - TInternalActorTypeGuard activityGuard(hpnow); + TInternalActorTypeGuard activityGuard(hpnow); TWorkerId workerId = wctx.WorkerId; Y_DEBUG_ABORT_UNLESS(workerId < MaxFullThreadCount); @@ -234,7 +233,7 @@ namespace NActors { } } } else { - TInternalActorTypeGuard activityGuard; + TInternalActorTypeGuard activityGuard; if (const ui32 activation = Activations.Pop(++revolvingCounter)) { if (workerId >= 0) { Threads[workerId].SetWork(); @@ -670,6 +669,9 @@ namespace NActors { } void TBasicExecutorPool::CalcSpinPerThread(ui64 wakingUpConsumption) { + if (ActorSystemProfile == EASProfile::Default) { + return; + } for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) { ui64 newSpinThreshold = 0; if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) { @@ -678,7 +680,15 @@ namespace NActors { } else { newSpinThreshold = WaitingStats[threadIdx].CalculateGoodSpinThresholdCycles(wakingUpConsumption); } - SpinThresholdCyclesPerThread[threadIdx].store(newSpinThreshold); + + if (ActorSystemProfile == EASProfile::LowCpuConsumption) { + SpinThresholdCyclesPerThread[threadIdx].store(newSpinThreshold, std::memory_order_release); + } else { + auto oldSpinThreshold = SpinThresholdCyclesPerThread[threadIdx].load(std::memory_order_acquire); + if (oldSpinThreshold < newSpinThreshold) { + SpinThresholdCyclesPerThread[threadIdx].store(newSpinThreshold, std::memory_order_release); + } + } double resolutionUs = TWaitingStatsConstants::HistogramResolutionUs; ui32 bucketIdx = newSpinThreshold / TWaitingStatsConstants::HistogramResolution; @@ -714,7 +724,7 @@ namespace NActors { } bool TExecutorThreadCtx::WakeUp() { - TInternalActorTypeGuard activityGuard; + TInternalActorTypeGuard activityGuard; for (ui32 i = 0; i < 2; ++i) { EThreadState state = GetState(); switch (state) { @@ -743,7 +753,7 @@ namespace NActors { } bool TSharedExecutorThreadCtx::WakeUp() { - TInternalActorTypeGuard activityGuard; + TInternalActorTypeGuard activityGuard; i64 requestsForWakeUp = RequestsForWakeUp.fetch_add(1, std::memory_order_acq_rel); if (requestsForWakeUp >= 0) { return false; diff --git a/ydb/library/actors/core/executor_pool_basic.h b/ydb/library/actors/core/executor_pool_basic.h index be23d0ab1040..8bede0e7c6b0 100644 --- a/ydb/library/actors/core/executor_pool_basic.h +++ b/ydb/library/actors/core/executor_pool_basic.h @@ -151,14 +151,13 @@ namespace NActors { const TString PoolName; const TDuration TimePerMailbox; const ui32 EventsPerMailbox; - EASProfile ActorSystemProfile; const int RealtimePriority; - TAtomic ThreadUtilization; - TAtomic MaxUtilizationCounter; - TAtomic MaxUtilizationAccumulator; - TAtomic WrongWakenedThreadCount; + TAtomic ThreadUtilization = 0; + TAtomic MaxUtilizationCounter = 0; + TAtomic MaxUtilizationAccumulator = 0; + TAtomic WrongWakenedThreadCount = 0; std::atomic SpinningTimeUs; TAtomic ThreadCount; @@ -208,6 +207,7 @@ namespace NActors { } }; + const EASProfile ActorSystemProfile; static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX; static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX; diff --git a/ydb/library/actors/core/harmonizer.cpp b/ydb/library/actors/core/harmonizer.cpp index 8d63423fb7a3..5c71d5a4ac7b 100644 --- a/ydb/library/actors/core/harmonizer.cpp +++ b/ydb/library/actors/core/harmonizer.cpp @@ -476,16 +476,18 @@ void THarmonizer::HarmonizeImpl(ui64 ts) { if (!pool.BasicPool) { continue; } - if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) { - pool.BasicPool->CalcSpinPerThread(avgWakingUpConsumption); - } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) { - ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption); - pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); - } else { - ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption); - pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); + if (pool.BasicPool->ActorSystemProfile != EASProfile::Default) { + if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) { + pool.BasicPool->CalcSpinPerThread(avgWakingUpConsumption); + } else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) { + ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption); + pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); + } else { + ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption); + pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold); + } + pool.BasicPool->ClearWaitingStats(); } - pool.BasicPool->ClearWaitingStats(); } std::vector hasSharedThread(Pools.size()); From dbd6725623a52be71686e83a398d61320aff06e2 Mon Sep 17 00:00:00 2001 From: Aleksandr Kriukov Date: Tue, 4 Jun 2024 11:35:56 +0000 Subject: [PATCH 2/2] fast fix --- ydb/library/actors/core/activity_guard.h | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ydb/library/actors/core/activity_guard.h b/ydb/library/actors/core/activity_guard.h index 604c607ecaaf..2311c71dc2f5 100644 --- a/ydb/library/actors/core/activity_guard.h +++ b/ydb/library/actors/core/activity_guard.h @@ -10,7 +10,7 @@ namespace NActors { -template +template class TInternalActorTypeGuard { public: TInternalActorTypeGuard() { @@ -68,7 +68,7 @@ class TInternalActorTypeGuard { ui32 NextIndex = 0; }; -template -ui32 TInternalActorTypeGuard::Index = TEnumProcessKey::GetIndex(type); +template +ui32 TInternalActorTypeGuard::Index = TEnumProcessKey::GetIndex(ActivityType); } \ No newline at end of file