Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix autospin #5167

Merged
merged 2 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 10 additions & 9 deletions ydb/library/actors/core/activity_guard.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
namespace NActors {


template <EInternalActorSystemActivity type>
template <EInternalActorSystemActivity ActivityType, bool IsMainActivity=true>
class TInternalActorTypeGuard {
public:
TInternalActorTypeGuard() {
if (Enabled && TlsThreadContext) {
if (Allowed && TlsThreadContext) {
NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
NextIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
Expand All @@ -25,7 +25,7 @@ class TInternalActorTypeGuard {
TInternalActorTypeGuard(ui32 nextIndex)
: NextIndex(nextIndex)
{
if (Enabled && TlsThreadContext) {
if (Allowed && TlsThreadContext) {
NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
ui32 prevIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
Expand All @@ -34,7 +34,7 @@ class TInternalActorTypeGuard {
}

TInternalActorTypeGuard(NHPTimer::STime hpnow) {
if (Enabled && TlsThreadContext) {
if (Allowed && TlsThreadContext) {
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
NextIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
TlsThreadContext->WorkerCtx->AddElapsedCycles(NextIndex, hpnow - hpprev);
Expand All @@ -44,15 +44,15 @@ class TInternalActorTypeGuard {
TInternalActorTypeGuard(NHPTimer::STime hpnow, ui32 nextIndex)
: NextIndex(nextIndex)
{
if (Enabled && TlsThreadContext) {
if (Allowed && TlsThreadContext) {
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
ui32 prevIndex = TlsThreadContext->ElapsingActorActivity.exchange(Index, std::memory_order_acq_rel);
TlsThreadContext->WorkerCtx->AddElapsedCycles(prevIndex, hpnow - hpprev);
}
}

~TInternalActorTypeGuard() {
if (Enabled && TlsThreadContext) {
if (Allowed && TlsThreadContext) {
NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->UpdateStartOfProcessingEventTS(hpnow);
TlsThreadContext->ElapsingActorActivity.store(NextIndex, std::memory_order_release);
Expand All @@ -62,12 +62,13 @@ class TInternalActorTypeGuard {


private:
static constexpr bool Enabled = false;
static constexpr bool ExtraActivitiesIsAllowed = false;
static constexpr bool Allowed = ExtraActivitiesIsAllowed || IsMainActivity;
static ui32 Index;
ui32 NextIndex = 0;
};

template <EInternalActorSystemActivity type>
ui32 TInternalActorTypeGuard<type>::Index = TEnumProcessKey<TActorActivityTag, EInternalActorSystemActivity>::GetIndex(type);
template <EInternalActorSystemActivity ActivityType, bool IsMainActivity>
ui32 TInternalActorTypeGuard<ActivityType, IsMainActivity>::Index = TEnumProcessKey<TActorActivityTag, EInternalActorSystemActivity>::GetIndex(ActivityType);

}
2 changes: 1 addition & 1 deletion ydb/library/actors/core/actorsystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ namespace NActors {
if (Y_UNLIKELY(!ev))
return false;

TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_SEND> activityGuard;
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_SEND, false> activityGuard;
#ifdef USE_ACTOR_CALLSTACK
ev->Callstack.TraceIfEmpty();
#endif
Expand Down
4 changes: 2 additions & 2 deletions ydb/library/actors/core/executor_pool_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ namespace NActors {

TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingWriteCounter, const TActorId& parentId) {
NHPTimer::STime hpstart = GetCycleCountFast();
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_REGISTER> activityGuard(hpstart);
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_REGISTER, false> activityGuard(hpstart);
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
ui32 at = actor->GetActivityType();
Y_DEBUG_ABORT_UNLESS(at < Stats.ActorsAliveByActivity.size());
Expand Down Expand Up @@ -237,7 +237,7 @@ namespace NActors {

TActorId TExecutorPoolBaseMailboxed::Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) {
NHPTimer::STime hpstart = GetCycleCountFast();
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_REGISTER> activityGuard(hpstart);
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_REGISTER, false> activityGuard(hpstart);
#ifdef ACTORSLIB_COLLECT_EXEC_STATS
ui32 at = actor->GetActivityType();
if (at >= Stats.MaxActivityType())
Expand Down
114 changes: 62 additions & 52 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ namespace NActors {
constexpr bool DebugMode = false;
#endif



LWTRACE_USING(ACTORLIB_PROVIDER);


Expand All @@ -49,41 +51,63 @@ namespace NActors {
TDuration timePerMailbox,
ui32 eventsPerMailbox,
int realtimePriority,
ui32 maxActivityType,
ui32 /*maxActivityType*/,
i16 minThreadCount,
i16 maxThreadCount,
i16 defaultThreadCount,
i16 priority,
bool hasOwnSharedThread,
TExecutorPoolJail *jail)
: TExecutorPoolBase(poolId, threads, affinity)
, DefaultSpinThresholdCycles(spinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
TExecutorPoolJail *jail
)
: TBasicExecutorPool(TBasicExecutorPoolConfig{
.PoolId = poolId,
.PoolName = poolName,
.Threads = threads,
.SpinThreshold = spinThreshold,
.Affinity = (affinity ? static_cast<TCpuMask>(*affinity) : TCpuMask{}),
.TimePerMailbox = timePerMailbox,
.EventsPerMailbox = eventsPerMailbox,
.RealtimePriority = realtimePriority,
.MinThreadCount = minThreadCount,
.MaxThreadCount = maxThreadCount,
.DefaultThreadCount = defaultThreadCount,
.Priority = priority,
.HasSharedThread = hasOwnSharedThread,
}, harmonizer, jail)
{
if (affinity != nullptr) {
delete affinity;
}
}

TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer, TExecutorPoolJail *jail)
: TExecutorPoolBase(cfg.PoolId, cfg.Threads, new TAffinity(cfg.Affinity))
, DefaultSpinThresholdCycles(cfg.SpinThreshold * NHPTimer::GetCyclesPerSecond() * 0.000001) // convert microseconds to cycles
, SpinThresholdCycles(DefaultSpinThresholdCycles)
, SpinThresholdCyclesPerThread(new NThreading::TPadded<std::atomic<ui64>>[threads])
, Threads(new NThreading::TPadded<TExecutorThreadCtx>[threads])
, WaitingStats(new TWaitingStats<ui64>[threads])
, PoolName(poolName)
, TimePerMailbox(timePerMailbox)
, EventsPerMailbox(eventsPerMailbox)
, RealtimePriority(realtimePriority)
, ThreadUtilization(0)
, MaxUtilizationCounter(0)
, MaxUtilizationAccumulator(0)
, WrongWakenedThreadCount(0)
, ThreadCount(threads)
, MinFullThreadCount(minThreadCount)
, MaxFullThreadCount(maxThreadCount)
, DefaultFullThreadCount(defaultThreadCount)
, SpinThresholdCyclesPerThread(new NThreading::TPadded<std::atomic<ui64>>[cfg.Threads])
, Threads(new NThreading::TPadded<TExecutorThreadCtx>[cfg.Threads])
, WaitingStats(new TWaitingStats<ui64>[cfg.Threads])
, PoolName(cfg.PoolName)
, TimePerMailbox(cfg.TimePerMailbox)
, EventsPerMailbox(cfg.EventsPerMailbox)
, RealtimePriority(cfg.RealtimePriority)
, ThreadCount(cfg.Threads)
, MinFullThreadCount(cfg.MinThreadCount)
, MaxFullThreadCount(cfg.MaxThreadCount)
, DefaultFullThreadCount(cfg.DefaultThreadCount)
, Harmonizer(harmonizer)
, HasOwnSharedThread(hasOwnSharedThread)
, Priority(priority)
, SoftProcessingDurationTs(cfg.SoftProcessingDurationTs)
, HasOwnSharedThread(cfg.HasSharedThread)
, Priority(cfg.Priority)
, Jail(jail)
, ActorSystemProfile(cfg.ActorSystemProfile)
{
Y_UNUSED(Jail);
Y_UNUSED(Jail, SoftProcessingDurationTs);
for (ui32 idx = 0; idx < MaxSharedThreadsForPool; ++idx) {
SharedThreads[idx].store(nullptr, std::memory_order_release);
}

ui32 threads = ThreadCount;
if (HasOwnSharedThread && threads) {
threads = threads - 1;
}
Expand All @@ -105,7 +129,6 @@ namespace NActors {
MovingWaitingStats.Reset(new TWaitingStats<double>[threads]);
}

Y_UNUSED(maxActivityType);
i16 limit = Min(threads, (ui32)Max<i16>());
if (DefaultFullThreadCount) {
DefaultFullThreadCount = Min<i16>(DefaultFullThreadCount - HasOwnSharedThread, limit);
Expand Down Expand Up @@ -135,30 +158,6 @@ namespace NActors {
}
}

TBasicExecutorPool::TBasicExecutorPool(const TBasicExecutorPoolConfig& cfg, IHarmonizer *harmonizer, TExecutorPoolJail *jail)
: TBasicExecutorPool(
cfg.PoolId,
cfg.Threads,
cfg.SpinThreshold,
cfg.PoolName,
harmonizer,
new TAffinity(cfg.Affinity),
cfg.TimePerMailbox,
cfg.EventsPerMailbox,
cfg.RealtimePriority,
0,
cfg.MinThreadCount,
cfg.MaxThreadCount,
cfg.DefaultThreadCount,
cfg.Priority,
cfg.HasSharedThread,
jail
)
{
SoftProcessingDurationTs = cfg.SoftProcessingDurationTs;
ActorSystemProfile = cfg.ActorSystemProfile;
}

TBasicExecutorPool::~TBasicExecutorPool() {
Threads.Destroy();
}
Expand Down Expand Up @@ -201,7 +200,7 @@ namespace NActors {

ui32 TBasicExecutorPool::GetReadyActivationCommon(TWorkerContext& wctx, ui64 revolvingCounter) {
NHPTimer::STime hpnow = GetCycleCountFast();
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION> activityGuard(hpnow);
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION, false> activityGuard(hpnow);

TWorkerId workerId = wctx.WorkerId;
Y_DEBUG_ABORT_UNLESS(workerId < MaxFullThreadCount);
Expand Down Expand Up @@ -234,7 +233,7 @@ namespace NActors {
}
}
} else {
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE> activityGuard;
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false> activityGuard;
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
if (workerId >= 0) {
Threads[workerId].SetWork();
Expand Down Expand Up @@ -670,6 +669,9 @@ namespace NActors {
}

void TBasicExecutorPool::CalcSpinPerThread(ui64 wakingUpConsumption) {
if (ActorSystemProfile == EASProfile::Default) {
return;
}
for (i16 threadIdx = 0; threadIdx < MaxFullThreadCount; ++threadIdx) {
ui64 newSpinThreshold = 0;
if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
Expand All @@ -678,7 +680,15 @@ namespace NActors {
} else {
newSpinThreshold = WaitingStats[threadIdx].CalculateGoodSpinThresholdCycles(wakingUpConsumption);
}
SpinThresholdCyclesPerThread[threadIdx].store(newSpinThreshold);

if (ActorSystemProfile == EASProfile::LowCpuConsumption) {
SpinThresholdCyclesPerThread[threadIdx].store(newSpinThreshold, std::memory_order_release);
} else {
auto oldSpinThreshold = SpinThresholdCyclesPerThread[threadIdx].load(std::memory_order_acquire);
if (oldSpinThreshold < newSpinThreshold) {
SpinThresholdCyclesPerThread[threadIdx].store(newSpinThreshold, std::memory_order_release);
}
}

double resolutionUs = TWaitingStatsConstants::HistogramResolutionUs;
ui32 bucketIdx = newSpinThreshold / TWaitingStatsConstants::HistogramResolution;
Expand Down Expand Up @@ -714,7 +724,7 @@ namespace NActors {
}

bool TExecutorThreadCtx::WakeUp() {
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_WAKE_UP> activityGuard;
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_WAKE_UP, false> activityGuard;
for (ui32 i = 0; i < 2; ++i) {
EThreadState state = GetState<EThreadState>();
switch (state) {
Expand Down Expand Up @@ -743,7 +753,7 @@ namespace NActors {
}

bool TSharedExecutorThreadCtx::WakeUp() {
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE> activityGuard;
TInternalActorTypeGuard<EInternalActorSystemActivity::ACTOR_SYSTEM_GET_ACTIVATION_FROM_QUEUE, false> activityGuard;
i64 requestsForWakeUp = RequestsForWakeUp.fetch_add(1, std::memory_order_acq_rel);
if (requestsForWakeUp >= 0) {
return false;
Expand Down
10 changes: 5 additions & 5 deletions ydb/library/actors/core/executor_pool_basic.h
Original file line number Diff line number Diff line change
Expand Up @@ -151,14 +151,13 @@ namespace NActors {
const TString PoolName;
const TDuration TimePerMailbox;
const ui32 EventsPerMailbox;
EASProfile ActorSystemProfile;

const int RealtimePriority;

TAtomic ThreadUtilization;
TAtomic MaxUtilizationCounter;
TAtomic MaxUtilizationAccumulator;
TAtomic WrongWakenedThreadCount;
TAtomic ThreadUtilization = 0;
TAtomic MaxUtilizationCounter = 0;
TAtomic MaxUtilizationAccumulator = 0;
TAtomic WrongWakenedThreadCount = 0;
std::atomic<ui64> SpinningTimeUs;

TAtomic ThreadCount;
Expand Down Expand Up @@ -208,6 +207,7 @@ namespace NActors {
}
};

const EASProfile ActorSystemProfile;
static constexpr TDuration DEFAULT_TIME_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_TIME_PER_MAILBOX;
static constexpr ui32 DEFAULT_EVENTS_PER_MAILBOX = TBasicExecutorPoolConfig::DEFAULT_EVENTS_PER_MAILBOX;

Expand Down
20 changes: 11 additions & 9 deletions ydb/library/actors/core/harmonizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -476,16 +476,18 @@ void THarmonizer::HarmonizeImpl(ui64 ts) {
if (!pool.BasicPool) {
continue;
}
if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
pool.BasicPool->CalcSpinPerThread(avgWakingUpConsumption);
} else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
} else {
ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
if (pool.BasicPool->ActorSystemProfile != EASProfile::Default) {
if constexpr (NFeatures::TSpinFeatureFlags::CalcPerThread) {
pool.BasicPool->CalcSpinPerThread(avgWakingUpConsumption);
} else if constexpr (NFeatures::TSpinFeatureFlags::UsePseudoMovingWindow) {
ui64 newSpinThreshold = pool.MovingWaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
} else {
ui64 newSpinThreshold = pool.WaitingStats->CalculateGoodSpinThresholdCycles(avgWakingUpConsumption);
pool.BasicPool->SetSpinThresholdCycles(newSpinThreshold);
}
pool.BasicPool->ClearWaitingStats();
}
pool.BasicPool->ClearWaitingStats();
}

std::vector<bool> hasSharedThread(Pools.size());
Expand Down
Loading