From b0c51e324a1f3bee348fb203d2ea87c736df7e48 Mon Sep 17 00:00:00 2001 From: Sergey Belyakov Date: Thu, 16 May 2024 05:25:17 +0000 Subject: [PATCH] Fix MultiThreaded throttling test --- ydb/core/jaeger_tracing/throttler_ut.cpp | 124 +++++++++++++---------- 1 file changed, 72 insertions(+), 52 deletions(-) diff --git a/ydb/core/jaeger_tracing/throttler_ut.cpp b/ydb/core/jaeger_tracing/throttler_ut.cpp index 7caeaaa3faff..aed397f09411 100644 --- a/ydb/core/jaeger_tracing/throttler_ut.cpp +++ b/ydb/core/jaeger_tracing/throttler_ut.cpp @@ -1,6 +1,9 @@ #include "throttler.h" -#include "util/generic/scope.h" -#include "util/system/thread.h" + +#include +#include + +#include #include @@ -87,69 +90,86 @@ Y_UNIT_TEST_SUITE(ThrottlerControlTests) { CheckAtLeast(throttler, 1); } - Y_UNIT_TEST(MultiThreaded) { - auto check = []() { - constexpr TDuration delay = TDuration::MilliSeconds(1); - auto timeProvider = MakeIntrusive(TInstant::Now()); + void TestMultiThreaded(ui32 threads, ui64 ticks, ui64 init, ui64 step) { + constexpr std::array delays = { + TDuration::MilliSeconds(1), + TDuration::MilliSeconds(10), + TDuration::MilliSeconds(100), + TDuration::Seconds(1) + }; - TThrottler throttler(60, Init - 1, timeProvider); + auto timeProvider = MakeIntrusive(TInstant::Now()); - auto shouldStop = std::make_shared>(false); - TVector> workers; - Y_SCOPE_EXIT(shouldStop, &workers) { - shouldStop->store(true); + TThrottler throttler(60, init - 1, timeProvider); - try { - for (auto& worker : workers) { - worker->Join(); - } - } catch (yexception& e) { - Cerr << "Failed to join worker:" << Endl; - Cerr << e.what() << Endl; + auto shouldStop = std::make_shared>(false); + TVector> workers; + Y_SCOPE_EXIT(shouldStop, &workers) { + shouldStop->store(true); + + try { + for (auto& worker : workers) { + worker->Join(); } - }; - - std::atomic totalConsumed{0}; - workers.reserve(Threads); - for (size_t i = 0; i < Threads; ++i) { - workers.push_back(MakeHolder([&, shouldStop]() { - while (!shouldStop->load(std::memory_order_relaxed)) { - if (!throttler.Throttle()) { - totalConsumed.fetch_add(1); - } - } - })); + } catch (yexception& e) { + Cerr << "Failed to join worker:" << Endl; + Cerr << e.what() << Endl; } - for (auto& worker : workers) { - worker->Start(); - } - - Sleep(delay); - ui64 expected = Init; - UNIT_ASSERT_EQUAL(totalConsumed.load(), expected); - - auto advance = [&, delay](ui64 seconds, ui64 expectedIncreace) { - timeProvider->Advance(TDuration::Seconds(seconds)); - expected += expectedIncreace; - Sleep(delay); - UNIT_ASSERT_EQUAL(totalConsumed.load(), expected); - }; + }; - advance(1, 1); + std::atomic totalConsumed{0}; + workers.reserve(threads); + for (size_t i = 0; i < threads; ++i) { + workers.push_back(MakeHolder([&]() { + while (!shouldStop->load(std::memory_order_relaxed)) { + if (!throttler.Throttle()) { + totalConsumed.fetch_add(1); + } + } + })); + } + for (auto& worker : workers) { + worker->Start(); + } - for (size_t i = 0; i < Ticks; ++i) { - advance(Step, Step); + auto waitForIncrease = [&](ui64 expected) -> bool { + for (const TDuration& delay : delays) { Sleep(delay); - UNIT_ASSERT_EQUAL(totalConsumed.load(), expected); + if (totalConsumed.load() == expected) { + return true; + } } + return false; + }; + + ui64 expected = init; + UNIT_ASSERT(waitForIncrease(expected)); - advance(Init + 1000, Init); + auto advance = [&](ui64 seconds, ui64 expectedIncrease) { + timeProvider->Advance(TDuration::Seconds(seconds)); + expected += expectedIncrease; + UNIT_ASSERT(waitForIncrease(expected)); }; - check.operator()<2, 200, 30, 7>(); - check.operator()<5, 150, 500, 15>(); - check.operator()<10, 100, 1000, 22>(); + advance(1, 1); + + for (size_t i = 0; i < ticks; ++i) { + advance(step, step); + } + + advance(init + 1000, init); + } + + #define TEST_MULTI_THREADED(threads, ticks, init, step) \ + Y_UNIT_TEST(MultiThreaded##threads##Threads##ticks##Ticks##init##Init##step##Step) { \ + TestMultiThreaded(threads, ticks, init, step); \ } + + TEST_MULTI_THREADED(2, 200, 30, 7); + TEST_MULTI_THREADED(5, 150, 500, 15); + TEST_MULTI_THREADED(10, 100, 1000, 22); + + #undef TEST_MULTI_THREADED } } // namespace NKikimr::NJaegerTracing