Skip to content

Commit

Permalink
Fix acceleration in mirror-3-dc groups (#7931)
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 authored Sep 16, 2024
1 parent 30d882c commit e7ab6ea
Show file tree
Hide file tree
Showing 25 changed files with 280 additions and 238 deletions.
11 changes: 7 additions & 4 deletions ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,10 @@ const ui32 MaskSizeBits = 32;
constexpr bool DefaultEnablePutBatching = true;
constexpr bool DefaultEnableVPatch = false;

constexpr float DefaultSlowDiskThreshold = 2;
constexpr float DefaultPredictedDelayMultiplier = 1;
constexpr double DefaultSlowDiskThreshold = 2;
constexpr double DefaultPredictedDelayMultiplier = 1;
constexpr TDuration DefaultLongRequestThreshold = TDuration::Seconds(50);
constexpr ui32 DefaultMaxNumOfSlowDisks = 2;

constexpr bool WithMovingPatchRequestToStaticNode = true;

Expand Down Expand Up @@ -193,8 +194,9 @@ inline void SetExecutionRelay(IEventBase& ev, std::shared_ptr<TEvBlobStorage::TE
}

struct TAccelerationParams {
double SlowDiskThreshold = 2;
double PredictedDelayMultiplier = 1;
double SlowDiskThreshold = DefaultSlowDiskThreshold;
double PredictedDelayMultiplier = DefaultPredictedDelayMultiplier;
ui32 MaxNumOfSlowDisks = DefaultMaxNumOfSlowDisks;
};

class TBlobStorageGroupRequestActor : public TActor<TBlobStorageGroupRequestActor> {
Expand Down Expand Up @@ -520,6 +522,7 @@ struct TBlobStorageProxyParameters {
const TControlWrapper& SlowDiskThreshold;
const TControlWrapper& PredictedDelayMultiplier;
const TControlWrapper& LongRequestThresholdMs = TControlWrapper(DefaultLongRequestThreshold.MilliSeconds(), 1, 1'000'000);
const TControlWrapper& MaxNumOfSlowDisks = TControlWrapper(DefaultMaxNumOfSlowDisks, 1, 2);
};

IActor* CreateBlobStorageGroupProxyConfigured(TIntrusivePtr<TBlobStorageGroupInfo>&& info,
Expand Down
46 changes: 40 additions & 6 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,16 @@ ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQu

void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
double multiplier) const {
const TAccelerationParams& accelerationParams) const {
outNWorst->resize(Disks.size());
for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
ui64 predictedDelayNs = GetPredictedDelayNs(info, groupQueues, diskIdx, queueId);
(*outNWorst)[diskIdx] = {
static_cast<ui64>(GetPredictedDelayNs(info, groupQueues, diskIdx, queueId) * multiplier),
static_cast<ui64>(predictedDelayNs * accelerationParams.PredictedDelayMultiplier),
diskIdx
};
}
ui32 sortedPrefixSize = std::min(3u, (ui32)Disks.size());
ui32 sortedPrefixSize = std::min(accelerationParams.MaxNumOfSlowDisks + 1, (ui32)Disks.size());
std::partial_sort(outNWorst->begin(), outNWorst->begin() + sortedPrefixSize, outNWorst->end());
}

Expand Down Expand Up @@ -466,16 +467,18 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde

void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
double multiplier) const {
const TAccelerationParams& accelerationParams) const {
ui32 totalVDisks = info.GetTotalVDisksNum();
outNWorst->resize(totalVDisks);
for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) {
ui64 predictedDelayNs = groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId);
(*outNWorst)[orderNumber] = {
static_cast<ui64>(groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId) * multiplier),
static_cast<ui64>(predictedDelayNs * accelerationParams.PredictedDelayMultiplier),
orderNumber
};
}
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(3u, totalVDisks), outNWorst->end());
ui32 sortedPrefixSize = std::min(accelerationParams.MaxNumOfSlowDisks + 1, totalVDisks);
std::partial_sort(outNWorst->begin(), outNWorst->begin() + sortedPrefixSize, outNWorst->end());
}

