diff --git a/ydb/library/actors/queues/bench/bench_cases.h b/ydb/library/actors/queues/bench/bench_cases.h index 981ca9ff201b..a62b610498a1 100644 --- a/ydb/library/actors/queues/bench/bench_cases.h +++ b/ydb/library/actors/queues/bench/bench_cases.h @@ -1,109 +1,394 @@ #pragma once +#include "defs.h" #include "queue.h" #include "worker.h" +#include +#include + namespace NActors::NQueueBench { - - template - struct TBenchCases { - template - static TStatsCollector BasicPushPopSingleThread() { - TRealQueue realQueue; - TQueueAdaptor adapter(&realQueue); - TStatsCollector collector; - TSimpleWorker worker( - &adapter, - { - TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=1}, - TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=2}, - TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=1}, - TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=2}, - } - ); - RunThreads(std::make_unique>(worker, &collector)); - return std::move(collector); + struct TThreadCounts { + ui64 ProducerThreads = 0; + ui64 ConsumerThreads = 0; + }; + + template + class IBenchCase { + public: + virtual ~IBenchCase () {} + + virtual TStatsCollector Run() = 0; + + virtual TThreadCounts GetThreads(ui64 globalThreads) = 0; + }; + + template + class IBenchCaseWithDurationAndThreads { + public: + virtual ~IBenchCaseWithDurationAndThreads() {} + + virtual TStatsCollector Run(TDuration duration, ui64 threads) = 0; + + virtual TString Validate(ui64 /*threads*/) { + return ""; } - template - static TStatsCollector BasicPushPopMultiThreads() { - TRealQueue realQueue; - TVector> adapters; - TVector> threads; - TStatsCollector collector; - for (ui32 threadIdx = 0; threadIdx < ThreadCount; ++threadIdx) { - TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); - adapters.emplace_back(adapter); - TSimpleWorker> worker( - adapter, + virtual std::variant GetThreads(ui64 globalThreads) = 0; + }; + + template + struct TTestCases { + + template + class TBasicPushPopSingleThread : public IBenchCase { + public: + TStatsCollector Run() override { + TRealQueue realQueue; + TQueueAdaptor adapter(&realQueue); + TStatsCollector collector; + TSimpleWorker worker( + &adapter, { TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=1}, + TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=2}, TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=1}, - }, - RepeatCount + TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=2}, + } ); - threads.emplace_back(new TTestThread(worker, &collector)); + RunThreads(std::make_unique>(worker, &collector)); + return std::move(collector); } - RunThreads(threads); - return std::move(collector); - } + TThreadCounts GetThreads(ui64) override { + return {1, 1}; + } + }; + + template + class TBasicPushPopMultiThreads : public IBenchCase { + public: + TStatsCollector Run() override { + TRealQueue realQueue; + TVector> adapters; + TVector> threads; + TStatsCollector collector; + for (ui32 threadIdx = 0; threadIdx < ThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + TSimpleWorker> worker( + adapter, + { + TWorkerAction{.Action=EWorkerAction::Push, .Expected=EExpectedStatus::Success, .Value=1}, + TWorkerAction{.Action=EWorkerAction::Pop, .Expected=EExpectedStatus::Success, .Value=1}, + }, + RepeatCount + ); + threads.emplace_back(new TTestThread(worker, &collector)); + } + RunThreads(threads); + return std::move(collector); + } + + TThreadCounts GetThreads(ui64) override { + return {ThreadCount, ThreadCount}; + } + }; + template - static TStatsCollector BasicProducing() { - TRealQueue realQueue; - TVector> adapters; - TVector> threads; - TStatsCollector collector; - for (ui32 threadIdx = 0; threadIdx < ThreadCount; ++threadIdx) { - TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); - adapters.emplace_back(adapter); - TProducerWorker> worker(adapter, {}); - threads.emplace_back(new TTestThread(worker, &collector)); - } - RunThreads(threads); - return std::move(collector); - } + class TBasicProducing : public IBenchCase { + public: + TStatsCollector Run() override { + TRealQueue realQueue; + TVector> adapters; + TVector> threads; + TStatsCollector collector; + for (ui32 threadIdx = 0; threadIdx < ThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + TProducerWorker> worker(adapter, {}); + threads.emplace_back(new TTestThread(worker, &collector)); + } + RunThreads(threads); + return std::move(collector); + } + TThreadCounts GetThreads(ui64) override { + return {ThreadCount, 0}; + } + }; + template - static TStatsCollector ConsumingEmptyQueue() { - TRealQueue realQueue; - TVector> adapters; - TVector> threads; - TStatsCollector collector; - for (ui32 threadIdx = 0; threadIdx < ThreadCount; ++threadIdx) { - TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); - adapters.emplace_back(adapter); - TConsumerWorker> worker(adapter, Reads, true); - threads.emplace_back(new TTestThread(worker, &collector)); - } - RunThreads(threads); - return std::move(collector); - } + class TConsumingEmptyQueue : public IBenchCase { + public: + TStatsCollector Run() override { + TRealQueue realQueue; + TVector> adapters; + TVector> threads; + TStatsCollector collector; + for (ui32 threadIdx = 0; threadIdx < ThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + TConsumerWorker> worker(adapter, Reads, true); + threads.emplace_back(new TTestThread(worker, &collector)); + } + RunThreads(threads); + return std::move(collector); + } + + TThreadCounts GetThreads(ui64) override { + return {0, ThreadCount}; + } + }; template - static TStatsCollector BasicProducingConsuming() { - TRealQueue realQueue; - TVector> adapters; - TVector> threads; - TStatsCollector collector; - for (ui32 threadIdx = 0; threadIdx < ProducingThreadCount; ++threadIdx) { - TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); - adapters.emplace_back(adapter); - TProducerWorker> worker(adapter, PushedItems); - threads.emplace_back(new TTestThread(worker, &collector)); - } - collector.ConsumerInfo.resize(ConsumingThreadCount); - for (ui32 threadIdx = 0; threadIdx < ConsumingThreadCount; ++threadIdx) { - TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); - adapters.emplace_back(adapter); - TConsumerWorker> worker(adapter, PoppedItems, false, &collector.ConsumerInfo[threadIdx]); - threads.emplace_back(new TTestThread(worker, &collector)); - } - RunThreads(threads); - return std::move(collector); - } + class TBasicProducingConsuming : public IBenchCase { + public: + TStatsCollector Run() override { + TRealQueue realQueue; + TVector> adapters; + TVector> threads; + TStatsCollector collector; + for (ui32 threadIdx = 0; threadIdx < ProducingThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + TProducerWorker> worker(adapter, PushedItems); + threads.emplace_back(new TTestThread(worker, &collector)); + } + collector.ConsumerInfo.resize(ConsumingThreadCount); + for (ui32 threadIdx = 0; threadIdx < ConsumingThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + TConsumerWorker> worker(adapter, PoppedItems, false, &collector.ConsumerInfo[threadIdx]); + threads.emplace_back(new TTestThread(worker, &collector)); + } + RunThreads(threads); + return std::move(collector); + } + + TThreadCounts GetThreads(ui64) override { + return {ProducingThreadCount, ConsumingThreadCount}; + } + }; + }; + + + template + struct TBenchCasesWithDurationAndThreads { + + template + class TBasicPushPop : public IBenchCaseWithDurationAndThreads { + TStatsCollector Run(TDuration duration, ui64 threadCount) override { + TRealQueue realQueue; + TVector> adapters; + TVector> threads; + TStatsCollector collector; + for (ui32 threadIdx = 0; threadIdx < threadCount; ++threadIdx) { + TQueueAdaptor* adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + auto action = [queue = adapter] () -> bool { + while (!queue->TryPush(727)) { + if constexpr (Sleep) { + NanoSleep(1'000); + } else { + SpinLockPause(); + } + } + while (!queue->TryPop()) { + if constexpr (Sleep) { + NanoSleep(1'000); + } else { + SpinLockPause(); + } + } + return true; + }; + TWorkerWithDuration> worker(action, duration); + threads.emplace_back(new TTestThread(worker, &collector)); + } + RunThreads(threads); + return std::move(collector); + } + + std::variant GetThreads(ui64 globalThreads) override { + return globalThreads; + } + }; + + template + class TBasicProducingConsuming : public IBenchCaseWithDurationAndThreads { + TStatsCollector Run(TDuration duration, ui64 threadCount) override { + TRealQueue realQueue; + TVector> adapters; + TVector> threads; + TStatsCollector collector; + ui64 producingThreadCount = Producers * (threadCount / (Producers + Consumers)); + ui64 consumingThreadCount = Consumers * (threadCount / (Producers + Consumers)); + for (ui32 threadIdx = 0; threadIdx < producingThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + auto action = [queue = adapter] () -> bool { + if (!queue->TryPush(727)) { + if constexpr (Sleep) { + NanoSleep(1'000); + } else { + SpinLockPause(); + } + } + return true; + }; + TWorkerWithDuration> worker(action, duration); + threads.emplace_back(new TTestThread(worker, &collector)); + } + for (ui32 threadIdx = 0; threadIdx < consumingThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + auto action = [queue = adapter] () -> bool { + if (!queue->TryPop()) { + if constexpr (Sleep) { + NanoSleep(1'000); + } else { + SpinLockPause(); + } + } + return true; + }; + TWorkerWithDuration> worker(action, duration); + threads.emplace_back(new TTestThread(worker, &collector)); + } + RunThreads(threads); + return std::move(collector); + } + + TString Validate(ui64 globalThreads) override { + if (globalThreads % (Producers + Consumers)) { + return TStringBuilder() << "A number divisible by " << Producers + Consumers << " was expected instead of " << globalThreads; + } + return ""; + } + + std::variant GetThreads(ui64 globalThreads) override { + ui64 producingThreadCount = Producers * (globalThreads / (Producers + Consumers)); + ui64 consumingThreadCount = Consumers * (globalThreads / (Producers + Consumers)); + return TThreadCounts{producingThreadCount, consumingThreadCount}; + } + }; + + template + class TSingleProducer: public IBenchCaseWithDurationAndThreads { + TStatsCollector Run(TDuration duration, ui64 threadCount) override { + TRealQueue realQueue; + TVector> adapters; + TVector> threads; + TStatsCollector collector; + ui64 producingThreadCount = 1; + ui64 consumingThreadCount = threadCount - 1; + for (ui32 threadIdx = 0; threadIdx < producingThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + auto action = [queue = adapter] () -> bool { + if (!queue->TryPush(727)) { + if constexpr (Sleep) { + NanoSleep(1'000); + } else { + SpinLockPause(); + } + } + return true; + }; + TWorkerWithDuration> worker(action, duration); + threads.emplace_back(new TTestThread(worker, &collector)); + } + for (ui32 threadIdx = 0; threadIdx < consumingThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + auto action = [queue = adapter] () -> bool { + if (!queue->TryPop()) { + if constexpr (Sleep) { + NanoSleep(1'000); + } else { + SpinLockPause(); + } + } + return true; + }; + TWorkerWithDuration> worker(action, duration); + threads.emplace_back(new TTestThread(worker, &collector)); + } + RunThreads(threads); + return std::move(collector); + } + + TString Validate(ui64 globalThreads) override { + if (globalThreads < 2) { + return TStringBuilder() << "A number greater than 1 was expected instead of " << globalThreads; + } + return ""; + } + + std::variant GetThreads(ui64 globalThreads) override { + return TThreadCounts{1, globalThreads - 1}; + } + }; + + template + class TSingleConsumer: public IBenchCaseWithDurationAndThreads { + TStatsCollector Run(TDuration duration, ui64 threadCount) override { + TRealQueue realQueue; + TVector> adapters; + TVector> threads; + TStatsCollector collector; + ui64 producingThreadCount = threadCount - 1; + ui64 consumingThreadCount = 1; + for (ui32 threadIdx = 0; threadIdx < producingThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + auto action = [queue = adapter] () -> bool { + if (!queue->TryPush(727)) { + if constexpr (Sleep) { + NanoSleep(1'000); + } else { + SpinLockPause(); + } + } + return true; + }; + TWorkerWithDuration> worker(action, duration); + threads.emplace_back(new TTestThread(worker, &collector)); + } + for (ui32 threadIdx = 0; threadIdx < consumingThreadCount; ++threadIdx) { + TQueueAdaptor *adapter = new TQueueAdaptor(&realQueue); + adapters.emplace_back(adapter); + auto action = [queue = adapter] () -> bool { + if (!queue->TryPop()) { + if constexpr (Sleep) { + NanoSleep(1'000); + } else { + SpinLockPause(); + } + } + return true; + }; + TWorkerWithDuration> worker(action, duration); + threads.emplace_back(new TTestThread(worker, &collector)); + } + RunThreads(threads); + return std::move(collector); + } + + TString Validate(ui64 globalThreads) override { + if (globalThreads < 2) { + return TStringBuilder() << "A number greater than 1 was expected instead of " << globalThreads; + } + return ""; + } + + std::variant GetThreads(ui64 globalThreads) override { + return TThreadCounts{globalThreads - 1, 1}; + } + }; }; diff --git a/ydb/library/actors/queues/bench/main.cpp b/ydb/library/actors/queues/bench/main.cpp index 11e543dbd15f..d05e5c4dfa24 100644 --- a/ydb/library/actors/queues/bench/main.cpp +++ b/ydb/library/actors/queues/bench/main.cpp @@ -1,5 +1,8 @@ #include "queue.h" #include "worker.h" +#include "bench_cases.h" +#include "queue_tracer.h" +#include "probes.h" #include #include @@ -13,28 +16,167 @@ #include +using TMonSrvc = NMonitoring::TMonService2; +using namespace NActors; +using namespace NActors::NQueueBench; + +void InitMonService(THolder& monSrvc, int monPort) +{ + monSrvc.Reset(new TMonSrvc(monPort)); + NLwTraceMonPage::RegisterPages(monSrvc->GetRoot()); + NLwTraceMonPage::ProbeRegistry().AddProbesList(LWTRACE_GET_PROBES(BENCH_TRACING_PROVIDER)); +} + +using TTracer = NTracing::TMPMCRingQueueBadPathTracer; +template +using TCasesWithTracer = TBenchCasesWithDurationAndThreads, TAdaptiveQueue>; +using ICaseWithCollector = IBenchCaseWithDurationAndThreads; + + + +using TDegradator = NTracing::TMPMCRingQueueDegradatorAndTracer<1024, 1, 60'000'000>; + +template <> +thread_local ui64 TDegradator::TDegradator::SkipSteps = 0; +template <> +std::atomic_uint64_t TDegradator::TDegradator::InFlight = 0; + +template +using TCasesWithDegradator = TBenchCasesWithDurationAndThreads, TAdaptiveQueue>; + + +THashMap Tests { + {"Basic", static_cast(new TCasesWithTracer<20>::TBasicPushPop)}, + {"Producer1Consumer1", static_cast(new TCasesWithTracer<20>::TBasicProducingConsuming)}, + {"Producer1Consumer2", static_cast(new TCasesWithTracer<20>::TBasicProducingConsuming)}, + {"Producer2Consumer1", static_cast(new TCasesWithTracer<20>::TBasicProducingConsuming)}, + {"SingleProducer", static_cast(new TCasesWithTracer<20>::TSingleProducer)}, + {"SingleConsumer", static_cast(new TCasesWithTracer<20>::TSingleConsumer)}, +}; + +THashMap TestsWithSleep1Us { + {"Basic", static_cast(new TCasesWithTracer<20>::TBasicPushPop)}, + {"Producer1Consumer1", static_cast(new TCasesWithTracer<20>::TBasicProducingConsuming)}, + {"Producer1Consumer2", static_cast(new TCasesWithTracer<20>::TBasicProducingConsuming)}, + {"Producer2Consumer1", static_cast(new TCasesWithTracer<20>::TBasicProducingConsuming)}, + {"SingleProducer", static_cast(new TCasesWithTracer<20>::TSingleProducer)}, + {"SingleConsumer", static_cast(new TCasesWithTracer<20>::TSingleConsumer)}, +}; + +THashMap TestsWithBlockedThread { + {"Basic", static_cast(new TCasesWithDegradator<20>::TBasicPushPop)}, + {"Producer1Consumer1", static_cast(new TCasesWithDegradator<20>::TBasicProducingConsuming)}, + {"Producer1Consumer2", static_cast(new TCasesWithDegradator<20>::TBasicProducingConsuming)}, + {"Producer2Consumer1", static_cast(new TCasesWithDegradator<20>::TBasicProducingConsuming)}, + {"SingleProducer", static_cast(new TCasesWithDegradator<20>::TSingleProducer)}, + {"SingleConsumer", static_cast(new TCasesWithDegradator<20>::TSingleConsumer)}, +}; + +THashMap TestsWithSleep1UsAndBlockedThread { + {"Basic", static_cast(new TCasesWithDegradator<20>::TBasicPushPop)}, + {"Producer1Consumer1", static_cast(new TCasesWithDegradator<20>::TBasicProducingConsuming)}, + {"Producer1Consumer2", static_cast(new TCasesWithDegradator<20>::TBasicProducingConsuming)}, + {"Producer2Consumer1", static_cast(new TCasesWithDegradator<20>::TBasicProducingConsuming)}, + {"SingleProducer", static_cast(new TCasesWithDegradator<20>::TSingleProducer)}, + {"SingleConsumer", static_cast(new TCasesWithDegradator<20>::TSingleConsumer)}, +}; + + int main(int argc, char* argv[]) { //NLWTrace::StartLwtraceFromEnv(); //signal(SIGPIPE, SIG_IGN); - TString configPath; + TString testName; + int testDurationS = 600; int monPort = 7777; - int lwtraceThreadLogSize = 60000; - bool disableLWTrace = false; + int lwtraceThreadLogSize = 1'000'000; + int threadCount = 2; + bool shortOutput = false; + bool sleep1us = false; + bool blockThread = false; NLastGetopt::TOpts opts = NLastGetopt::TOpts::Default(); opts.AddLongOption(0, "mon-port", "port of monitoring service") .RequiredArgument("port") .StoreResult(&monPort, monPort); - opts.AddLongOption('f', "file", "filepath to config") + opts.AddLongOption('n', "name", "test name") .Required() - .RequiredArgument("file") - .StoreResult(&configPath, configPath); + .RequiredArgument("testname") + .StoreResult(&testName, testName); + opts.AddLongOption('d', "duration", "test duration") + .RequiredArgument("seconds") + .StoreResult(&testDurationS, testDurationS); + opts.AddLongOption('t', "threads", "threads in the test") + .RequiredArgument("thread-count") + .StoreResult(&threadCount, threadCount); opts.AddLongOption("lwtrace-thread-log-size", "thread log size") .RequiredArgument("size") .StoreResult(&lwtraceThreadLogSize, lwtraceThreadLogSize); - opts.AddLongOption('L', "disable-lwtrace", "disable lwtrace") + opts.AddLongOption("short-output", "reduce output") + .NoArgument() + .SetFlag(&shortOutput); + opts.AddLongOption("sleep1us", "sleep 1us instead of spin-lock-pause") + .NoArgument() + .SetFlag(&sleep1us); + opts.AddLongOption("block-thread", "every time one thread will sleep 1 minute") .NoArgument() - .SetFlag(&disableLWTrace); + .SetFlag(&blockThread); NLastGetopt::TOptsParseResult res(&opts, argc, argv); + THolder monSrvc; + InitMonService(monSrvc, monPort); + monSrvc->Start(); + NLWTrace::TManager* traceMngr = &NLwTraceMonPage::TraceManager(); + + // init query lwtrace + auto query = NLWTrace::TQuery(); + query.SetPerThreadLogSize(lwtraceThreadLogSize); // s -> ms + auto& block = *query.AddBlocks(); + auto& probeDesc = *block.MutableProbeDesc(); + probeDesc.SetGroup("BenchTracing"); + auto action = block.AddAction(); + action->MutableLogAction(); + + // init query threadpools stats + auto queueStats = NLWTrace::TQuery(); + { + queueStats.SetPerThreadLogSize(lwtraceThreadLogSize); + auto& block = *queueStats.AddBlocks(); + auto& probeDesc = *block.MutableProbeDesc(); + probeDesc.SetGroup("ThreadPoolStats"); + auto action = block.AddAction(); + action->MutableLogAction(); + } + + auto *tests = &Tests; + if (blockThread && sleep1us) { + tests = &TestsWithSleep1UsAndBlockedThread; + } else if (blockThread) { + tests = &TestsWithBlockedThread; + } else if (sleep1us) { + tests = &TestsWithSleep1Us; + } + + auto it = tests->find(testName); + if (it == tests->end()) { + Cerr << "Unknown test\n"; + return 1; + } + TString error = it->second->Validate(threadCount); + if (error) { + Cerr << "Error: " << error << Endl; + return 1; + } + + traceMngr->New(testName, query); + auto stats = it->second->Run(TDuration::Seconds(testDurationS), threadCount); + traceMngr->Stop(testName); + auto threads = it->second->GetThreads(threadCount); + std::visit([&](auto threads) { + if constexpr (std::is_same_v>) { + stats.Print(testDurationS, threads, shortOutput); + } else { + stats.Print(testDurationS, threads.ProducerThreads, threads.ConsumerThreads, shortOutput); + } + }, threads); + return 0; } \ No newline at end of file diff --git a/ydb/library/actors/queues/bench/probes.cpp b/ydb/library/actors/queues/bench/probes.cpp new file mode 100644 index 000000000000..13e24ca8fd16 --- /dev/null +++ b/ydb/library/actors/queues/bench/probes.cpp @@ -0,0 +1,6 @@ +#include "probes.h" + + +#include + +LWTRACE_DEFINE_PROVIDER(BENCH_TRACING_PROVIDER); diff --git a/ydb/library/actors/queues/bench/probes.h b/ydb/library/actors/queues/bench/probes.h new file mode 100644 index 000000000000..895c299fca7d --- /dev/null +++ b/ydb/library/actors/queues/bench/probes.h @@ -0,0 +1,19 @@ +#pragma once + +#include +#include + + +#define BENCH_TRACING_PROVIDER(PROBE, EVENT, GROUPS, TYPES, NAMES) \ + PROBE(FoundOldSlot, GROUPS("BenchTracing"), \ + TYPES(TString), \ + NAMES("method")) \ + PROBE(FailedOperation, GROUPS("BenchTracing"), \ + TYPES(TString), \ + NAMES("method")) \ + PROBE(LongOperation, GROUPS("BenchTracing"), \ + TYPES(TString), \ + NAMES("method")) \ +// BENCH_TRACING_PROVIDER + +LWTRACE_DECLARE_PROVIDER(BENCH_TRACING_PROVIDER) diff --git a/ydb/library/actors/queues/bench/queue.h b/ydb/library/actors/queues/bench/queue.h index afc6ead6af98..d8fb2f34d93f 100644 --- a/ydb/library/actors/queues/bench/queue.h +++ b/ydb/library/actors/queues/bench/queue.h @@ -3,6 +3,7 @@ #include "defs.h" #include +#include namespace NActors::NQueueBench { @@ -13,16 +14,16 @@ namespace NActors::NQueueBench { virtual std::optional TryPop() = 0; }; - template - using TTryPush = bool (TMPMCRingQueue::*)(ui32 value); - template - using TTryPop = std::optional (TMPMCRingQueue::*)(); + template + using TTryPush = bool (TMPMCRingQueue::*)(ui32 value); + template + using TTryPop = std::optional (TMPMCRingQueue::*)(); - template TryPushMethod, TTryPop TryPopMethod> + template TryPushMethod, TTryPop TryPopMethod> struct TMPMCQueueBase : IQueue { - TMPMCRingQueue *Queue; + TMPMCRingQueue *Queue; - TMPMCQueueBase(TMPMCRingQueue *queue) + TMPMCQueueBase(TMPMCRingQueue *queue) : Queue(queue) {} @@ -34,27 +35,27 @@ namespace NActors::NQueueBench { } }; - template - using TVerySlowQueue = TMPMCQueueBase::TryPushSlow, &TMPMCRingQueue::TryPopReallySlow>; + template + using TVerySlowQueue = TMPMCQueueBase::TryPushSlow, &TMPMCRingQueue::TryPopReallySlow>; - template - using TSlowQueue = TMPMCQueueBase::TryPushSlow, &TMPMCRingQueue::TryPopSlow>; + template + using TSlowQueue = TMPMCQueueBase::TryPushSlow, &TMPMCRingQueue::TryPopSlow>; - template - using TFastQueue = TMPMCQueueBase::TryPush, &TMPMCRingQueue::TryPopFast>; + template + using TFastQueue = TMPMCQueueBase::TryPush, &TMPMCRingQueue::TryPopFast>; - template - using TVeryFastQueue = TMPMCQueueBase::TryPush, &TMPMCRingQueue::TryPopReallyFast>; + template + using TVeryFastQueue = TMPMCQueueBase::TryPush, &TMPMCRingQueue::TryPopReallyFast>; - template - using TSingleQueue = TMPMCQueueBase::TryPush, &TMPMCRingQueue::TryPopSingleConsumer>; + template + using TSingleQueue = TMPMCQueueBase::TryPush, &TMPMCRingQueue::TryPopSingleConsumer>; - template + template struct TAdaptiveQueue : IQueue { - TMPMCRingQueue *Queue; - typename TMPMCRingQueue::EPopMode State = TMPMCRingQueue::EPopMode::ReallySlow; + TMPMCRingQueue *Queue; + typename TMPMCRingQueue::EPopMode State = TMPMCRingQueue::EPopMode::ReallySlow; - TAdaptiveQueue(TMPMCRingQueue *queue) + TAdaptiveQueue(TMPMCRingQueue *queue) : Queue(queue) {} @@ -67,4 +68,14 @@ namespace NActors::NQueueBench { }; + template + using TMPMCRingQueueWithStats = TMPMCRingQueue; + + template