Skip to content

Commit

Permalink
Use lock-free Bucket in CostTracker with limited underflow
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 committed Sep 2, 2024
1 parent 93089ee commit d97b9f8
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 44 deletions.
33 changes: 2 additions & 31 deletions library/cpp/bucket_quoter/bucket_quoter.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@

#include <library/cpp/deprecated/atomic/atomic.h>

#include <util/datetime/base.h>
#include <util/system/mutex.h>
#include <util/system/hp_timer.h>

#include "timers.h"

/* Token bucket.
* Produces a flow of *inflow* units per second on average, with bursts of up to *capacity* units.
Expand Down Expand Up @@ -41,35 +41,6 @@
*/

// Timer policy built on TInstant that measures intervals in milliseconds.
struct TInstantTimerMs {
    using TTime = TInstant;

    // Number of Duration() units per second.
    static constexpr ui64 Resolution = 1000ull; // milliseconds

    // Current moment as reported by TInstant::Now().
    static TTime Now() {
        return TInstant::Now();
    }

    // Length of the interval [from, to] in milliseconds.
    static ui64 Duration(TTime from, TTime to) {
        const auto elapsed = to - from;
        return elapsed.MilliSeconds();
    }
};

// Timer policy built on NHPTimer that measures intervals in microseconds.
struct THPTimerUs {
    using TTime = NHPTimer::STime;

    // Number of Duration() units per second.
    static constexpr ui64 Resolution = 1000000ull; // microseconds

    // Current NHPTimer reading.
    static TTime Now() {
        TTime stamp;
        NHPTimer::GetTime(&stamp);
        return stamp;
    }

    // Length of the interval [from, to] in microseconds; 0 when `to` is not later than `from`.
    static ui64 Duration(TTime from, TTime to) {
        const i64 ticks = to - from;
        if (ticks <= 0) {
            return 0;
        }
        // Convert timer cycles to microseconds using the timer's clock rate.
        return ui64(double(ticks) * double(Resolution) / NHPTimer::GetClockRate());
    }
};

template <typename StatCounter, typename Lock = TMutex, typename Timer = TInstantTimerMs>
class TBucketQuoter {
public:
Expand Down
33 changes: 33 additions & 0 deletions library/cpp/bucket_quoter/timers.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
#pragma once

#include <util/datetime/base.h>
#include <util/system/hp_timer.h>

// Timer policy built on TInstant that measures intervals in milliseconds.
struct TInstantTimerMs {
    using TTime = TInstant;

    // Number of Duration() units per second.
    static constexpr ui64 Resolution = 1000ull; // milliseconds

    // Current moment as reported by TInstant::Now().
    static TTime Now() {
        return TInstant::Now();
    }

    // Length of the interval [from, to] in milliseconds.
    static ui64 Duration(TTime from, TTime to) {
        const auto elapsed = to - from;
        return elapsed.MilliSeconds();
    }
};

// Timer policy built on NHPTimer that measures intervals in microseconds.
struct THPTimerUs {
    using TTime = NHPTimer::STime;

    // Number of Duration() units per second.
    static constexpr ui64 Resolution = 1000000ull; // microseconds

    // Current NHPTimer reading.
    static TTime Now() {
        TTime stamp;
        NHPTimer::GetTime(&stamp);
        return stamp;
    }

