Skip to content

Commit

Permalink
Merge 63c2c6b into dccc8c7
Browse files Browse the repository at this point in the history
  • Loading branch information
kruall authored Apr 11, 2024
2 parents dccc8c7 + 63c2c6b commit 49d67fc
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 60 deletions.
13 changes: 1 addition & 12 deletions ydb/library/actors/core/executor_pool_basic.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,6 @@ namespace NActors {
TWorkerId workerId = wctx.WorkerId;
Y_DEBUG_ABORT_UNLESS(workerId < MaxFullThreadCount);

TlsThreadContext->Timers.Reset();

if (workerId >= 0) {
Threads[workerId].UnsetWork();
} else {
Expand All @@ -197,16 +195,14 @@ namespace NActors {

if (Harmonizer) {
LWPROBE(TryToHarmonize, PoolId, PoolName);
Harmonizer->Harmonize(TlsThreadContext->Timers.HPStart);
Harmonizer->Harmonize(TlsThreadContext->StartOfElapsingTime.load(std::memory_order_relaxed));
}

TAtomic x = AtomicGet(Semaphore);
TSemaphore semaphore = TSemaphore::GetSemaphore(x);
while (!StopFlag.load(std::memory_order_acquire)) {
if (!semaphore.OldSemaphore || workerId >= 0 && semaphore.CurrentSleepThreadCount < 0) {
if (workerId < 0 || !wctx.IsNeededToWaitNextActivation) {
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart);
return 0;
}

Expand All @@ -227,13 +223,6 @@ namespace NActors {
wctx.SharedThread->SetWork();
}
AtomicDecrement(Semaphore);
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
wctx.AddElapsedCycles(ActorSystemIndex, TlsThreadContext->Timers.Elapsed);
if (TlsThreadContext->Timers.Parked > 0) {
wctx.AddParkedCycles(TlsThreadContext->Timers.Parked);
}

return activation;
}
}
Expand Down
26 changes: 11 additions & 15 deletions ydb/library/actors/core/executor_pool_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,31 +31,27 @@ namespace NActors {
i16 workerId = wctx.WorkerId;
Y_DEBUG_ABORT_UNLESS(workerId < PoolThreads);

NHPTimer::STime elapsed = 0;
NHPTimer::STime parked = 0;
NHPTimer::STime hpstart = GetCycleCountFast();
NHPTimer::STime hpnow;

const TAtomic x = AtomicDecrement(Semaphore);
if (x < 0) {
TExecutorThreadCtx& threadCtx = Threads[workerId];
ThreadQueue.Push(workerId + 1, revolvingCounter);
hpnow = GetCycleCountFast();
elapsed += hpnow - hpstart;

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
wctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);

if (threadCtx.WaitingPad.Park())
return 0;
hpstart = GetCycleCountFast();
parked += hpstart - hpnow;

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);
wctx.AddParkedCycles(hpnow - hpprev);
}

while (!StopFlag.load(std::memory_order_acquire)) {
if (const ui32 activation = Activations.Pop(++revolvingCounter)) {
hpnow = GetCycleCountFast();
elapsed += hpnow - hpstart;
wctx.AddElapsedCycles(ActorSystemIndex, elapsed);
if (parked > 0) {
wctx.AddParkedCycles(parked);
}
return activation;
}
SpinLockPause();
Expand Down
1 change: 0 additions & 1 deletion ydb/library/actors/core/executor_pool_shared.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ TSharedExecutorPool::TSharedExecutorPool(const TSharedExecutorPoolConfig &config
, PoolCount(poolCount)
, SharedThreadCount(poolsWithThreads.size())
, Threads(new TSharedExecutorThreadCtx[SharedThreadCount])
, Timers(new TTimers[SharedThreadCount])
, TimePerMailbox(config.TimePerMailbox)
, EventsPerMailbox(config.EventsPerMailbox)
, SoftProcessingDurationTs(config.SoftProcessingDurationTs)
Expand Down
1 change: 0 additions & 1 deletion ydb/library/actors/core/executor_pool_shared.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ namespace NActors {
i16 PoolCount;
i16 SharedThreadCount;
std::unique_ptr<TSharedExecutorThreadCtx[]> Threads;
std::unique_ptr<TTimers[]> Timers;

std::unique_ptr<NSchedulerQueue::TReader[]> ScheduleReaders;
std::unique_ptr<NSchedulerQueue::TWriter[]> ScheduleWriters;
Expand Down
40 changes: 32 additions & 8 deletions ydb/library/actors/core/executor_thread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ namespace NActors {
, ThreadName(threadName)
, TimePerMailbox(timePerMailbox)
, EventsPerMailbox(eventsPerMailbox)
, ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex())
{
Ctx.Switch(
ExecutorPool,
Expand Down Expand Up @@ -75,6 +76,7 @@ namespace NActors {
, EventsPerMailbox(eventsPerMailbox)
, SoftProcessingDurationTs(softProcessingDurationTs)
, SharedStats(poolCount)
, ActorSystemIndex(TActorTypeOperator::GetActorSystemIndex())
{
Ctx.Switch(
ExecutorPool,
Expand Down Expand Up @@ -241,7 +243,13 @@ namespace NActors {
NProfiling::TMemoryTagScope::Reset(activityType);
}

TlsThreadContext->ElapsingActorActivity.store(activityType, std::memory_order_release);
actor->Receive(ev);

hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
TlsThreadContext->ElapsingActorActivity.store(ActorSystemIndex, std::memory_order_release);

mailbox->ProcessEvents(mailbox);
actor->OnDequeueEvent();

Expand All @@ -256,7 +264,6 @@ namespace NActors {
if (mailbox->IsEmpty()) // was not-free and become free, we must reclaim mailbox
reclaimAsFree = true;

hpnow = GetCycleCountFast();
NHPTimer::STime elapsed = Ctx.AddEventProcessingStats(hpprev, hpnow, activityType, CurrentActorScheduledEventsCounter);
if (elapsed > 1000000) {
LwTraceSlowEvent(ev.Get(), evTypeForTracing, actorType, Ctx.PoolId, CurrentRecipient, NHPTimer::GetSeconds(elapsed) * 1000.0);
Expand All @@ -277,10 +284,10 @@ namespace NActors {
Ctx.IncrementNonDeliveredEvents();
}
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
Ctx.AddElapsedCycles(ActorSystemIndex, hpnow - hpprev);
}

hpprev = hpnow;

if (TlsThreadContext->CapturedType == ESendingType::Tail) {
AtomicStore(&mailbox->ScheduleMoment, hpnow);
Ctx.IncrementMailboxPushedOutByTailSending();
Expand Down Expand Up @@ -495,8 +502,9 @@ namespace NActors {
ThreadDisableBalloc();
#endif

TThreadContext threadCtx;
TlsThreadContext = &threadCtx;
TlsThreadCtx.WorkerCtx = &Ctx;
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
TlsThreadContext = &TlsThreadCtx;
if (ThreadName) {
::SetCurrentThreadName(ThreadName);
}
Expand Down Expand Up @@ -528,8 +536,9 @@ namespace NActors {
ThreadDisableBalloc();
#endif

TThreadContext threadCtx;
TlsThreadContext = &threadCtx;
TlsThreadCtx.WorkerCtx = &Ctx;
TlsThreadCtx.ActorSystemIndex = ActorSystemIndex;
TlsThreadContext = &TlsThreadCtx;
if (ThreadName) {
::SetCurrentThreadName(ThreadName);
}
Expand All @@ -550,7 +559,6 @@ namespace NActors {
}

if (!wasWorking && !StopFlag.load(std::memory_order_relaxed)) {
TlsThreadContext->Timers.Reset();
ThreadCtx->UnsetWork();
ThreadCtx->Wait(0, &StopFlag);
}
Expand Down Expand Up @@ -760,10 +768,26 @@ namespace NActors {
}

void TGenericExecutorThread::GetCurrentStats(TExecutorThreadStats& statsCopy) const {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
}
Ctx.GetCurrentStats(statsCopy);
}

void TGenericExecutorThread::GetSharedStats(i16 poolId, TExecutorThreadStats &statsCopy) const {
NHPTimer::STime hpnow = GetCycleCountFast();
ui64 activityType = TlsThreadCtx.ElapsingActorActivity.load(std::memory_order_acquire);
NHPTimer::STime hpprev = TlsThreadCtx.StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
if (activityType == Max<ui64>()) {
Ctx.AddParkedCycles(hpnow - hpprev);
} else {
Ctx.AddElapsedCycles(activityType, hpnow - hpprev);
}
statsCopy = TExecutorThreadStats();
statsCopy.Aggregate(SharedStats[poolId]);
}
Expand Down
5 changes: 4 additions & 1 deletion ydb/library/actors/core/executor_thread.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include "event.h"
#include "callstack.h"
#include "probes.h"
#include "thread_context.h"
#include "worker_context.h"
#include "log_settings.h"

Expand Down Expand Up @@ -92,7 +93,8 @@ namespace NActors {
ui64 CurrentActorScheduledEventsCounter = 0;

// Thread-specific
TWorkerContext Ctx;
mutable TThreadContext TlsThreadCtx;
mutable TWorkerContext Ctx;
ui64 RevolvingReadCounter = 0;
ui64 RevolvingWriteCounter = 0;
const TString ThreadName;
Expand All @@ -104,6 +106,7 @@ namespace NActors {
ui64 SoftProcessingDurationTs;

std::vector<TExecutorThreadStats> SharedStats;
const ui32 ActorSystemIndex;
};

class TExecutorThread: public TGenericExecutorThread {
Expand Down
13 changes: 8 additions & 5 deletions ydb/library/actors/core/executor_thread_ctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,16 +96,19 @@ namespace NActors {
return false;
}

NHPTimer::STime hpnow = GetCycleCountFast();
NHPTimer::STime hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
TlsThreadContext->ElapsingActorActivity.store(Max<ui64>(), std::memory_order_release);
TlsThreadContext->WorkerCtx->AddElapsedCycles(TlsThreadContext->ActorSystemIndex, hpnow - hpprev);
do {
TlsThreadContext->Timers.HPNow = GetCycleCountFast();
TlsThreadContext->Timers.Elapsed += TlsThreadContext->Timers.HPNow - TlsThreadContext->Timers.HPStart;
if (WaitingPad.Park()) // interrupted
return true;
TlsThreadContext->Timers.HPStart = GetCycleCountFast();
TlsThreadContext->Timers.Parked += TlsThreadContext->Timers.HPStart - TlsThreadContext->Timers.HPNow;
hpnow = GetCycleCountFast();
hpprev = TlsThreadContext->StartOfElapsingTime.exchange(hpnow, std::memory_order_acq_rel);
TlsThreadContext->WorkerCtx->AddParkedCycles(hpnow - hpprev);
state = GetState<TWaitState>();
} while (static_cast<EThreadState>(state) == EThreadState::Sleep && !stopFlag->load(std::memory_order_relaxed));

TlsThreadContext->ElapsingActorActivity.store(TlsThreadContext->ActorSystemIndex, std::memory_order_release);
static_cast<TDerived*>(this)->AfterWakeUp(state);
return false;
}
Expand Down
23 changes: 6 additions & 17 deletions ydb/library/actors/core/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,26 +11,11 @@
namespace NActors {

class IExecutorPool;
struct TWorkerContext;

template <typename T>
struct TWaitingStats;

struct TTimers {
NHPTimer::STime Elapsed = 0;
NHPTimer::STime Parked = 0;
NHPTimer::STime Blocked = 0;
NHPTimer::STime HPStart = GetCycleCountFast();
NHPTimer::STime HPNow;

void Reset() {
Elapsed = 0;
Parked = 0;
Blocked = 0;
HPStart = GetCycleCountFast();
HPNow = HPStart;
}
};

struct TThreadContext {
IExecutorPool *Pool = nullptr;
ui32 CapturedActivation = 0;
Expand All @@ -42,8 +27,12 @@ namespace NActors {
ui16 LocalQueueSize = 0;
TWaitingStats<ui64> *WaitingStats = nullptr;
bool IsCurrentRecipientAService = false;
TTimers Timers;
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;

std::atomic<ui64> StartOfElapsingTime = 0;
std::atomic<ui64> ElapsingActorActivity = 0;
TWorkerContext *WorkerCtx = nullptr;
ui32 ActorSystemIndex = 0;
};

extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp
Expand Down

0 comments on commit 49d67fc

Please sign in to comment.