Skip to content

Commit

Permalink
Merge 8f40cb4 into 8bca55f
Browse files Browse the repository at this point in the history
  • Loading branch information
serbel324 authored May 28, 2024
2 parents 8bca55f + 8f40cb4 commit ba1c745
Show file tree
Hide file tree
Showing 18 changed files with 321 additions and 152 deletions.
10 changes: 10 additions & 0 deletions ydb/core/blobstorage/dsproxy/dsproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ constexpr bool WithMovingPatchRequestToStaticNode = true;
// Common types
////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

// Predicted request delay for a single disk.
//
// PredictedNs - predicted delay in nanoseconds.
// DiskIdx     - index of the disk the prediction belongs to; producers fill it with either
//               the index inside the blob's subgroup or the VDisk order number.
//
// operator< orders predictions from the slowest disk to the fastest one, i.e. "a < b" means
// "a is predicted to be slower than b". GetWorstPredictedDelaysNs() runs std::partial_sort with
// this default ordering and expects the N worst disks at the front of the vector; an ascending
// comparison would place the N fastest disks there instead.
struct TDiskDelayPrediction {
    ui64 PredictedNs;
    ui32 DiskIdx;

    bool operator<(const TDiskDelayPrediction& other) const {
        // Larger predicted delay sorts first (worst-first ordering).
        return PredictedNs > other.PredictedNs;
    }
};

// Collection of per-disk delay predictions (a TStackVec parameterized by TypicalDisksInSubring).
using TDiskDelayPredictions = TStackVec<TDiskDelayPrediction, TypicalDisksInSubring>;

struct TEvDeathNote : public TEventLocal<TEvDeathNote, TEvBlobStorage::EvDeathNote> {
TStackVec<std::pair<TDiskResponsivenessTracker::TDiskId, TDuration>, 16> Responsiveness;
Expand Down
34 changes: 8 additions & 26 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,21 +178,12 @@ ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQu
}

void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId,
ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstSubgroupIdx) const {
*outWorstSubgroupIdx = -1;
*outWorstNs = 0;
*outNextToWorstNs = 0;
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
outNWorst->resize(Disks.size());
for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) {
ui64 predictedNs = GetPredictedDelayNs(info, groupQueues, diskIdx, queueId);
if (predictedNs > *outWorstNs) {
*outNextToWorstNs = *outWorstNs;
*outWorstNs = predictedNs;
*outWorstSubgroupIdx = diskIdx;
} else if (predictedNs > *outNextToWorstNs) {
*outNextToWorstNs = predictedNs;
}
(*outNWorst)[diskIdx] = { GetPredictedDelayNs(info, groupQueues, diskIdx, queueId), diskIdx };
}
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, (ui32)Disks.size()), outNWorst->end());
}

bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const {
Expand Down Expand Up @@ -467,22 +458,13 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde
}

void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId,
ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstOrderNumber) const {
*outWorstOrderNumber = -1;
*outWorstNs = 0;
*outNextToWorstNs = 0;
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const {
ui32 totalVDisks = info.GetTotalVDisksNum();
outNWorst->resize(totalVDisks);
for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) {
ui64 predictedNs = groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId);
if (predictedNs > *outWorstNs) {
*outNextToWorstNs = *outWorstNs;
*outWorstNs = predictedNs;
*outWorstOrderNumber = orderNumber;
} else if (predictedNs > *outNextToWorstNs) {
*outNextToWorstNs = predictedNs;
}
(*outNWorst)[orderNumber] = { groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId), orderNumber };
}
std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst, totalVDisks), outNWorst->end());
}

void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) {
Expand Down
6 changes: 2 additions & 4 deletions ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ struct TBlobState {
ui64 GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const;
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId,
ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstSubgroupIdx) const;
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const;
TString ToString() const;
bool HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const;

Expand Down Expand Up @@ -209,8 +208,7 @@ struct TBlackboard {
ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex);
void ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapIndex, ui32 responseIndex, NKikimrProto::EReplyStatus status);
void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues,
NKikimrBlobStorage::EVDiskQueueId queueId,
ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstOrderNumber) const;
NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const;
TString ToString() const;

