Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Put and Get requests acceleration for 2 slow disks in block-4-2 erasure #4803

Merged
merged 10 commits into from
Jun 5, 2024
12 changes: 6 additions & 6 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,21 @@ ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQu
}

void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
serbel324 marked this conversation as resolved.
Show resolved Hide resolved
NKikimrBlobStorage::EVDiskQueueId queueId,
ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstSubgroupIdx) const {
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nthWorst,
ui64 *outWorstNs, ui64 *outNthWorstNs, i32 *outWorstSubgroupIdx) const {
*outWorstSubgroupIdx = -1;
*outWorstNs = 0;
*outNextToWorstNs = 0;
TStackVec<ui32, 9> delayNs;
serbel324 marked this conversation as resolved.
Show resolved Hide resolved
for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
ui64 predictedNs = GetPredictedDelayNs(info, groupQueues, diskIdx, queueId);
delayNs.push_back(predictedNs);
if (predictedNs > *outWorstNs) {
*outNextToWorstNs = *outWorstNs;
*outWorstNs = predictedNs;
*outWorstSubgroupIdx = diskIdx;
} else if (predictedNs > *outNextToWorstNs) {
*outNextToWorstNs = predictedNs;
}
}
std::nth_element(delayNs.begin(), delayNs.begin() + nthWorst, delayNs.end(), std::greater<ui32>{});
*outNthWorstNs = delayNs[nthWorst];
}

bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ struct TBlobState {
ui64 GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const;
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId,
ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstSubgroupIdx) const;
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nthToWorst,
ui64 *outWorstNs, ui64 *outNthWorstNs, i32 *outWorstSubgroupIdx) const;
TString ToString() const;
bool HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const;

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ ui64 TGetImpl::GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EV
if (Blackboard.BlobStates.size() == 1) {
i32 worstSubgroupIdx = -1;
Blackboard.BlobStates.begin()->second.GetWorstPredictedDelaysNs(
*Info, *Blackboard.GroupQueues, queueId,
*Info, *Blackboard.GroupQueues, queueId, 1,
&worstPredictedNs, &nextToWorstPredictedNs, &worstSubgroupIdx);
} else {
i32 worstOrderNumber = -1;
Expand Down
19 changes: 11 additions & 8 deletions ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt

TDiskResponsivenessTracker::TPerDiskStatsPtr Stats;

bool IsAccelerated;
ui32 AccelerateRequestsSent;
serbel324 marked this conversation as resolved.
Show resolved Hide resolved
bool IsAccelerateScheduled;

const bool IsMultiPutMode;
Expand Down Expand Up @@ -98,6 +98,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

void Handle(TEvAccelerate::TPtr &ev) {
IsAccelerateScheduled = false;
RootCauseTrack.OnAccelerate(ev->Get()->CauseIdx);
Accelerate();
SanityCheck(); // May Die
Expand Down Expand Up @@ -129,12 +130,13 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

void Accelerate() {
if (IsAccelerated) {
if (AccelerateRequestsSent == 2) {
return;
}
IsAccelerated = true;
++AccelerateRequestsSent;
Action(true);
// *(IsMultiPutMode ? Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size();
AccelerateIfNeeded();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action(true) a few lines above already accelerates, no?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but we need to schedule second accelerate in case current acceleration sends request to slow disk and doesn't recieve response in time. This call schedules next acceleration.

}

void HandleIncarnation(TMonotonic timestamp, ui32 orderNumber, ui64 incarnationGuid) {
Expand Down Expand Up @@ -363,9 +365,10 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

void AccelerateIfNeeded() {
if (!IsAccelerateScheduled && !IsAccelerated) {
if (WaitingVDiskCount == 1 && RequestsSent > 1) {
ui64 timeToAccelerateUs = Max<ui64>(1, PutImpl.GetTimeToAccelerateNs(LogCtx) / 1000);
if (!IsAccelerateScheduled && AccelerateRequestsSent < 2) {
if (WaitingVDiskCount > 0 && WaitingVDiskCount <= 2 && RequestsSent > 1) {
ui64 timeToAccelerateUs = Max<ui64>(1, PutImpl.GetTimeToAccelerateNs(
LogCtx, 2 - AccelerateRequestsSent) / 1000);
TDuration timeSinceStart = TActivationContext::Monotonic() - StartTime;
if (timeSinceStart.MicroSeconds() < timeToAccelerateUs) {
ui64 causeIdx = RootCauseTrack.RegisterAccelerate();
Expand Down Expand Up @@ -512,7 +515,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
, TimeStatsEnabled(timeStatsEnabled)
, Tactic(ev->Tactic)
, Stats(std::move(stats))
, IsAccelerated(false)
, AccelerateRequestsSent(0)
, IsAccelerateScheduled(false)
, IsMultiPutMode(false)
, IncarnationRecords(info->GetTotalVDisksNum())
Expand Down Expand Up @@ -556,7 +559,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
, TimeStatsEnabled(timeStatsEnabled)
, Tactic(tactic)
, Stats(std::move(stats))
, IsAccelerated(false)
, AccelerateRequestsSent(0)
, IsAccelerateScheduled(false)
, IsMultiPutMode(true)
, IncarnationRecords(info->GetTotalVDisksNum())
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,20 +82,20 @@ void TPutImpl::PrepareReply(NKikimrProto::EReplyStatus status, TLogContext &logC
}
}

ui64 TPutImpl::GetTimeToAccelerateNs(TLogContext &logCtx) {
ui64 TPutImpl::GetTimeToAccelerateNs(TLogContext &logCtx, ui32 nthWorst) {
Y_UNUSED(logCtx);
Y_ABORT_UNLESS(!Blackboard.BlobStates.empty());
TBatchedVec<ui64> nextToWorstPredictedNsVec(Blackboard.BlobStates.size());
TBatchedVec<ui64> nthWorstPredictedNsVec(Blackboard.BlobStates.size());
ui64 idx = 0;
for (auto &[_, state] : Blackboard.BlobStates) {
// Find the slowest disk
i32 worstSubgroupIdx = -1;
ui64 worstPredictedNs = 0;
state.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, HandleClassToQueueId(Blackboard.PutHandleClass),
&worstPredictedNs, &nextToWorstPredictedNsVec[idx], &worstSubgroupIdx);
state.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, HandleClassToQueueId(Blackboard.PutHandleClass), nthWorst,
&worstPredictedNs, &nthWorstPredictedNsVec[idx], &worstSubgroupIdx);
idx++;
}
return *MaxElement(nextToWorstPredictedNsVec.begin(), nextToWorstPredictedNsVec.end());
return *MaxElement(nthWorstPredictedNsVec.begin(), nthWorstPredictedNsVec.end());
}

TString TPutImpl::DumpFullState() const {
Expand Down
3 changes: 1 addition & 2 deletions ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ class TPutImpl {
void PrepareOneReply(NKikimrProto::EReplyStatus status, size_t blobIdx, TLogContext &logCtx,
TString errorReason, TPutResultVec &outPutResults);

ui64 GetTimeToAccelerateNs(TLogContext &logCtx);
ui64 GetTimeToAccelerateNs(TLogContext &logCtx, ui32 nthWorst);

TString DumpFullState() const;

Expand All @@ -207,7 +207,6 @@ class TPutImpl {

TDeque<TPutEvent> GeneratePutRequests() {
TDeque<TPutEvent> events;

// Group put requests together by VDiskID.
std::unordered_multimap<ui32, TDiskPutRequest*> puts;
for (auto& put : Blackboard.GroupDiskRequests.PutsPending) {
Expand Down
13 changes: 7 additions & 6 deletions ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,27 @@ class TAcceleratePutStrategy : public TStrategyBase {
EStrategyOutcome Process(TLogContext &logCtx, TBlobState &state, const TBlobStorageGroupInfo &info,
TBlackboard& /*blackboard*/, TGroupDiskRequests &groupDiskRequests) override {
// Find the unput part and disk
i32 badDiskIdx = -1;
TStackVec<ui32, 2> badDiskIdxs;
for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) {
TBlobState::TDisk &disk = state.Disks[diskIdx];
for (size_t partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) {
TBlobState::TDiskPart &diskPart = disk.DiskParts[partIdx];
if (diskPart.Situation == TBlobState::ESituation::Sent) {
badDiskIdx = diskIdx;
badDiskIdxs.push_back(diskIdx);
}
}
}

if (badDiskIdx >= 0) {
// Mark the disk 'bad'
if (!badDiskIdxs.empty()) {
// Mark the corresponding disks 'bad'
// Prepare part layout if possible
TBlobStorageGroupType::TPartLayout layout;
PreparePartLayout(state, info, &layout, badDiskIdx);
PreparePartLayout(state, info, &layout, badDiskIdxs);

TBlobStorageGroupType::TPartPlacement partPlacement;
bool isCorrectable = info.Type.CorrectLayout(layout, partPlacement);
if (isCorrectable && IsPutNeeded(state, partPlacement)) {
bool isPutNeeded = IsPutNeeded(state, partPlacement);
if (isCorrectable && isPutNeeded) {
PreparePutsForPartPlacement(logCtx, state, info, groupDiskRequests, partPlacement);
}
}
Expand Down
19 changes: 11 additions & 8 deletions ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ void TStrategyBase::AddGetRequest(TLogContext &logCtx, TGroupDiskRequests &group
}

void TStrategyBase::PreparePartLayout(const TBlobState &state, const TBlobStorageGroupInfo &info,
TBlobStorageGroupType::TPartLayout *layout, ui32 slowDiskIdx) {
TBlobStorageGroupType::TPartLayout *layout, const TStackVec<ui32, 2>& slowDiskIdxs) {
Y_ABORT_UNLESS(layout);
const ui32 totalPartCount = info.Type.TotalPartCount();
const ui32 blobSubringSize = info.Type.BlobSubgroupSize();
Expand All @@ -216,26 +216,29 @@ void TStrategyBase::PreparePartLayout(const TBlobState &state, const TBlobStorag
if (!isErrorDisk) {
for (ui32 partIdx = beginPartIdx; partIdx < endPartIdx; ++partIdx) {
TBlobState::ESituation partSituation = disk.DiskParts[partIdx].Situation;
bool isOnSlowDisk = (std::find(slowDiskIdxs.begin(), slowDiskIdxs.end(), diskIdx) != slowDiskIdxs.end());
if (partSituation == TBlobState::ESituation::Present ||
(diskIdx != slowDiskIdx && partSituation == TBlobState::ESituation::Sent)) {
(!isOnSlowDisk && partSituation == TBlobState::ESituation::Sent)) {
layout->VDiskPartMask[diskIdx] |= (1ul << partIdx);
}
layout->VDiskMask |= (1ul << diskIdx);
}
}
}
if (slowDiskIdx == InvalidVDiskIdx) {
if (slowDiskIdxs.empty()) {
layout->SlowVDiskMask = 0;
} else {
Y_DEBUG_ABORT_UNLESS(slowDiskIdx < sizeof(layout->SlowVDiskMask) * 8);
layout->SlowVDiskMask = (1ull << slowDiskIdx);
layout->SlowVDiskMask = 0;
for (ui32 slowDiskIdx : slowDiskIdxs) {
Y_DEBUG_ABORT_UNLESS(slowDiskIdx < sizeof(layout->SlowVDiskMask) * 8);
layout->SlowVDiskMask |= (1ull << slowDiskIdx);
}
}
}

bool TStrategyBase::IsPutNeeded(const TBlobState &state, const TBlobStorageGroupType::TPartPlacement &partPlacement) {
bool isNeeded = false;
for (ui32 i = 0; i < partPlacement.Records.size(); ++i) {
const TBlobStorageGroupType::TPartPlacement::TVDiskPart& record = partPlacement.Records[i];
for (const TBlobStorageGroupType::TPartPlacement::TVDiskPart& record : partPlacement.Records) {
const TBlobState::TDisk &disk = state.Disks[record.VDiskIdx];
TBlobState::ESituation partSituation = disk.DiskParts[record.PartIdx].Situation;
switch (partSituation) {
Expand Down Expand Up @@ -395,7 +398,7 @@ i32 TStrategyBase::MarkSlowSubgroupDisk(TBlobState &state, const TBlobStorageGro
ui64 nextToWorstPredictedNs = 0;
state.GetWorstPredictedDelaysNs(info, *blackboard.GroupQueues,
(isPut ? HandleClassToQueueId(blackboard.PutHandleClass) :
HandleClassToQueueId(blackboard.GetHandleClass)),
HandleClassToQueueId(blackboard.GetHandleClass)), 1,
&worstPredictedNs, &nextToWorstPredictedNs, &worstSubgroupIdx);

// Check if the slowest disk exceptionally slow, or just not very fast
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class TStrategyBase : public IStrategy {
void AddGetRequest(TLogContext &logCtx, TGroupDiskRequests &groupDiskRequests, TLogoBlobID &fullId, ui32 partIdx,
TBlobState::TDisk &disk, TIntervalSet<i32> &intervalSet, const char *logMarker);
void PreparePartLayout(const TBlobState &state, const TBlobStorageGroupInfo &info,
TBlobStorageGroupType::TPartLayout *layout, ui32 slowDiskIdx);
TBlobStorageGroupType::TPartLayout *layout, const TStackVec<ui32, 2>& slowDiskIdxs);
bool IsPutNeeded(const TBlobState &state, const TBlobStorageGroupType::TPartPlacement &partPlacement);
void PreparePutsForPartPlacement(TLogContext &logCtx, TBlobState &state,
const TBlobStorageGroupInfo &info, TGroupDiskRequests &groupDiskRequests,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ namespace NKikimr {
ui64 worstPredictedNs = 0;
ui64 nextToWorstPredictedNs = 0;
state.GetWorstPredictedDelaysNs(info, *blackboard.GroupQueues,
HandleClassToQueueId(blackboard.GetHandleClass),
HandleClassToQueueId(blackboard.GetHandleClass), 1,
&worstPredictedNs, &nextToWorstPredictedNs, &worstSubgroupIdx);

// Check if the slowest disk exceptionally slow, or just not very fast
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h
Original file line number Diff line number Diff line change
Expand Up @@ -130,20 +130,20 @@ class TRestoreStrategy : public TStrategyBase {
ui64 worstPredictedNs = 0;
ui64 nextToWorstPredictedNs = 0;
state.GetWorstPredictedDelaysNs(info, *blackboard.GroupQueues,
HandleClassToQueueId(blackboard.PutHandleClass),
HandleClassToQueueId(blackboard.PutHandleClass), 1,
&worstPredictedNs, &nextToWorstPredictedNs, &worstSubgroupIdx);

// Check if the slowest disk exceptionally slow, or just not very fast
i32 slowDiskSubgroupIdx = -1;
TStackVec<ui32, 2> slowDiskSubgroupIdxs;
if (nextToWorstPredictedNs > 0 && worstPredictedNs > nextToWorstPredictedNs * 2) {
slowDiskSubgroupIdx = worstSubgroupIdx;
slowDiskSubgroupIdxs.push_back(worstSubgroupIdx);
}

bool isDone = false;
if (slowDiskSubgroupIdx >= 0) {
if (!slowDiskSubgroupIdxs.empty()) {
// If there is an exceptionally slow disk, try not touching it, mark isDone
TBlobStorageGroupType::TPartLayout layout;
PreparePartLayout(state, info, &layout, slowDiskSubgroupIdx);
PreparePartLayout(state, info, &layout, slowDiskSubgroupIdxs);

TBlobStorageGroupType::TPartPlacement partPlacement;
bool isCorrectable = info.Type.CorrectLayout(layout, partPlacement);
Expand All @@ -157,7 +157,7 @@ class TRestoreStrategy : public TStrategyBase {
if (!isDone) {
// Fill in the part layout
TBlobStorageGroupType::TPartLayout layout;
PreparePartLayout(state, info, &layout, InvalidVDiskIdx);
PreparePartLayout(state, info, &layout, {});
TBlobStorageGroupType::TPartPlacement partPlacement;
bool isCorrectable = info.Type.CorrectLayout(layout, partPlacement);
Y_ABORT_UNLESS(isCorrectable);
Expand Down
Loading
Loading