void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {
Expand Down Expand Up @@ -542,4 +545,35 @@ void TBlackboard::InvalidatePartStates(ui32 orderNumber) {
}
}

// Recomputes the IsSlow flag of every disk in `state`.
//
// All flags are first cleared. Then the per-disk delay predictions for this blob are fetched
// (put queue when isPut is true, get queue otherwise); the prediction stored at position
// MaxNumOfSlowDisks, scaled by SlowDiskThreshold, is used as the threshold, and each of the
// first MaxNumOfSlowDisks predictions that exceeds it gets its disk marked slow.
// Hence at most accelerationParams.MaxNumOfSlowDisks disks are marked.
//
// NOTE(review): this relies on GetWorstPredictedDelaysNs placing the largest predictions first
// in its sorted prefix; that depends on TDiskDelayPrediction's ordering -- confirm.
void TBlackboard::MarkSlowDisks(TBlobState& state, bool isPut, const TAccelerationParams& accelerationParams) {
    // by default all disks are considered fast
    for (TBlobState::TDisk& disk : state.Disks) {
        disk.IsSlow = false;
    }

    const ui32 maxNumSlow = accelerationParams.MaxNumOfSlowDisks;
    if (Info->GetTotalVDisksNum() <= maxNumSlow) {
        // we must not end up with every disk of the group marked slow
        return;
    }

    const NKikimrBlobStorage::EVDiskQueueId queueId =
        isPut ? HandleClassToQueueId(PutHandleClass) : HandleClassToQueueId(GetHandleClass);

    TDiskDelayPredictions worstDisks;
    state.GetWorstPredictedDelaysNs(*Info, *GroupQueues, queueId, &worstDisks, accelerationParams);

    // The predictions are sized by state.Disks, not by the group's total VDisk count checked above,
    // so make sure the reference element really exists before indexing it.
    if (worstDisks.size() <= maxNumSlow) {
        return;
    }

    const ui64 slowThreshold = static_cast<ui64>(
        worstDisks[maxNumSlow].PredictedNs * accelerationParams.SlowDiskThreshold);
    if (slowThreshold == 0) {
        // invalid or non-initialized predicted ns, consider all disks not slow
        return;
    }

    for (ui32 idx = 0; idx < maxNumSlow; ++idx) {
        if (worstDisks[idx].PredictedNs > slowThreshold) {
            // DiskIdx of a per-blob prediction is an index into state.Disks
            const ui32 diskIdx = worstDisks[idx].DiskIdx;
            state.Disks[diskIdx].IsSlow = true;
        }
    }
}

}//NKikimr
11 changes: 6 additions & 5 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,9 @@ struct TBlobState {
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const;
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
double multipler = 1) const;
const TAccelerationParams& accelerationParams) const;
TString ToString() const;
bool HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const;

static TString SituationToString(ESituation situation);
};

Expand Down Expand Up @@ -166,7 +165,7 @@ class IStrategy {

struct TBlackboard {
enum EAccelerationMode {
AccelerationModeSkipOneSlowest,
AccelerationModeSkipNSlowest,
AccelerationModeSkipMarked
};

Expand All @@ -190,7 +189,7 @@ struct TBlackboard {
NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass)
: Info(info)
, GroupQueues(groupQueues)
, AccelerationMode(AccelerationModeSkipOneSlowest)
, AccelerationMode(AccelerationModeSkipNSlowest)
, PutHandleClass(putHandleClass)
, GetHandleClass(getHandleClass)
{}
Expand All @@ -213,7 +212,7 @@ struct TBlackboard {
void ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapIndex, ui32 responseIndex, NKikimrProto::EReplyStatus status);
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst,
double multiplier = 1) const;
const TAccelerationParams& accelerationParams) const;
TString ToString() const;

void ChangeAll() {
Expand All @@ -226,6 +225,8 @@ struct TBlackboard {

void RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx);

void MarkSlowDisks(TBlobState& state, bool isPut, const TAccelerationParams& accelerationParams);

TBlobState& operator [](const TLogoBlobID& id);
};