void ChangeAll() {
Expand Down
56 changes: 30 additions & 26 deletions ydb/core/blobstorage/dsproxy/dsproxy_get.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
const TInstant Deadline;
TInstant StartTime;
TInstant StartTimePut;
ui32 RequestsSent;
ui32 ResponsesReceived;
ui32 RequestsSent = 0;
ui32 ResponsesReceived = 0;
ui32 GroupSize;
i64 ReportedBytes;
ui32 MaxSaneRequests = 0;
bool IsPutStarted = false;
Expand All @@ -51,54 +52,57 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt

TStackVec<TDiskCounters, TypicalDisksInGroup> DiskCounters;

bool IsGetAccelerated = false;
ui32 GetsAccelerated = 0;
bool IsGetAccelerateScheduled = false;
bool IsPutAccelerated = false;
ui32 PutsAccelerated = 0;
bool IsPutAccelerateScheduled = false;

void Handle(TEvAccelerateGet::TPtr &ev) {
IsGetAccelerateScheduled = false;
RootCauseTrack.OnAccelerate(ev->Get()->CauseIdx);
AccelerateGet();
}

void Handle(TEvAcceleratePut::TPtr &ev) {
IsPutAccelerateScheduled = false;
RootCauseTrack.OnAccelerate(ev->Get()->CauseIdx);
AcceleratePut();
}

void AccelerateGet() {
if (IsGetAccelerated) {
if (GetsAccelerated == 2) {
return;
}
IsGetAccelerated = true;
GetsAccelerated++;

TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts);
GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDisksMask(), vGets, vPuts);
*Mon->NodeMon->AccelerateEvVPutCount += vPuts.size();
*Mon->NodeMon->AccelerateEvVGetCount += vGets.size();
SendVGetsAndVPuts(vGets, vPuts);
TryScheduleGetAcceleration();
}

void AcceleratePut() {
if (IsPutAccelerated) {
if (PutsAccelerated == 2) {
return;
}
IsPutAccelerated = true;
PutsAccelerated++;

TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> vGets;
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> vPuts;
GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts);
GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDisksMask(), vGets, vPuts);
*Mon->NodeMon->AccelerateEvVPutCount += vPuts.size();
*Mon->NodeMon->AccelerateEvVGetCount += vGets.size();
SendVGetsAndVPuts(vGets, vPuts);
TrySchedulePutAcceleration();
}

