Skip to content

Commit

Permalink
Add ring activation queue (#695)
Browse files Browse the repository at this point in the history
Co-authored-by: Aleksandr Kriukov <kruall@ydb.ru>
  • Loading branch information
kruall and Aleksandr Kriukov authored Dec 28, 2023
1 parent 68184b7 commit 53259c9
Show file tree
Hide file tree
Showing 5 changed files with 439 additions and 2 deletions.
13 changes: 12 additions & 1 deletion ydb/library/actors/core/executor_pool_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ namespace NActors {
: TExecutorPoolBaseMailboxed(poolId)
, PoolThreads(threads)
, ThreadsAffinity(affinity)
#ifdef RING_ACTIVATION_QUEUE
, Activations(threads == 1)
#endif
{}

TExecutorPoolBase::~TExecutorPoolBase() {
Expand Down Expand Up @@ -112,7 +115,11 @@ namespace NActors {
}

void TExecutorPoolBase::ScheduleActivation(ui32 activation) {
#ifdef RING_ACTIVATION_QUEUE
ScheduleActivationEx(activation, 0);
#else
ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
#endif
}

Y_FORCE_INLINE bool IsAllowedToCapture(IExecutorPool *self) {
Expand All @@ -132,7 +139,11 @@ namespace NActors {
TlsThreadContext->CapturedType = TlsThreadContext->SendingType;
}
if (activation) {
ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
#ifdef RING_ACTIVATION_QUEUE
ScheduleActivationEx(activation, 0);
#else
ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter));
#endif
}
}

Expand Down
12 changes: 11 additions & 1 deletion ydb/library/actors/core/executor_pool_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@
#include "executor_thread.h"
#include "mon_stats.h"
#include "scheduler_queue.h"
#include <ydb/library/actors/util/activation_queue.h>
#include <ydb/library/actors/util/affinity.h>
#include <ydb/library/actors/util/unordered_cache.h>
#include <ydb/library/actors/util/threadparkpad.h>

//#define RING_ACTIVATION_QUEUE

namespace NActors {
class TActorSystem;

Expand Down Expand Up @@ -43,10 +46,17 @@ namespace NActors {

class TExecutorPoolBase: public TExecutorPoolBaseMailboxed {
protected:

#ifdef RING_ACTIVATION_QUEUE
using TActivationQueue = TRingActivationQueue;
#else
using TActivationQueue = TUnorderedCache<ui32, 512, 4>;
#endif

const i16 PoolThreads;
TIntrusivePtr<TAffinity> ThreadsAffinity;
TAtomic Semaphore = 0;
TUnorderedCache<ui32, 512, 4> Activations;
TActivationQueue Activations;
TAtomic ActivationsRevolvingCounter = 0;
std::atomic_bool StopFlag = false;
public:
Expand Down
2 changes: 2 additions & 0 deletions ydb/library/actors/core/thread_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "defs.h"

#include <ydb/library/actors/util/datetime.h>
#include <ydb/library/actors/util/mpmc_ring_queue.h>

#include <util/system/tls.h>

Expand Down Expand Up @@ -42,6 +43,7 @@ namespace NActors {
TWaitingStats<ui64> *WaitingStats = nullptr;
bool IsCurrentRecipientAService = false;
TTimers Timers;
TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow;
};

extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp
Expand Down
55 changes: 55 additions & 0 deletions ydb/library/actors/util/activation_queue.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
#pragma once

#include "defs.h"
#include "mpmc_ring_queue.h"
#include <atomic>


namespace NActors {


class TRingActivationQueue {
NThreading::TPadded<std::atomic_bool> IsNeedToWriteToOldQueue = false;
NThreading::TPadded<TMPMCRingQueue<20>> ActivationQueue;
NThreading::TPadded<TUnorderedCache<ui32, 512, 4>> OldActivationQueue;
NThreading::TPadded<std::atomic_uint64_t> RevolvingCounter = 0;
const bool IsMPSC = false;

public:
TRingActivationQueue(bool isMPSC)
: IsMPSC(isMPSC)
{}


void Push(ui32 activation, ui64 revolvingCounter) {
if (!IsNeedToWriteToOldQueue.load(std::memory_order_acquire)) {
if (ActivationQueue.TryPush(activation)) {
return;
}
IsNeedToWriteToOldQueue.store(true, std::memory_order_release);
}
if (!revolvingCounter) {
revolvingCounter = RevolvingCounter.fetch_add(1, std::memory_order_relaxed);
}
OldActivationQueue.Push(activation, AtomicIncrement(revolvingCounter));
}

ui32 Pop(ui64 revolvingCounter) {
std::optional<ui32> activation;
if (IsMPSC) {
activation = ActivationQueue.TryPopSingleConsumer();
} else {
activation = ActivationQueue.TryPop(TlsThreadContext->ActivationPopMode);
}
if (activation) {
return *activation;
}
if (IsNeedToWriteToOldQueue.load(std::memory_order_acquire)) {
return OldActivationQueue.Pop(revolvingCounter);
}
return 0;
}

};

} // NActors
Loading

0 comments on commit 53259c9

Please sign in to comment.