Expand Down
12 changes: 6 additions & 6 deletions ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
}

void TryScheduleGetAcceleration() {
if (!IsGetAccelerateScheduled && GetsAccelerated < 2) {
// Count VDisks that have requests in flight, if there is no more than 2 such VDisks, Accelerate
if (CountDisksWithActiveRequests() <= 2) {
if (!IsGetAccelerateScheduled && GetsAccelerated < AccelerationParams.MaxNumOfSlowDisks) {
// Count VDisks with requests in flight, if there are <= the maximum number of slow VDisks, Accelerate
if (CountDisksWithActiveRequests() <= AccelerationParams.MaxNumOfSlowDisks) {
ui64 timeToAccelerateUs = GetImpl.GetTimeToAccelerateGetNs(LogCtx) / 1000;
TDuration timeToAccelerate = TDuration::MicroSeconds(timeToAccelerateUs);
TMonotonic now = TActivationContext::Monotonic();
Expand All @@ -345,9 +345,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor {
}

void TrySchedulePutAcceleration() {
if (!IsPutAccelerateScheduled && PutsAccelerated < 2) {
// Count VDisks that have requests in flight, if there is no more than 2 such VDisks, Accelerate
if (CountDisksWithActiveRequests() <= 2) {
if (!IsPutAccelerateScheduled && PutsAccelerated < AccelerationParams.MaxNumOfSlowDisks) {
// Count VDisks with requests in flight, if there are <= the maximum number of slow VDisks, Accelerate
if (CountDisksWithActiveRequests() <= AccelerationParams.MaxNumOfSlowDisks) {
ui64 timeToAccelerateUs = GetImpl.GetTimeToAcceleratePutNs(LogCtx) / 1000;
TDuration timeToAccelerate = TDuration::MicroSeconds(timeToAccelerateUs);
TMonotonic now = TActivationContext::Monotonic();
Expand Down
12 changes: 5 additions & 7 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,15 +146,13 @@ ui64 TGetImpl::GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EV
// Find the slowest disk
TDiskDelayPredictions worstDisks;
if (Blackboard.BlobStates.size() == 1) {
Blackboard.BlobStates.begin()->second.GetWorstPredictedDelaysNs(
*Info, *Blackboard.GroupQueues, queueId, &worstDisks,
AccelerationParams.PredictedDelayMultiplier);
Blackboard.BlobStates.begin()->second.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues,
queueId, &worstDisks, AccelerationParams);
} else {
Blackboard.GetWorstPredictedDelaysNs(
*Info, *Blackboard.GroupQueues, queueId, &worstDisks,
AccelerationParams.PredictedDelayMultiplier);
Blackboard.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, queueId, &worstDisks,
AccelerationParams);
}
return worstDisks[std::min(3u, (ui32)worstDisks.size() - 1)].PredictedNs;
return worstDisks[std::min(AccelerationParams.MaxNumOfSlowDisks, (ui32)worstDisks.size() - 1)].PredictedNs;
}

ui64 TGetImpl::GetTimeToAccelerateGetNs(TLogContext &logCtx) {
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ namespace NKikimr {
, EnableVPatch(params.EnableVPatch)
, SlowDiskThreshold(params.SlowDiskThreshold)
, PredictedDelayMultiplier(params.PredictedDelayMultiplier)
, MaxNumOfSlowDisks(params.MaxNumOfSlowDisks)
, LongRequestThresholdMs(params.LongRequestThresholdMs)
{}

Expand All @@ -33,6 +34,7 @@ namespace NKikimr {
, EnableVPatch(params.EnableVPatch)
, SlowDiskThreshold(params.SlowDiskThreshold)
, PredictedDelayMultiplier(params.PredictedDelayMultiplier)
, MaxNumOfSlowDisks(params.MaxNumOfSlowDisks)
, LongRequestThresholdMs(params.LongRequestThresholdMs)
{}

Expand Down
3 changes: 3 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -121,8 +121,11 @@ class TBlobStorageGroupProxy : public TActorBootstrapped<TBlobStorageGroupProxy>
bool HasInvalidGroupId() const { return GroupId.GetRawId() == Max<ui32>(); }
void ProcessInitQueue();

// Acceleration parameters
TMemorizableControlWrapper SlowDiskThreshold;
TMemorizableControlWrapper PredictedDelayMultiplier;
TMemorizableControlWrapper MaxNumOfSlowDisks;

TMemorizableControlWrapper LongRequestThresholdMs;

TAccelerationParams GetAccelerationParams();
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,8 +416,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor {
}

void TryToScheduleNextAcceleration() {
if (!IsAccelerateScheduled && AccelerateRequestsSent < 2) {
if (WaitingVDiskCount > 0 && WaitingVDiskCount <= 2 && RequestsSent > 1) {
if (!IsAccelerateScheduled && AccelerateRequestsSent < AccelerationParams.MaxNumOfSlowDisks) {
if (WaitingVDiskCount > 0 && WaitingVDiskCount <= AccelerationParams.MaxNumOfSlowDisks && RequestsSent > 1) {
ui64 timeToAccelerateUs = Max<ui64>(1, PutImpl.GetTimeToAccelerateNs(LogCtx) / 1000);
if (RequestsPendingBeforeAcceleration == 1 && AccelerateRequestsSent == 1) {
// if there is only one request pending, but first accelerate is unsuccessful, make a pause
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,8 +91,8 @@ ui64 TPutImpl::GetTimeToAccelerateNs(TLogContext &logCtx) {
// Find the n'th slowest disk
TDiskDelayPredictions worstDisks;
state.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, HandleClassToQueueId(Blackboard.PutHandleClass),
&worstDisks, AccelerationParams.PredictedDelayMultiplier);
nthWorstPredictedNsVec[idx++] = worstDisks[2].PredictedNs;
&worstDisks, AccelerationParams);
nthWorstPredictedNsVec[idx++] = worstDisks[AccelerationParams.MaxNumOfSlowDisks].PredictedNs;
}
return *MaxElement(nthWorstPredictedNsVec.begin(), nthWorstPredictedNsVec.end());
}
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ namespace NKikimr {
return TAccelerationParams{
.SlowDiskThreshold = .001f * SlowDiskThreshold.Update(TActivationContext::Now()),
.PredictedDelayMultiplier = .001f * PredictedDelayMultiplier.Update(TActivationContext::Now()),
.MaxNumOfSlowDisks = (ui32)MaxNumOfSlowDisks.Update(TActivationContext::Now()),
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,22 @@ class TAcceleratePutStrategy : public TStrategyBase {
const TAccelerationParams& accelerationParams) override {
Y_UNUSED(accelerationParams);
// Find the unput part and disk
TStackVec<ui32, 2> badDiskIdxs;
ui32 badDisksMask = 0;
for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) {
TBlobState::TDisk &disk = state.Disks[diskIdx];
for (size_t partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) {
TBlobState::TDiskPart &diskPart = disk.DiskParts[partIdx];
if (diskPart.Situation == TBlobState::ESituation::Sent) {
badDiskIdxs.push_back(diskIdx);
badDisksMask |= (1 << diskIdx);
}
}
}

if (!badDiskIdxs.empty()) {
if (badDisksMask > 0) {
// Mark the corresponding disks 'bad'
// Prepare part layout if possible
TBlobStorageGroupType::TPartLayout layout;
PreparePartLayout(state, info, &layout, badDiskIdxs);
PreparePartLayout(state, info, &layout, badDisksMask);

TBlobStorageGroupType::TPartPlacement partPlacement;
bool isCorrectable = info.Type.CorrectLayout(layout, partPlacement);
Expand Down
78 changes: 42 additions & 36 deletions ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put_m3dc.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,50 +30,56 @@ class TAcceleratePut3dcStrategy : public TStrategyBase {
}

EStrategyOutcome Process(TLogContext &logCtx, TBlobState &state, const TBlobStorageGroupInfo &info,
TBlackboard& /*blackboard*/, TGroupDiskRequests &groupDiskRequests,
TBlackboard& blackboard, TGroupDiskRequests &groupDiskRequests,
const TAccelerationParams& accelerationParams) override {
Y_UNUSED(accelerationParams);
// Find the unput parts and disks
ui32 badDiskMask = 0;
for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) {
bool unresponsiveDisk = false;
for (size_t diskIdx = 0; diskIdx < state.Disks.size() && !unresponsiveDisk; ++diskIdx) {
TBlobState::TDisk &disk = state.Disks[diskIdx];
for (size_t partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) {
TBlobState::TDiskPart &diskPart = disk.DiskParts[partIdx];
for (TBlobState::TDiskPart &diskPart : disk.DiskParts) {
if (diskPart.Situation == TBlobState::ESituation::Sent) {
badDiskMask |= (1 << diskIdx);
unresponsiveDisk = true;
break;
}
}
}
if (badDiskMask > 0) {
// Mark the 'bad' disk as the single slow disk
for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) {
state.Disks[diskIdx].IsSlow = badDiskMask & (1 << diskIdx);
}

// Prepare part placement if possible
TBlobStorageGroupType::TPartPlacement partPlacement;
bool degraded = false;

// check if we are in degraded mode -- that means that we have one fully failed realm
TBlobStorageGroupInfo::TSubgroupVDisks success(&info.GetTopology());
TBlobStorageGroupInfo::TSubgroupVDisks error(&info.GetTopology());
Evaluate3dcSituation(state, NumFailRealms, NumFailDomainsPerFailRealm, info, true, success, error, degraded);

// check for failure tolerance; we issue ERROR in case when it is not possible to achieve success condition in
// any way; also check if we have already finished writing replicas
const auto& checker = info.GetQuorumChecker();
if (checker.CheckFailModelForSubgroup(error)) {
if (checker.CheckQuorumForSubgroup(success)) {
// OK
return EStrategyOutcome::DONE;
}

// now check every realm and check if we have to issue some write requests to it
Prepare3dcPartPlacement(state, NumFailRealms, NumFailDomainsPerFailRealm,
PreferredReplicasPerRealm(degraded), true, partPlacement);

if (IsPutNeeded(state, partPlacement)) {
PreparePutsForPartPlacement(logCtx, state, info, groupDiskRequests, partPlacement);
if (unresponsiveDisk) {
blackboard.MarkSlowDisks(state, true, accelerationParams);

for (bool considerSlowAsError : {true, false}) {
// Prepare part placement if possible
TBlobStorageGroupType::TPartPlacement partPlacement;
bool degraded = false;

// check if we are in degraded mode -- that means that we have one fully failed realm
TBlobStorageGroupInfo::TSubgroupVDisks success(&info.GetTopology());
TBlobStorageGroupInfo::TSubgroupVDisks error(&info.GetTopology());
Evaluate3dcSituation(state, NumFailRealms, NumFailDomainsPerFailRealm, info, considerSlowAsError,
success, error, degraded);
// check for failure tolerance; we issue ERROR in case when it is not possible to achieve success condition in
// any way; also check if we have already finished writing replicas
const auto& checker = info.GetQuorumChecker();
if (checker.CheckFailModelForSubgroup(error)) {
if (checker.CheckQuorumForSubgroup(success)) {
// OK
return EStrategyOutcome::DONE;
}

// now check every realm and check if we have to issue some write requests to it
bool fullPlacement;
Prepare3dcPartPlacement(state, NumFailRealms, NumFailDomainsPerFailRealm,
PreferredReplicasPerRealm(degraded), considerSlowAsError, true, partPlacement, fullPlacement);

if (considerSlowAsError && !fullPlacement) {
// unable to place all parts to fast disks, retry
continue;
}

if (IsPutNeeded(state, partPlacement)) {
PreparePutsForPartPlacement(logCtx, state, info, groupDiskRequests, partPlacement);
}
break;
}
}
}
Expand Down
Loading

0 comments on commit e7ab6ea

Please sign in to comment.