Skip to content

Commit

Permalink
Merge 6117a84 into cea9d83
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 authored Sep 6, 2024
2 parents cea9d83 + 6117a84 commit 025bf18
Show file tree
Hide file tree
Showing 8 changed files with 236 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ TBsCostTracker::TBsCostTracker(const TBlobStorageGroupType& groupType, NPDisk::E
: GroupType(groupType)
, CostCounters(counters->GetSubgroup("subsystem", "advancedCost"))
, MonGroup(std::make_shared<NMonGroup::TCostTrackerGroup>(CostCounters))
, Bucket(&DiskTimeAvailable, &BucketCapacity, nullptr, nullptr, nullptr, nullptr, true)
, Bucket(BucketUpperLimit, BucketLowerLimit, DiskTimeAvailable)
, BurstThresholdNs(costMetricsParameters.BurstThresholdNs)
, DiskTimeAvailableScale(costMetricsParameters.DiskTimeAvailableScale)
{
AtomicSet(BucketCapacity, GetDiskTimeAvailableScale() * BurstThresholdNs);
BucketUpperLimit.store(BurstThresholdNs * GetDiskTimeAvailableScale());
BurstDetector.Initialize(CostCounters, "BurstDetector");
switch (GroupType.GetErasure()) {
case TBlobStorageGroupType::ErasureMirror3dc:
Expand Down
24 changes: 14 additions & 10 deletions ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#include "vdisk_performance_params.h"

#include <library/cpp/bucket_quoter/bucket_quoter.h>
#include <ydb/library/lockfree_bucket/lockfree_bucket.h>
#include <util/system/compiler.h>
#include <ydb/core/base/blobstorage.h>
#include <ydb/core/blobstorage/base/blobstorage_events.h>
Expand Down Expand Up @@ -315,12 +316,14 @@ class TBsCostTracker {
TIntrusivePtr<::NMonitoring::TDynamicCounters> CostCounters;
std::shared_ptr<NMonGroup::TCostTrackerGroup> MonGroup;

TAtomic BucketCapacity = 1'000'000'000; // 10^9 nsec
TAtomic DiskTimeAvailable = 1'000'000'000;
TBucketQuoter<i64, TSpinLock, TAppDataTimerMs<TInstantTimerMs>> Bucket;
const double BucketRelativeMinimum = 2;
std::atomic<i64> BucketUpperLimit = 1'000'000'000; // 10^9 nsec
std::atomic<i64> BucketLowerLimit = 1'000'000'000 * -BucketRelativeMinimum;
std::atomic<ui64> DiskTimeAvailable = 1'000'000'000;

TLockFreeBucket<TAppDataTimerMs<TInstantTimerMs>> Bucket;
TLight BurstDetector;
std::atomic<ui64> SeqnoBurstDetector = 0;
static constexpr ui32 ConcurrentHugeRequestsAllowed = 3;

TMemorizableControlWrapper BurstThresholdNs;
TMemorizableControlWrapper DiskTimeAvailableScale;
Expand Down Expand Up @@ -349,15 +352,16 @@ class TBsCostTracker {
}

void CountRequest(ui64 cost) {
AtomicSet(BucketCapacity, GetDiskTimeAvailableScale() * BurstThresholdNs.Update(TAppData::TimeProvider->Now()));
Bucket.UseAndFill(cost);
BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
i64 bucketCapacity = GetDiskTimeAvailableScale() * BurstThresholdNs.Update(TAppData::TimeProvider->Now());
BucketUpperLimit.store(bucketCapacity);
BucketLowerLimit.store(bucketCapacity * -BucketRelativeMinimum);
Bucket.FillAndTake(cost);
BurstDetector.Set(Bucket.IsEmpty(), SeqnoBurstDetector.fetch_add(1));
}

void SetTimeAvailable(ui64 diskTimeAvailableNSec) {
ui64 diskTimeAvailable = diskTimeAvailableNSec * GetDiskTimeAvailableScale();

AtomicSet(DiskTimeAvailable, diskTimeAvailable);
DiskTimeAvailable.store(diskTimeAvailable);
MonGroup->DiskTimeAvailableCtr() = diskTimeAvailable;
}

Expand Down Expand Up @@ -408,7 +412,7 @@ class TBsCostTracker {
}

void CountPDiskResponse() {
BurstDetector.Set(!Bucket.IsAvail(), SeqnoBurstDetector.fetch_add(1));
BurstDetector.Set(Bucket.IsEmpty(), SeqnoBurstDetector.fetch_add(1));
}

private:
Expand Down
1 change: 1 addition & 0 deletions ydb/library/lockfree_bucket/lockfree_bucket.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
#include "lockfree_bucket.h"
67 changes: 67 additions & 0 deletions ydb/library/lockfree_bucket/lockfree_bucket.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
#pragma once

#include <atomic>
#include <limits>

#include <library/cpp/bucket_quoter/timers.h>
#include <util/datetime/base.h>

template<class TTimer>
class TLockFreeBucket {
public:
TLockFreeBucket(std::atomic<i64>& maxTokens, std::atomic<i64>& minTokens, std::atomic<ui64>& inflowPerSecond)
: MaxTokens(maxTokens)
, MinTokens(minTokens)
, InflowPerSecond(inflowPerSecond)
, Tokens(maxTokens.load())
{
Y_DEBUG_ABORT_UNLESS(maxTokens > 0);
Y_DEBUG_ABORT_UNLESS(minTokens < 0);
}

bool IsEmpty() {
FillBucket();
return Tokens.load() <= 0;
}

void FillAndTake(i64 tokens) {
FillBucket();
TakeTokens(tokens);
}

private:
void FillBucket() {
TTime prev;
TTime now;
for (prev = LastUpdate.load(), now = TTimer::Now(); !LastUpdate.compare_exchange_strong(prev, now); ) {}

ui64 rawInflow = InflowPerSecond.load() * TTimer::Duration(prev, now);
if (rawInflow >= TTimer::Resolution) {
Tokens.fetch_add(rawInflow / TTimer::Resolution);
for (i64 tokens = Tokens.load(), maxTokens = MaxTokens.load(); tokens > maxTokens; ) {
if (Tokens.compare_exchange_strong(tokens, maxTokens)) {
break;
}
}
}
}

void TakeTokens(i64 tokens) {
Tokens.fetch_sub(tokens);
for (i64 tokens = Tokens.load(), minTokens = MinTokens.load(); tokens < minTokens; ) {
if (Tokens.compare_exchange_strong(tokens, minTokens)) {
break;
}
}
}

private:
using TTime = typename TTimer::TTime;

std::atomic<i64>& MaxTokens;
std::atomic<i64>& MinTokens;
std::atomic<ui64>& InflowPerSecond;

std::atomic<i64> Tokens;
std::atomic<TTime> LastUpdate;
};
128 changes: 128 additions & 0 deletions ydb/library/lockfree_bucket/ut/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/lockfree_bucket/lockfree_bucket.h>
#include <util/system/guard.h>
#include <util/system/spinlock.h>
#include <util/system/types.h>

#include <thread>

struct TTestTimerMs {
using TTime = TInstant;
static constexpr ui64 Resolution = 1000ull; // milliseconds

static TTime Now() {
return TInstant::Zero() + TDuration::MilliSeconds(Time.load());
}

static ui64 Duration(TTime from, TTime to) {
return (to - from).MilliSeconds();
}

static std::atomic<ui64> Time;

static void Reset() {
Time.store(0);
}

static void Advance(TDuration delta) {
Time.fetch_add(delta.MilliSeconds());
}
};

std::atomic<ui64> TTestTimerMs::Time = {};

Y_UNIT_TEST_SUITE(TLockFreeBucket) {
struct TTestContext {
TTestContext() {
MaxTokens.store(1'000'000);
MinTokens.store(-1'000'000);
Inflow.store(1'000'000);
TTestTimerMs::Reset();
}

template<class TCallback>
void Initialize(TCallback callback, ui32 threadCount) {
for (ui32 i = 0; i < threadCount; ++i) {
Threads.emplace_back(callback);
}
}

~TTestContext() {
JoinAll();
}

void JoinAll() {
for (std::thread& t : Threads) {
t.join();
}
Threads.clear();
}

std::atomic<i64> MaxTokens;
std::atomic<i64> MinTokens;
std::atomic<ui64> Inflow;
std::vector<std::thread> Threads;
};

void TestLowerLimit(ui32 threadCount) {
TTestContext ctx;
TLockFreeBucket<TTestTimerMs> bucket(ctx.MaxTokens, ctx.MinTokens, ctx.Inflow);

auto worker = [&]() {
for (ui32 i = 0; i < 100; ++i) {
TTestTimerMs::Advance(TDuration::MilliSeconds(10));
bucket.FillAndTake(123'456);
}
};

ctx.Initialize(worker, threadCount);
ctx.JoinAll();

TTestTimerMs::Advance(TDuration::Seconds(1));
TTestTimerMs::Advance(TDuration::MilliSeconds(1));

UNIT_ASSERT(!bucket.IsEmpty());
}

void TestUpperLimit(ui32 tokensTaken, bool isEmpty, ui32 threadCount) {
TTestContext ctx;
TLockFreeBucket<TTestTimerMs> bucket(ctx.MaxTokens, ctx.MinTokens, ctx.Inflow);
TTestTimerMs::Advance(TDuration::Seconds(100500));

auto worker = [&]() {
for (ui32 i = 0; i < 100; ++i) {
TTestTimerMs::Advance(TDuration::MilliSeconds(10));
bucket.FillAndTake(tokensTaken);
}
};

ctx.Initialize(worker, threadCount);
ctx.JoinAll();

UNIT_ASSERT_VALUES_EQUAL(bucket.IsEmpty(), isEmpty);
}

Y_UNIT_TEST(LowerLimitSingleThreaded) {
TestLowerLimit(1);
}

Y_UNIT_TEST(LowerLimitMultiThreaded) {
TestLowerLimit(20);
}

Y_UNIT_TEST(UpperLimitSingleThreaded) {
TestUpperLimit(123'456, true, 1);
}

Y_UNIT_TEST(UpperLimitMultiThreaded) {
TestUpperLimit(123'456, true, 20);
}

Y_UNIT_TEST(LowIntakeSingleThreaded) {
TestUpperLimit(1, false, 1);
}

Y_UNIT_TEST(LowIntakeMultiThreaded) {
TestUpperLimit(1, false, 20);
}
}
11 changes: 11 additions & 0 deletions ydb/library/lockfree_bucket/ut/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
UNITTEST()

FORK_SUBTESTS()
SRCS(
main.cpp
)
PEERDIR(
library/cpp/lockfree_bucket
)

END()
12 changes: 12 additions & 0 deletions ydb/library/lockfree_bucket/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
LIBRARY()


SRCS(
lockfree_bucket.cpp
)

END()

RECURSE_FOR_TESTS(
ut
)
1 change: 1 addition & 0 deletions ydb/library/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ RECURSE(
grpc
http_proxy
keys
lockfree_bucket
logger
login
mkql_proto
Expand Down

0 comments on commit 025bf18

Please sign in to comment.