    // Length of the interval [from, to] in microseconds; 0 when `to` is not later than `from`.
    static ui64 Duration(TTime from, TTime to) {
        const i64 ticks = to - from;
        if (ticks <= 0) {
            return 0;
        }
        // Convert timer cycles to microseconds using the timer's clock rate.
        return ui64(double(ticks) * double(Resolution) / NHPTimer::GetClockRate());
    }
};
1 change: 1 addition & 0 deletions library/cpp/lockfree_bucket/lockfree_bucket.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "lockfree_bucket.h"
81 changes: 81 additions & 0 deletions library/cpp/lockfree_bucket/lockfree_bucket.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
#pragma once

#include <atomic>
#include <limits>

#include <library/cpp/bucket_quoter/timers.h>
#include <util/datetime/base.h>

template<class TTimer = TInstantTimerMs>
class TLockFreeBucket {
public:
TLockFreeBucket(std::atomic<i64>& maxTokens, std::atomic<i64>& minTokens, std::atomic<ui64>& inflowPerSecond)
: MaxTokens(maxTokens)
, MinTokens(minTokens)
, InflowPerSecond(inflowPerSecond)
, Tokens(maxTokens.load())
{
Y_DEBUG_ABORT_UNLESS(maxTokens > 0);
Y_DEBUG_ABORT_UNLESS(minTokens < 0);
}

bool IsEmpty() {
FillBucket();
return Tokens.load() <= 0;
}

void FillAndTake(i64 tokens) {
FillBucket();
TakeTokens(tokens);
}

void SetMaxTokens(i64 tokens) {
Y_DEBUG_ABORT_UNLESS(tokens > 0);
MaxTokens.store(tokens);
}

void SetMinTokens(i64 tokens) {
Y_DEBUG_ABORT_UNLESS(tokens < 0);
MinTokens.store(tokens);
}

void SetInflow(ui64 tokensPerSecond) {
Y_DEBUG_ABORT_UNLESS(tokensPerSecond >= 0);
}

private:
void FillBucket() {
TTime prev;
TTime now;
for (prev = LastUpdate.load(), now = TTimer::Now(); !LastUpdate.compare_exchange_strong(prev, now); ) {}

ui64 rawInflow = InflowPerSecond.load() * TTimer::Duration(prev, now);
if (rawInflow >= TTimer::Resolution) {
Tokens.fetch_add(rawInflow / TTimer::Resolution);
for (i64 tokens = Tokens.load(), maxTokens = MaxTokens.load(); tokens > maxTokens; ) {
if (Tokens.compare_exchange_strong(tokens, maxTokens)) {
break;
}
}
}
}

void TakeTokens(i64 tokens) {
Tokens.fetch_sub(tokens);
for (i64 tokens = Tokens.load(), minTokens = MinTokens.load(); tokens < minTokens; ) {
if (Tokens.compare_exchange_strong(tokens, minTokens)) {
break;
}
}
}

private:
using TTime = typename TTimer::TTime;

std::atomic<i64>& MaxTokens;
std::atomic<i64>& MinTokens;
std::atomic<ui64>& InflowPerSecond;

std::atomic<i64> Tokens;
std::atomic<TTime> LastUpdate;
};
130 changes: 130 additions & 0 deletions library/cpp/lockfree_bucket/ut/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/lockfree_bucket/lockfree_bucket.h>
#include <util/system/guard.h>
#include <util/system/spinlock.h>
#include <util/system/types.h>

#include <thread>

struct TTestTimerMs {
using TTime = TInstant;
static constexpr ui64 Resolution = 1000ull; // milliseconds

static TTime Now() {
return TInstant::Zero() + TDuration::MilliSeconds(Time.load());
}

static ui64 Duration(TTime from, TTime to) {
return (to - from).MilliSeconds();
}

static std::atomic<ui64> Time;

static void Reset() {
Time.store(0);
}

static void Advance(TDuration delta) {
Time.fetch_add(delta.MilliSeconds());
}
};

std::atomic<ui64> TTestTimerMs::Time = {};

Y_UNIT_TEST_SUITE(TLockFreeBucket) {
struct TTestContext {

TTestContext() {
MaxTokens.store(1'000'000);
MinTokens.store(-1'000'000);
Inflow.store(1'000'000);
TTestTimerMs::Reset();
}

template<class TCallback>
void Initialize(TCallback callback, ui32 threadCount) {
for (ui32 i = 0; i < threadCount; ++i) {
Threads.emplace_back(callback);
}
}

~TTestContext() {
JoinAll();
}

void JoinAll() {
for (std::thread& t : Threads) {
t.join();
}
Threads.clear();
}

std::atomic<i64> MaxTokens;
std::atomic<i64> MinTokens;
std::atomic<ui64> Inflow;
std::vector<std::thread> Threads;
};

// Drains the bucket far faster than it refills, then checks that slightly more than one
// second of inflow is enough to bring the balance back above zero.
void TestLowerLimit(ui32 threadCount) {
    TTestContext ctx;
    TLockFreeBucket<TTestTimerMs> bucket(ctx.MaxTokens, ctx.MinTokens, ctx.Inflow);

    constexpr ui32 iterations = 100;
    auto drain = [&]() {
        ui32 step = 0;
        while (step < iterations) {
            TTestTimerMs::Advance(TDuration::MilliSeconds(10));
            bucket.FillAndTake(123'456);
            ++step;
        }
    };

    ctx.Initialize(drain, threadCount);
    ctx.JoinAll();

    // One second plus one millisecond of inflow.
    TTestTimerMs::Advance(TDuration::Seconds(1));
    TTestTimerMs::Advance(TDuration::MilliSeconds(1));

    const bool empty = bucket.IsEmpty();
    UNIT_ASSERT(!empty);
}


// Lets a long idle period pass first, then runs the workers and compares the final
// emptiness of the bucket with the expectation.
void TestUpperLimit(ui32 tokensTaken, bool isEmpty, ui32 threadCount) {
    TTestContext ctx;
    TLockFreeBucket<TTestTimerMs> bucket(ctx.MaxTokens, ctx.MinTokens, ctx.Inflow);

    // Long idle period before any tokens are taken.
    TTestTimerMs::Advance(TDuration::Seconds(100500));

    constexpr ui32 iterations = 100;
    auto consume = [&]() {
        ui32 step = 0;
        while (step < iterations) {
            TTestTimerMs::Advance(TDuration::MilliSeconds(10));
            bucket.FillAndTake(tokensTaken);
            ++step;
        }
    };

    ctx.Initialize(consume, threadCount);
    ctx.JoinAll();

    UNIT_ASSERT_VALUES_EQUAL(bucket.IsEmpty(), isEmpty);
}

// Lower limit, one worker thread.
Y_UNIT_TEST(LowerLimitSingleThreaded) {
    constexpr ui32 threads = 1;
    TestLowerLimit(threads);
}

// Lower limit, twenty concurrent worker threads.
Y_UNIT_TEST(LowerLimitMultiThreaded) {
    constexpr ui32 threads = 20;
    TestLowerLimit(threads);
}

// Heavy intake, one worker thread: the bucket is expected to end up empty.
Y_UNIT_TEST(UpperLimitSingleThreaded) {
    constexpr ui32 threads = 1;
    TestUpperLimit(123'456, true, threads);
}

// Heavy intake, twenty worker threads: the bucket is expected to end up empty.
Y_UNIT_TEST(UpperLimitMultiThreaded) {
    constexpr ui32 threads = 20;
    TestUpperLimit(123'456, true, threads);
}

// Light intake, one worker thread: the bucket is expected to stay non-empty.
Y_UNIT_TEST(LowIntakeSingleThreaded) {
    constexpr ui32 threads = 1;
    TestUpperLimit(1, false, threads);
}

// Light intake, twenty worker threads: the bucket is expected to stay non-empty.
Y_UNIT_TEST(LowIntakeMultiThreaded) {
    constexpr ui32 threads = 20;
    TestUpperLimit(1, false, threads);
}
}
11 changes: 11 additions & 0 deletions library/cpp/lockfree_bucket/ut/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# Unit-test target for the lock-free bucket: builds main.cpp against library/cpp/lockfree_bucket.
UNITTEST()

FORK_SUBTESTS()
SRCS(
main.cpp
)
PEERDIR(
library/cpp/lockfree_bucket
)

END()
12 changes: 12 additions & 0 deletions library/cpp/lockfree_bucket/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Library target for the lock-free bucket; lockfree_bucket.cpp only includes the header.
LIBRARY()


SRCS(
lockfree_bucket.cpp
)

END()

# Tests live in the ut subdirectory.
RECURSE_FOR_TESTS(
ut
)
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ TBsCostTracker::TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::E
: GroupType(groupType)
, CostCounters(counters->GetSubgroup("subsystem", "advancedCost"))
, MonGroup(std::make_shared<NMonGroup::TCostTrackerGroup>(CostCounters))
, Bucket(&DiskTimeAvailable, &BucketCapacity, nullptr, nullptr, nullptr, nullptr, true)
, Bucket(BucketUpperLimit, BucketLowerLimit, DiskTimeAvailable)
, BurstThresholdNs(costMetricsParameters.BurstThresholdNs)
, DiskTimeAvailableScale(costMetricsParameters.DiskTimeAvailableScale)
{
AtomicSet(BucketCapacity, GetDiskTimeAvailableScale() * BurstThresholdNs);
BucketUpperLimit.store(BurstThresholdNs * GetDiskTimeAvailableScale());
BurstDetector.Initialize(CostCounters, "BurstDetector");
switch (GroupType.GetErasure()) {
case TBlobStorageGroupType::ErasureMirror3dc:
Expand Down
25 changes: 14 additions & 11 deletions ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#include "vdisk_mongroups.h"
#include "vdisk_performance_params.h"

#include <library/cpp/bucket_quoter/bucket_quoter.h>
#include <library/cpp/lockfree_bucket/lockfree_bucket.h>
#include <util/system/compiler.h>
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/blobstorage/base/blobstorage_events.h>
Expand Down Expand Up @@ -315,12 +315,14 @@ class TBsCostTracker {
TIntrusivePtr<::NMonitoring::TDynamicCounters> CostCounters;
std::shared_ptr<NMonGroup::TCostTrackerGroup> MonGroup;

TAtomic BucketCapacity = 1'000'000'000; // 10^9 nsec
TAtomic DiskTimeAvailable = 1'000'000'000;
TBucketQuoter<i64, TSpinLock, TAppDataTimerMs<TInstantTimerMs>> Bucket;
const double BucketRelativeMinimum = 2;
std::atomic<i64> BucketUpperLimit = 1'000'000'000; // 10^9 nsec
std::atomic<i64> BucketLowerLimit = 1'000'000'000 * -BucketRelativeMinimum;
std::atomic<ui64> DiskTimeAvailable = 1'000'000'000;

TLockFreeBucket<TAppDataTimerMs<TInstantTimerMs>> Bucket;
TLight BurstDetector;
std::atomic<ui64> SeqnoBurstDetector = 0;
static constexpr ui32 ConcurrentHugeRequestsAllowed = 3;

TMemorizableControlWrapper BurstThresholdNs;
TMemorizableControlWrapper DiskTimeAvailableScale;
Expand Down Expand Up @@ -349,15 +351,16 @@ class TBsCostTracker {
}

// Accounts for one request of the given cost: refreshes the bucket limits from the current
// control values, withdraws `cost` from the bucket and updates the burst detector.
// NOTE(review): this span comes from a diff view and appears to interleave the removed
// lines (AtomicSet / UseAndFill / IsAvail) with the added ones — confirm against the file.
void CountRequest(ui64 cost) {
AtomicSet(BucketCapacity, GetDiskTimeAvailableScale() * BurstThresholdNs.Update(TAppData::TimeProvider->Now()));
Bucket.UseAndFill(cost);
BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
// Upper limit is the scaled burst threshold.
i64 bucketCapacity = GetDiskTimeAvailableScale() * BurstThresholdNs.Update(TAppData::TimeProvider->Now());
BucketUpperLimit.store(bucketCapacity);
// Lower limit is the upper limit multiplied by -BucketRelativeMinimum.
BucketLowerLimit.store(bucketCapacity * -BucketRelativeMinimum);
Bucket.FillAndTake(cost);
BurstDetector.Set(Bucket.IsEmpty(), SeqnoBurstDetector.fetch_add(1));
}

// Stores the scaled available disk time and mirrors it into the monitoring counter.
// NOTE(review): this span comes from a diff view and appears to contain both the removed
// AtomicSet line and the added store() line — confirm against the file.
void SetTimeAvailable(ui64 diskTimeAvailableNSec) {
ui64 diskTimeAvailable = diskTimeAvailableNSec * GetDiskTimeAvailableScale();

AtomicSet(DiskTimeAvailable, diskTimeAvailable);
DiskTimeAvailable.store(diskTimeAvailable);
MonGroup->DiskTimeAvailableCtr() = diskTimeAvailable;
}

Expand Down Expand Up @@ -408,7 +411,7 @@ class TBsCostTracker {
}

// Updates the burst detector with the current state of the bucket.
// NOTE(review): this span comes from a diff view and appears to contain both the removed
// IsAvail() line and the added IsEmpty() line — confirm against the file.
void CountPDiskResponse() {
BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
BurstDetector.Set(Bucket.IsEmpty(), SeqnoBurstDetector.fetch_add(1));
}

private:
Expand Down

0 comments on commit d97b9f8

Please sign in to comment.