Skip to content

Commit

Permalink
Merge b0c51e3 into da209ec
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 authored May 16, 2024
2 parents da209ec + b0c51e3 commit 7f5c23b
Showing 1 changed file with 72 additions and 52 deletions.
124 changes: 72 additions & 52 deletions ydb/core/jaeger_tracing/throttler_ut.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#include "throttler.h"
#include "util/generic/scope.h"
#include "util/system/thread.h"

#include <util/generic/scope.h>
#include <util/system/thread.h>

#include <library/cpp/time_provider/time_provider.h>

#include <library/cpp/testing/unittest/registar.h>

Expand Down Expand Up @@ -87,69 +90,86 @@ Y_UNIT_TEST_SUITE(ThrottlerControlTests) {
CheckAtLeast(throttler, 1);
}

Y_UNIT_TEST(MultiThreaded) {
auto check = []<size_t Threads, ui64 Ticks, ui64 Init, ui64 Step>() {
constexpr TDuration delay = TDuration::MilliSeconds(1);
auto timeProvider = MakeIntrusive<TTimeProviderMock>(TInstant::Now());
void TestMultiThreaded(ui32 threads, ui64 ticks, ui64 init, ui64 step) {
constexpr std::array<TDuration, 4> delays = {
TDuration::MilliSeconds(1),
TDuration::MilliSeconds(10),
TDuration::MilliSeconds(100),
TDuration::Seconds(1)
};

TThrottler throttler(60, Init - 1, timeProvider);
auto timeProvider = MakeIntrusive<TTimeProviderMock>(TInstant::Now());

auto shouldStop = std::make_shared<std::atomic<bool>>(false);
TVector<THolder<TThread>> workers;
Y_SCOPE_EXIT(shouldStop, &workers) {
shouldStop->store(true);
TThrottler throttler(60, init - 1, timeProvider);

try {
for (auto& worker : workers) {
worker->Join();
}
} catch (yexception& e) {
Cerr << "Failed to join worker:" << Endl;
Cerr << e.what() << Endl;
auto shouldStop = std::make_shared<std::atomic<bool>>(false);
TVector<THolder<TThread>> workers;
Y_SCOPE_EXIT(shouldStop, &workers) {
shouldStop->store(true);

try {
for (auto& worker : workers) {
worker->Join();
}
};

std::atomic<ui64> totalConsumed{0};
workers.reserve(Threads);
for (size_t i = 0; i < Threads; ++i) {
workers.push_back(MakeHolder<TThread>([&, shouldStop]() {
while (!shouldStop->load(std::memory_order_relaxed)) {
if (!throttler.Throttle()) {
totalConsumed.fetch_add(1);
}
}
}));
} catch (yexception& e) {
Cerr << "Failed to join worker:" << Endl;
Cerr << e.what() << Endl;
}
for (auto& worker : workers) {
worker->Start();
}

Sleep(delay);
ui64 expected = Init;
UNIT_ASSERT_EQUAL(totalConsumed.load(), expected);

auto advance = [&, delay](ui64 seconds, ui64 expectedIncreace) {
timeProvider->Advance(TDuration::Seconds(seconds));
expected += expectedIncreace;
Sleep(delay);
UNIT_ASSERT_EQUAL(totalConsumed.load(), expected);
};
};

advance(1, 1);
std::atomic<ui64> totalConsumed{0};
workers.reserve(threads);
for (size_t i = 0; i < threads; ++i) {
workers.push_back(MakeHolder<TThread>([&]() {
while (!shouldStop->load(std::memory_order_relaxed)) {
if (!throttler.Throttle()) {
totalConsumed.fetch_add(1);
}
}
}));
}
for (auto& worker : workers) {
worker->Start();
}

for (size_t i = 0; i < Ticks; ++i) {
advance(Step, Step);
auto waitForIncrease = [&](ui64 expected) -> bool {
for (const TDuration& delay : delays) {
Sleep(delay);
UNIT_ASSERT_EQUAL(totalConsumed.load(), expected);
if (totalConsumed.load() == expected) {
return true;
}
}
return false;
};

ui64 expected = init;
UNIT_ASSERT(waitForIncrease(expected));

advance(Init + 1000, Init);
auto advance = [&](ui64 seconds, ui64 expectedIncrease) {
timeProvider->Advance(TDuration::Seconds(seconds));
expected += expectedIncrease;
UNIT_ASSERT(waitForIncrease(expected));
};

check.operator()<2, 200, 30, 7>();
check.operator()<5, 150, 500, 15>();
check.operator()<10, 100, 1000, 22>();
advance(1, 1);

for (size_t i = 0; i < ticks; ++i) {
advance(step, step);
}

advance(init + 1000, init);
}

#define TEST_MULTI_THREADED(threads, ticks, init, step) \
Y_UNIT_TEST(MultiThreaded##threads##Threads##ticks##Ticks##init##Init##step##Step) { \
TestMultiThreaded(threads, ticks, init, step); \
}

TEST_MULTI_THREADED(2, 200, 30, 7);
TEST_MULTI_THREADED(5, 150, 500, 15);
TEST_MULTI_THREADED(10, 100, 1000, 22);

#undef TEST_MULTI_THREADED
}

} // namespace NKikimr::NJaegerTracing

0 comments on commit 7f5c23b

Please sign in to comment.