void SendVGetsAndVPuts(TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &vGets,
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &vPuts) {
ReportBytes(GetImpl.GrabBytesToReport());
RequestsSent += vGets.size();
RequestsSent += vPuts.size();
RequestsSent += vGets.size() + vPuts.size();
CountPuts(vPuts);
if (vPuts.size()) {
if (!IsPutStarted) {
Expand Down Expand Up @@ -146,14 +150,14 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
return activeCount;
}

i32 GetUnresponsiveDiskOrderNumber() {
i32 unresponsiveDiskOrderNumber = -1;
ui32 GetUnresponsiveDisksMask() {
i32 unresponsiveDisksMask = 0;
for (size_t i = 0; i < DiskCounters.size(); ++i) {
if (DiskCounters[i].Sent != DiskCounters[i].Received) {
unresponsiveDiskOrderNumber = i;
unresponsiveDisksMask |= 1 << i;
}
}
return unresponsiveDiskOrderNumber;
return unresponsiveDisksMask;
}

void Handle(TEvBlobStorage::TEvVGetResult::TPtr &ev) {
Expand Down Expand Up @@ -299,6 +303,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
SendReplyAndDie(getResult);
return;
}

Y_ABORT_UNLESS(RequestsSent > ResponsesReceived, "RequestsSent# %" PRIu64 " ResponsesReceived# %" PRIu64,
ui64(RequestsSent), ui64(ResponsesReceived));

Expand All @@ -307,10 +312,10 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

void TryScheduleGetAcceleration() {
if (!IsGetAccelerateScheduled && !IsGetAccelerated) {
// Count VDisks that have requests in flight, if there is exactly one such VDisk, Accelerate
if (CountDisksWithActiveRequests() <= 1) {
ui64 timeToAccelerateUs = GetImpl.GetTimeToAccelerateGetNs(LogCtx) / 1000;
if (!IsGetAccelerateScheduled && GetsAccelerated < 2) {
// Count VDisks that have requests in flight; if there are no more than 2 such VDisks, accelerate
if (CountDisksWithActiveRequests() <= 2) {
ui64 timeToAccelerateUs = GetImpl.GetTimeToAccelerateGetNs(LogCtx, GetsAccelerated) / 1000;
TInstant now = TActivationContext::Now();
TDuration timeSinceStart = (now > StartTime) ? (now - StartTime) : TDuration::MilliSeconds(0);
if (timeSinceStart.MicroSeconds() < timeToAccelerateUs) {
Expand All @@ -326,10 +331,10 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

void TrySchedulePutAcceleration() {
if (!IsPutAccelerateScheduled && !IsPutAccelerated) {
// Count VDisks that have requests in flight, if there is exactly one such VDisk, Accelerate
if (CountDisksWithActiveRequests() <= 1) {
ui64 timeToAccelerateUs = GetImpl.GetTimeToAcceleratePutNs(LogCtx) / 1000;
if (!IsPutAccelerateScheduled && PutsAccelerated < 2) {
// Count VDisks that have requests in flight; if there are no more than 2 such VDisks, accelerate
if (CountDisksWithActiveRequests() <= 2) {
ui64 timeToAccelerateUs = GetImpl.GetTimeToAcceleratePutNs(LogCtx, PutsAccelerated) / 1000;
TInstant now = TActivationContext::Now();
TDuration timeSinceStart = (now > StartTimePut) ? (now - StartTimePut) : TDuration::MilliSeconds(0);
if (timeSinceStart.MicroSeconds() < timeToAccelerateUs) {
Expand Down Expand Up @@ -401,8 +406,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor<TBlobSt
, Deadline(ev->Deadline)
, StartTime(now)
, StartTimePut(StartTime)
, RequestsSent(0)
, ResponsesReceived(0)
, GroupSize(info->Type.BlobSubgroupSize())
, ReportedBytes(0)
{
ReportBytes(sizeof(*this));
Expand Down
25 changes: 11 additions & 14 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -159,31 +159,28 @@ void TGetImpl::PrepareReply(NKikimrProto::EReplyStatus status, TString errorReas
}


ui64 TGetImpl::GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EVDiskQueueId queueId) {
ui64 TGetImpl::GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nthWorst) {
Y_UNUSED(logCtx);
// Find the slowest disk
ui64 worstPredictedNs = 0;
ui64 nextToWorstPredictedNs = 0;
TDiskDelayPredictions worstDisks;
if (Blackboard.BlobStates.size() == 1) {
i32 worstSubgroupIdx = -1;
Blackboard.BlobStates.begin()->second.GetWorstPredictedDelaysNs(
*Info, *Blackboard.GroupQueues, queueId,
&worstPredictedNs, &nextToWorstPredictedNs, &worstSubgroupIdx);
*Info, *Blackboard.GroupQueues, queueId, nthWorst, &worstDisks);
} else {
i32 worstOrderNumber = -1;
Blackboard.GetWorstPredictedDelaysNs(
*Info, *Blackboard.GroupQueues, queueId,
&worstPredictedNs, &nextToWorstPredictedNs, &worstOrderNumber);
*Info, *Blackboard.GroupQueues, queueId, nthWorst, &worstDisks);
}
return nextToWorstPredictedNs * 1;
return worstDisks[nthWorst].PredictedNs;
}

ui64 TGetImpl::GetTimeToAccelerateGetNs(TLogContext &logCtx) {
return GetTimeToAccelerateNs(logCtx, HandleClassToQueueId(Blackboard.GetHandleClass));
ui64 TGetImpl::GetTimeToAccelerateGetNs(TLogContext &logCtx, ui32 acceleratesSent) {
Y_DEBUG_ABORT_UNLESS(acceleratesSent < 2);
return GetTimeToAccelerateNs(logCtx, HandleClassToQueueId(Blackboard.GetHandleClass), 2 - acceleratesSent);
}

ui64 TGetImpl::GetTimeToAcceleratePutNs(TLogContext &logCtx) {
return GetTimeToAccelerateNs(logCtx, HandleClassToQueueId(Blackboard.PutHandleClass));
ui64 TGetImpl::GetTimeToAcceleratePutNs(TLogContext &logCtx, ui32 acceleratesSent) {
Y_DEBUG_ABORT_UNLESS(acceleratesSent < 2);
return GetTimeToAccelerateNs(logCtx, HandleClassToQueueId(Blackboard.PutHandleClass), 2 - acceleratesSent);
}

TString TGetImpl::DumpFullState() const {
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ class TGetImpl {
void PrepareReply(NKikimrProto::EReplyStatus status, TString errorReason, TLogContext &logCtx,
TAutoPtr<TEvBlobStorage::TEvGetResult> &outGetResult);

void AccelerateGet(TLogContext &logCtx, i32 slowDiskOrderNumber,
void AccelerateGet(TLogContext &logCtx, ui32 slowDisksMask,
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets,
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts) {
TAutoPtr<TEvBlobStorage::TEvGetResult> outGetResult;
Expand All @@ -259,7 +259,7 @@ class TGetImpl {
TStackVec<TBlobState::TDisk, TypicalDisksInSubring> &disks = it->second.Disks;
for (ui32 i = 0; i < disks.size(); ++i) {
TBlobState::TDisk &disk = disks[i];
disk.IsSlow = ((i32)disk.OrderNumber == slowDiskOrderNumber);
disk.IsSlow = slowDisksMask & (1 << disk.OrderNumber);
}
}
Blackboard.ChangeAll();
Expand All @@ -269,14 +269,14 @@ class TGetImpl {
RequestPrefix.data(), outGetResult->Print(false).c_str(), DumpFullState().c_str());
}

void AcceleratePut(TLogContext &logCtx, i32 slowDiskOrderNumber,
void AcceleratePut(TLogContext &logCtx, ui32 slowDisksMask,
TDeque<std::unique_ptr<TEvBlobStorage::TEvVGet>> &outVGets,
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts) {
AccelerateGet(logCtx, slowDiskOrderNumber, outVGets, outVPuts);
AccelerateGet(logCtx, slowDisksMask, outVGets, outVPuts);
}

ui64 GetTimeToAccelerateGetNs(TLogContext &logCtx);
ui64 GetTimeToAcceleratePutNs(TLogContext &logCtx);
ui64 GetTimeToAccelerateGetNs(TLogContext &logCtx, ui32 acceleratesSent);
ui64 GetTimeToAcceleratePutNs(TLogContext &logCtx, ui32 acceleratesSent);

TString DumpFullState() const;

Expand Down Expand Up @@ -313,7 +313,7 @@ class TGetImpl {
void PrepareVPuts(TLogContext &logCtx,
TDeque<std::unique_ptr<TEvBlobStorage::TEvVPut>> &outVPuts);

ui64 GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EVDiskQueueId queueId);
ui64 GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nthWorst);
}; //TGetImpl

}//NKikimr
20 changes: 9 additions & 11 deletions ydb/core/blobstorage/dsproxy/dsproxy_put.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt

TDiskResponsivenessTracker::TPerDiskStatsPtr Stats;

bool IsAccelerated;
bool IsAccelerateScheduled;
ui32 AccelerateRequestsSent = 0;
bool IsAccelerateScheduled = false;

const bool IsMultiPutMode;

Expand Down Expand Up @@ -98,6 +98,7 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

void Handle(TEvAccelerate::TPtr &ev) {
IsAccelerateScheduled = false;
RootCauseTrack.OnAccelerate(ev->Get()->CauseIdx);
Accelerate();
SanityCheck(); // May Die
Expand Down Expand Up @@ -129,12 +130,13 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

void Accelerate() {
if (IsAccelerated) {
if (AccelerateRequestsSent == 2) {
return;
}
IsAccelerated = true;
++AccelerateRequestsSent;
Action(true);
// *(IsMultiPutMode ? Mon->NodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size();
AccelerateIfNeeded();
}

void HandleIncarnation(TMonotonic timestamp, ui32 orderNumber, ui64 incarnationGuid) {
Expand Down Expand Up @@ -363,9 +365,9 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
}

void AccelerateIfNeeded() {
if (!IsAccelerateScheduled && !IsAccelerated) {
if (WaitingVDiskCount == 1 && RequestsSent > 1) {
ui64 timeToAccelerateUs = Max<ui64>(1, PutImpl.GetTimeToAccelerateNs(LogCtx) / 1000);
if (!IsAccelerateScheduled && AccelerateRequestsSent < 2) {
if (WaitingVDiskCount > 0 && WaitingVDiskCount <= 2 && RequestsSent > 1) {
ui64 timeToAccelerateUs = Max<ui64>(1, PutImpl.GetTimeToAccelerateNs(LogCtx, 2 - AccelerateRequestsSent) / 1000);
TDuration timeSinceStart = TActivationContext::Monotonic() - StartTime;
if (timeSinceStart.MicroSeconds() < timeToAccelerateUs) {
ui64 causeIdx = RootCauseTrack.RegisterAccelerate();
Expand Down Expand Up @@ -512,8 +514,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
, TimeStatsEnabled(timeStatsEnabled)
, Tactic(ev->Tactic)
, Stats(std::move(stats))
, IsAccelerated(false)
, IsAccelerateScheduled(false)
, IsMultiPutMode(false)
, IncarnationRecords(info->GetTotalVDisksNum())
, ExpiredVDiskSet(&info->GetTopology())
Expand Down Expand Up @@ -556,8 +556,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor<TBlobSt
, TimeStatsEnabled(timeStatsEnabled)
, Tactic(tactic)
, Stats(std::move(stats))
, IsAccelerated(false)
, IsAccelerateScheduled(false)
, IsMultiPutMode(true)
, IncarnationRecords(info->GetTotalVDisksNum())
, ExpiredVDiskSet(&info->GetTopology())
Expand Down
Loading

0 comments on commit ba1c745

Please sign in to comment.