From 53259c90946aa5172013b3adb11f6037c3d288c4 Mon Sep 17 00:00:00 2001 From: kruall Date: Thu, 28 Dec 2023 13:03:27 +0300 Subject: [PATCH] Add ring activation queue (#695) Co-authored-by: Aleksandr Kriukov --- .../actors/core/executor_pool_base.cpp | 13 +- ydb/library/actors/core/executor_pool_base.h | 12 +- ydb/library/actors/core/thread_context.h | 2 + ydb/library/actors/util/activation_queue.h | 55 +++ ydb/library/actors/util/mpmc_ring_queue.h | 359 ++++++++++++++++++ 5 files changed, 439 insertions(+), 2 deletions(-) create mode 100644 ydb/library/actors/util/activation_queue.h create mode 100644 ydb/library/actors/util/mpmc_ring_queue.h diff --git a/ydb/library/actors/core/executor_pool_base.cpp b/ydb/library/actors/core/executor_pool_base.cpp index eb7d6df88fc4..4e559735c76d 100644 --- a/ydb/library/actors/core/executor_pool_base.cpp +++ b/ydb/library/actors/core/executor_pool_base.cpp @@ -69,6 +69,9 @@ namespace NActors { : TExecutorPoolBaseMailboxed(poolId) , PoolThreads(threads) , ThreadsAffinity(affinity) +#ifdef RING_ACTIVATION_QUEUE + , Activations(threads == 1) +#endif {} TExecutorPoolBase::~TExecutorPoolBase() { @@ -112,7 +115,11 @@ namespace NActors { } void TExecutorPoolBase::ScheduleActivation(ui32 activation) { +#ifdef RING_ACTIVATION_QUEUE + ScheduleActivationEx(activation, 0); +#else ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); +#endif } Y_FORCE_INLINE bool IsAllowedToCapture(IExecutorPool *self) { @@ -132,7 +139,11 @@ namespace NActors { TlsThreadContext->CapturedType = TlsThreadContext->SendingType; } if (activation) { - ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); +#ifdef RING_ACTIVATION_QUEUE + ScheduleActivationEx(activation, 0); +#else + ScheduleActivationEx(activation, AtomicIncrement(ActivationsRevolvingCounter)); +#endif } } diff --git a/ydb/library/actors/core/executor_pool_base.h b/ydb/library/actors/core/executor_pool_base.h index 56e74a025f59..ae3ba30b8aaf 100644 --- a/ydb/library/actors/core/executor_pool_base.h +++ b/ydb/library/actors/core/executor_pool_base.h @@ -4,10 +4,13 @@ #include "executor_thread.h" #include "mon_stats.h" #include "scheduler_queue.h" +#include #include #include #include +//#define RING_ACTIVATION_QUEUE + namespace NActors { class TActorSystem; @@ -43,10 +46,17 @@ namespace NActors { class TExecutorPoolBase: public TExecutorPoolBaseMailboxed { protected: + +#ifdef RING_ACTIVATION_QUEUE + using TActivationQueue = TRingActivationQueue; +#else + using TActivationQueue = TUnorderedCache; +#endif + const i16 PoolThreads; TIntrusivePtr ThreadsAffinity; TAtomic Semaphore = 0; - TUnorderedCache Activations; + TActivationQueue Activations; TAtomic ActivationsRevolvingCounter = 0; std::atomic_bool StopFlag = false; public: diff --git a/ydb/library/actors/core/thread_context.h b/ydb/library/actors/core/thread_context.h index 5345d31d9bb5..0ddaffd1c5e3 100644 --- a/ydb/library/actors/core/thread_context.h +++ b/ydb/library/actors/core/thread_context.h @@ -3,6 +3,7 @@ #include "defs.h" #include +#include #include @@ -42,6 +43,7 @@ namespace NActors { TWaitingStats *WaitingStats = nullptr; bool IsCurrentRecipientAService = false; TTimers Timers; + TMPMCRingQueue<20>::EPopMode ActivationPopMode = TMPMCRingQueue<20>::EPopMode::ReallySlow; }; extern Y_POD_THREAD(TThreadContext*) TlsThreadContext; // in actor.cpp diff --git a/ydb/library/actors/util/activation_queue.h b/ydb/library/actors/util/activation_queue.h new file mode 100644 index 000000000000..dbc8df71b5aa --- /dev/null +++ b/ydb/library/actors/util/activation_queue.h @@ -0,0 +1,55 @@ +#pragma once + +#include "defs.h" +#include "mpmc_ring_queue.h" +#include + + +namespace NActors { + + +class TRingActivationQueue { + NThreading::TPadded IsNeedToWriteToOldQueue = false; + NThreading::TPadded> ActivationQueue; + NThreading::TPadded> OldActivationQueue; + NThreading::TPadded RevolvingCounter = 0; + const bool IsMPSC = false; + +public: + TRingActivationQueue(bool isMPSC) + : IsMPSC(isMPSC) + {} + + + void Push(ui32 activation, ui64 revolvingCounter) { + if (!IsNeedToWriteToOldQueue.load(std::memory_order_acquire)) { + if (ActivationQueue.TryPush(activation)) { + return; + } + IsNeedToWriteToOldQueue.store(true, std::memory_order_release); + } + if (!revolvingCounter) { + revolvingCounter = RevolvingCounter.fetch_add(1, std::memory_order_relaxed); + } + OldActivationQueue.Push(activation, AtomicIncrement(revolvingCounter)); + } + + ui32 Pop(ui64 revolvingCounter) { + std::optional activation; + if (IsMPSC) { + activation = ActivationQueue.TryPopSingleConsumer(); + } else { + activation = ActivationQueue.TryPop(TlsThreadContext->ActivationPopMode); + } + if (activation) { + return *activation; + } + if (IsNeedToWriteToOldQueue.load(std::memory_order_acquire)) { + return OldActivationQueue.Pop(revolvingCounter); + } + return 0; + } + +}; + +} // NActors diff --git a/ydb/library/actors/util/mpmc_ring_queue.h b/ydb/library/actors/util/mpmc_ring_queue.h new file mode 100644 index 000000000000..a78392d7de57 --- /dev/null +++ b/ydb/library/actors/util/mpmc_ring_queue.h @@ -0,0 +1,359 @@ +#pragma once +#include "defs.h" + +#include + +#include +#include + +namespace NActors { + +template +struct TMPMCRingQueue { + static constexpr ui32 MaxSize = 1 << MaxSizeBits; + + enum class EPopMode { + Fast, + ReallyFast, + Slow, + ReallySlow, + }; + + struct alignas(ui64) TSlot { + static constexpr ui64 EmptyBit = 1ull << 63; + ui64 Generation = 0; + ui64 Value = 0; + bool IsEmpty; + + static constexpr ui64 MakeEmpty(ui64 generation) { + return (1ull << 63) | generation; + } + + static constexpr TSlot Recognise(ui64 slotValue) { + if (slotValue & EmptyBit) { + return {.Generation = (EmptyBit ^ slotValue), .IsEmpty=true}; + } + return {.Value=slotValue, .IsEmpty=false}; + } + } ; + + NThreading::TPadded> Tail{0}; + NThreading::TPadded> Head{0}; + NThreading::TPadded>> Buffer; + ui64 LocalHead = 0; + ui64 LocalGeneration = 0; + + static constexpr ui32 ConvertIdx(ui32 idx) { + idx = idx % MaxSize; + if constexpr (MaxSize < 0x100) { + return idx; + } + // 0, 16, 32, .., 240, + // 1, 17, 33, .., 241, + // ... + // 15, 31, 63, ..., 255, + return (idx & ~0xff) | ((idx & 0xf) << 4) | ((idx >> 4) & 0xf); + } + + TMPMCRingQueue() + : Buffer(new std::atomic[MaxSize]) + { + for (ui32 idx = 0; idx < MaxSize; ++idx) { + Buffer[idx] = TSlot::MakeEmpty(0); + } + } + + bool TryPushSlow(ui32 val) { + for (;;) { + ui64 currentTail = Tail.load(std::memory_order_acquire); + ui32 generation = currentTail / MaxSize; + + std::atomic ¤tSlot = Buffer[ConvertIdx(currentTail)]; + TSlot slot; + ui64 expected = TSlot::MakeEmpty(generation); + do { + if (currentSlot.compare_exchange_weak(expected, val)) { + Tail.compare_exchange_strong(currentTail, currentTail + 1); + return true; + } + slot = TSlot::Recognise(expected); + } while (slot.Generation <= generation && slot.IsEmpty); + + if (!slot.IsEmpty) { + ui64 currentHead = Head.load(std::memory_order_acquire); + if (currentHead + MaxSize <= currentTail + std::min(1024, MaxSize - 1)) { + return false; + } + } + + Tail.compare_exchange_strong(currentTail, currentTail + 1); + SpinLockPause(); + } + } + + bool TryPush(ui32 val) { + ui64 currentTail = Tail.fetch_add(1, std::memory_order_relaxed); + ui32 generation = currentTail / MaxSize; + + std::atomic ¤tSlot = Buffer[ConvertIdx(currentTail)]; + TSlot slot; + ui64 expected = TSlot::MakeEmpty(generation); + do { + if (currentSlot.compare_exchange_weak(expected, val)) { + return true; + } + slot = TSlot::Recognise(expected); + } while (slot.Generation <= generation && slot.IsEmpty); + + if (!slot.IsEmpty) { + ui64 currentHead = Head.load(std::memory_order_acquire); + if (currentHead + MaxSize <= currentTail + std::min(1024, MaxSize - 1)) { + return false; + } + } + return TryPushSlow(val); + } + + void ShiftLocalHead() { + if (++LocalHead == MaxSize) { + LocalHead = 0; + LocalGeneration++; + } + } + + std::optional TryPopSingleConsumer() { + for (;;) { + std::atomic ¤tSlot = Buffer[ConvertIdx(LocalHead)]; + ui64 expected = currentSlot.load(std::memory_order_acquire); + TSlot slot = TSlot::Recognise(expected); + if (slot.IsEmpty) { + ui64 currentTail = Tail.load(std::memory_order_acquire); + ui64 globalHead = LocalGeneration * MaxSize + LocalHead; + if (currentTail <= globalHead) { + Tail.compare_exchange_strong(currentTail, globalHead); + return std::nullopt; + } + if (slot.Generation == LocalGeneration) { + if (currentSlot.compare_exchange_strong(expected, TSlot::MakeEmpty(LocalGeneration + 1))) { + ShiftLocalHead(); + } + } + SpinLockPause(); + continue; + } + currentSlot.store(TSlot::MakeEmpty(LocalGeneration + 1), std::memory_order_release); + ShiftLocalHead(); + return slot.Value; + } + } + + std::optional TryPop(EPopMode &mode) { + switch (mode) { + case EPopMode::Fast: + if (auto item = TryPopFast()) { + mode = EPopMode::ReallyFast; + return item; + } + mode = EPopMode::Slow; + return std::nullopt; + case EPopMode::ReallyFast: + if (auto item = TryPopReallyFast()) { + return item; + } + mode = EPopMode::Slow; + return std::nullopt; + case EPopMode::Slow: + if (auto item = TryPopSlow()) { + mode = EPopMode::Fast; + return item; + } + mode = EPopMode::ReallySlow; + return std::nullopt; + case EPopMode::ReallySlow: + if (auto item = TryPopReallySlow()) { + mode = EPopMode::Fast; + return item; + } + return std::nullopt; + } + } + + void TryIncrementHead(ui64 ¤tHead) { + ui64 expectedHead = currentHead; + while (expectedHead <= currentHead) { + if (Head.compare_exchange_weak(expectedHead, currentHead + 1)) { + currentHead++; + return; + } + } + currentHead = expectedHead; + } + + std::optional TryPopReallySlow() { + ui64 currentHead = Head.load(std::memory_order_acquire); + ui64 currentTail = Tail.load(std::memory_order_acquire); + while (currentHead > currentTail) { + if (Tail.compare_exchange_weak(currentTail, currentHead)) { + currentTail = currentHead; + } + } + if (currentHead == currentTail) { + return std::nullopt; + } + + return TryPopSlow(currentHead); + } + + std::optional TryPopSlow(ui64 currentHead = 0) { + if (!currentHead) { + currentHead = Head.load(std::memory_order_acquire); + } + for (ui32 it = 0; it < 3; ++it) { + ui32 generation = currentHead / MaxSize; + + std::atomic ¤tSlot = Buffer[ConvertIdx(currentHead)]; + + ui64 expected = currentSlot.load(std::memory_order_relaxed); + TSlot slot = TSlot::Recognise(expected); + + if (slot.Generation > generation) { + Head.compare_exchange_strong(currentHead, currentHead + 1); + SpinLockPause(); + continue; + } + + while (generation > slot.Generation) { + if (currentSlot.compare_exchange_weak(expected, TSlot::MakeEmpty(generation))) { + if (!slot.IsEmpty) { + Head.compare_exchange_strong(currentHead, currentHead + 1); + return slot.Value; + } + break; + } + slot = TSlot::Recognise(expected); + } + + while (!slot.IsEmpty) { + if (currentSlot.compare_exchange_weak(expected, TSlot::MakeEmpty(generation + 1))) { + Head.compare_exchange_strong(currentHead, currentHead + 1); + return slot.Value; + } + slot = TSlot::Recognise(expected); + } + + if (slot.Generation > generation) { + Head.compare_exchange_strong(currentHead, currentHead + 1); + SpinLockPause(); + continue; + } + + ui64 currentTail = Tail.load(std::memory_order_acquire); + if (currentTail <= currentHead) { + return std::nullopt; + } + + while (slot.Generation == generation && slot.IsEmpty) { + if (currentSlot.compare_exchange_weak(expected, TSlot::MakeEmpty(generation + 1))) { + Head.compare_exchange_strong(currentHead, currentHead + 1); + break; + } + slot = TSlot::Recognise(expected); + } + + SpinLockPause(); + currentHead = Head.load(std::memory_order_acquire); + } + return std::nullopt; + } + + std::optional TryPopFast() { + for (;;) { + ui64 currentHead = Head.fetch_add(1, std::memory_order_relaxed); + ui32 generation = currentHead / MaxSize; + + std::atomic ¤tSlot = Buffer[ConvertIdx(currentHead)]; + + ui64 expected = currentSlot.load(std::memory_order_relaxed); + TSlot slot = TSlot::Recognise(expected); + + if (slot.Generation > generation) { + SpinLockPause(); + continue; + } + + while (generation >= slot.Generation) { + if (currentSlot.compare_exchange_weak(expected, TSlot::MakeEmpty(generation + 1))) { + if (!slot.IsEmpty) { + return slot.Value; + } + break; + } + slot = TSlot::Recognise(expected); + } + + if (slot.Generation > generation) { + SpinLockPause(); + continue; + } + + ui64 currentTail = Tail.load(std::memory_order_acquire); + if (currentTail > currentHead) { + SpinLockPause(); + continue; + } + + while (currentTail <= currentHead) { + if (Tail.compare_exchange_weak(currentTail, currentHead + 1)) { + return std::nullopt; + } + } + return std::nullopt; + } + } + + std::optional TryPopReallyFast() { + for (;;) { + ui64 currentHead = Head.fetch_add(1, std::memory_order_relaxed); + ui32 generation = currentHead / MaxSize; + + std::atomic ¤tSlot = Buffer[ConvertIdx(currentHead)]; + + ui64 expected = currentSlot.exchange(TSlot::MakeEmpty(generation + 1), std::memory_order_acq_rel); + TSlot slot = TSlot::Recognise(expected); + if (!slot.IsEmpty) { + return slot.Value; + } + + if (slot.Generation > generation) { + expected = TSlot::MakeEmpty(generation + 1); + TSlot slot2 = TSlot::Recognise(expected); + while (slot.Generation > slot2.Generation) { + if (currentSlot.compare_exchange_weak(expected, TSlot::MakeEmpty(slot.Generation))) { + if (!slot2.IsEmpty) { + return slot2.Value; + } + break; + } + slot2 = TSlot::Recognise(expected); + } + SpinLockPause(); + continue; + } + + if (slot.Generation > generation) { + SpinLockPause(); + continue; + } + + ui64 currentTail = Tail.load(std::memory_order_acquire); + while (currentTail < currentHead) { + if (Tail.compare_exchange_weak(currentTail, currentHead + 1)) { + return std::nullopt; + } + } + SpinLockPause(); + } + } +}; + +} // NActors \ No newline at end of file