diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index 0b74ec7c03fe..c020b45e118a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -61,6 +61,16 @@ constexpr bool WithMovingPatchRequestToStaticNode = true; // Common types //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// +struct TDiskDelayPrediction { + ui64 PredictedNs; + ui32 DiskIdx; + + bool operator<(const TDiskDelayPrediction& other) const { + return PredictedNs > other.PredictedNs; + } +}; + +using TDiskDelayPredictions = TStackVec; struct TEvDeathNote : public TEventLocal { TStackVec, 16> Responsiveness; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index 6df5e4ce8698..397a2cbd3de3 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -178,21 +178,12 @@ ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQu } void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, - NKikimrBlobStorage::EVDiskQueueId queueId, - ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstSubgroupIdx) const { - *outWorstSubgroupIdx = -1; - *outWorstNs = 0; - *outNextToWorstNs = 0; + NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const { + outNWorst->resize(Disks.size()); for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) { - ui64 predictedNs = GetPredictedDelayNs(info, groupQueues, diskIdx, queueId); - if (predictedNs > *outWorstNs) { - *outNextToWorstNs = *outWorstNs; - *outWorstNs = predictedNs; - *outWorstSubgroupIdx = diskIdx; - } else if (predictedNs > *outNextToWorstNs) { - *outNextToWorstNs = predictedNs; - } + (*outNWorst)[diskIdx] = { GetPredictedDelayNs(info, groupQueues, diskIdx, queueId), 
diskIdx }; } + std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst + 1, (ui32)Disks.size()), outNWorst->end()); } bool TBlobState::HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const { @@ -467,22 +458,13 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde } void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, - NKikimrBlobStorage::EVDiskQueueId queueId, - ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstOrderNumber) const { - *outWorstOrderNumber = -1; - *outWorstNs = 0; - *outNextToWorstNs = 0; + NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const { ui32 totalVDisks = info.GetTotalVDisksNum(); + outNWorst->resize(totalVDisks); for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) { - ui64 predictedNs = groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId); - if (predictedNs > *outWorstNs) { - *outNextToWorstNs = *outWorstNs; - *outWorstNs = predictedNs; - *outWorstOrderNumber = orderNumber; - } else if (predictedNs > *outNextToWorstNs) { - *outNextToWorstNs = predictedNs; - } + (*outNWorst)[orderNumber] = { groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId), orderNumber }; } + std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(nWorst + 1, totalVDisks), outNWorst->end()); } void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index 84dc4780c9a5..54d53302cdf5 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -97,8 +97,7 @@ struct TBlobState { ui64 GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) 
const; void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, - NKikimrBlobStorage::EVDiskQueueId queueId, - ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstSubgroupIdx) const; + NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const; TString ToString() const; bool HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const; @@ -209,8 +208,7 @@ struct TBlackboard { ssize_t AddPartMap(const TLogoBlobID &id, ui32 diskOrderNumber, ui32 requestIndex); void ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapIndex, ui32 responseIndex, NKikimrProto::EReplyStatus status); void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, - NKikimrBlobStorage::EVDiskQueueId queueId, - ui64 *outWorstNs, ui64 *outNextToWorstNs, i32 *outWorstOrderNumber) const; + NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nWorst, TDiskDelayPredictions *outNWorst) const; TString ToString() const; void ChangeAll() { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp index dd6134fe47b7..ee708be52344 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp @@ -38,8 +38,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor DiskCounters; - bool IsGetAccelerated = false; + ui32 GetsAccelerated = 0; bool IsGetAccelerateScheduled = false; - bool IsPutAccelerated = false; + ui32 PutsAccelerated = 0; bool IsPutAccelerateScheduled = false; void Handle(TEvAccelerateGet::TPtr &ev) { + IsGetAccelerateScheduled = false; RootCauseTrack.OnAccelerate(ev->Get()->CauseIdx); AccelerateGet(); } void Handle(TEvAcceleratePut::TPtr &ev) { + IsPutAccelerateScheduled = false; RootCauseTrack.OnAccelerate(ev->Get()->CauseIdx); AcceleratePut(); } void AccelerateGet() { - if (IsGetAccelerated) { + if (GetsAccelerated == 2) { 
return; } - IsGetAccelerated = true; + GetsAccelerated++; TDeque> vGets; TDeque> vPuts; - GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts); + GetImpl.AccelerateGet(LogCtx, GetUnresponsiveDisksMask(), vGets, vPuts); *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size(); *Mon->NodeMon->AccelerateEvVGetCount += vGets.size(); SendVGetsAndVPuts(vGets, vPuts); + TryScheduleGetAcceleration(); } void AcceleratePut() { - if (IsPutAccelerated) { + if (PutsAccelerated == 2) { return; } - IsPutAccelerated = true; + PutsAccelerated++; TDeque> vGets; TDeque> vPuts; - GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDiskOrderNumber(), vGets, vPuts); + GetImpl.AcceleratePut(LogCtx, GetUnresponsiveDisksMask(), vGets, vPuts); *Mon->NodeMon->AccelerateEvVPutCount += vPuts.size(); *Mon->NodeMon->AccelerateEvVGetCount += vGets.size(); SendVGetsAndVPuts(vGets, vPuts); + TrySchedulePutAcceleration(); } void SendVGetsAndVPuts(TDeque> &vGets, TDeque> &vPuts) { ReportBytes(GetImpl.GrabBytesToReport()); - RequestsSent += vGets.size(); - RequestsSent += vPuts.size(); + RequestsSent += vGets.size() + vPuts.size(); CountPuts(vPuts); if (vPuts.size()) { if (!IsPutStarted) { @@ -146,14 +150,14 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor ResponsesReceived, "RequestsSent# %" PRIu64 " ResponsesReceived# %" PRIu64, ui64(RequestsSent), ui64(ResponsesReceived)); @@ -307,10 +312,10 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor StartTime) ? (now - StartTime) : TDuration::MilliSeconds(0); if (timeSinceStart.MicroSeconds() < timeToAccelerateUs) { @@ -326,10 +331,10 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor StartTimePut) ? 
(now - StartTimePut) : TDuration::MilliSeconds(0); if (timeSinceStart.MicroSeconds() < timeToAccelerateUs) { @@ -401,8 +406,7 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActorDeadline) , StartTime(now) , StartTimePut(StartTime) - , RequestsSent(0) - , ResponsesReceived(0) + , GroupSize(info->Type.BlobSubgroupSize()) , ReportedBytes(0) { ReportBytes(sizeof(*this)); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index 87cdf96a2bf9..dba55f7d112c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -159,31 +159,28 @@ void TGetImpl::PrepareReply(NKikimrProto::EReplyStatus status, TString errorReas } -ui64 TGetImpl::GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EVDiskQueueId queueId) { +ui64 TGetImpl::GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nthWorst) { Y_UNUSED(logCtx); // Find the slowest disk - ui64 worstPredictedNs = 0; - ui64 nextToWorstPredictedNs = 0; + TDiskDelayPredictions worstDisks; if (Blackboard.BlobStates.size() == 1) { - i32 worstSubgroupIdx = -1; Blackboard.BlobStates.begin()->second.GetWorstPredictedDelaysNs( - *Info, *Blackboard.GroupQueues, queueId, - &worstPredictedNs, &nextToWorstPredictedNs, &worstSubgroupIdx); + *Info, *Blackboard.GroupQueues, queueId, nthWorst, &worstDisks); } else { - i32 worstOrderNumber = -1; Blackboard.GetWorstPredictedDelaysNs( - *Info, *Blackboard.GroupQueues, queueId, - &worstPredictedNs, &nextToWorstPredictedNs, &worstOrderNumber); + *Info, *Blackboard.GroupQueues, queueId, nthWorst, &worstDisks); } - return nextToWorstPredictedNs * 1; + return worstDisks[nthWorst].PredictedNs; } -ui64 TGetImpl::GetTimeToAccelerateGetNs(TLogContext &logCtx) { - return GetTimeToAccelerateNs(logCtx, HandleClassToQueueId(Blackboard.GetHandleClass)); +ui64 TGetImpl::GetTimeToAccelerateGetNs(TLogContext &logCtx, ui32 
acceleratesSent) { + Y_DEBUG_ABORT_UNLESS(acceleratesSent < 2); + return GetTimeToAccelerateNs(logCtx, HandleClassToQueueId(Blackboard.GetHandleClass), 2 - acceleratesSent); } -ui64 TGetImpl::GetTimeToAcceleratePutNs(TLogContext &logCtx) { - return GetTimeToAccelerateNs(logCtx, HandleClassToQueueId(Blackboard.PutHandleClass)); +ui64 TGetImpl::GetTimeToAcceleratePutNs(TLogContext &logCtx, ui32 acceleratesSent) { + Y_DEBUG_ABORT_UNLESS(acceleratesSent < 2); + return GetTimeToAccelerateNs(logCtx, HandleClassToQueueId(Blackboard.PutHandleClass), 2 - acceleratesSent); } TString TGetImpl::DumpFullState() const { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h index 13cb325251cd..2b7623c63f93 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.h @@ -249,7 +249,7 @@ class TGetImpl { void PrepareReply(NKikimrProto::EReplyStatus status, TString errorReason, TLogContext &logCtx, TAutoPtr &outGetResult); - void AccelerateGet(TLogContext &logCtx, i32 slowDiskOrderNumber, + void AccelerateGet(TLogContext &logCtx, ui32 slowDisksMask, TDeque> &outVGets, TDeque> &outVPuts) { TAutoPtr outGetResult; @@ -259,7 +259,7 @@ class TGetImpl { TStackVec &disks = it->second.Disks; for (ui32 i = 0; i < disks.size(); ++i) { TBlobState::TDisk &disk = disks[i]; - disk.IsSlow = ((i32)disk.OrderNumber == slowDiskOrderNumber); + disk.IsSlow = slowDisksMask & (1 << disk.OrderNumber); } } Blackboard.ChangeAll(); @@ -269,14 +269,14 @@ class TGetImpl { RequestPrefix.data(), outGetResult->Print(false).c_str(), DumpFullState().c_str()); } - void AcceleratePut(TLogContext &logCtx, i32 slowDiskOrderNumber, + void AcceleratePut(TLogContext &logCtx, ui32 slowDisksMask, TDeque> &outVGets, TDeque> &outVPuts) { - AccelerateGet(logCtx, slowDiskOrderNumber, outVGets, outVPuts); + AccelerateGet(logCtx, slowDisksMask, outVGets, outVPuts); } - ui64 GetTimeToAccelerateGetNs(TLogContext &logCtx); 
- ui64 GetTimeToAcceleratePutNs(TLogContext &logCtx); + ui64 GetTimeToAccelerateGetNs(TLogContext &logCtx, ui32 acceleratesSent); + ui64 GetTimeToAcceleratePutNs(TLogContext &logCtx, ui32 acceleratesSent); TString DumpFullState() const; @@ -313,7 +313,7 @@ class TGetImpl { void PrepareVPuts(TLogContext &logCtx, TDeque> &outVPuts); - ui64 GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EVDiskQueueId queueId); + ui64 GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EVDiskQueueId queueId, ui32 nthWorst); }; //TGetImpl }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index 6e7cdd31419a..0c8202cf4253 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -68,8 +68,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorGet()->CauseIdx); Accelerate(); SanityCheck(); // May Die @@ -129,12 +130,13 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorNodeMon->AccelerateEvVMultiPutCount : Mon->NodeMon->AccelerateEvVPutCount) += v.size(); + AccelerateIfNeeded(); } void HandleIncarnation(TMonotonic timestamp, ui32 orderNumber, ui64 incarnationGuid) { @@ -363,9 +365,9 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor 1) { - ui64 timeToAccelerateUs = Max(1, PutImpl.GetTimeToAccelerateNs(LogCtx) / 1000); + if (!IsAccelerateScheduled && AccelerateRequestsSent < 2) { + if (WaitingVDiskCount > 0 && WaitingVDiskCount <= 2 && RequestsSent > 1) { + ui64 timeToAccelerateUs = Max(1, PutImpl.GetTimeToAccelerateNs(LogCtx, 2 - AccelerateRequestsSent) / 1000); TDuration timeSinceStart = TActivationContext::Monotonic() - StartTime; if (timeSinceStart.MicroSeconds() < timeToAccelerateUs) { ui64 causeIdx = RootCauseTrack.RegisterAccelerate(); @@ -512,8 +514,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorTactic) , Stats(std::move(stats)) - , 
IsAccelerated(false) - , IsAccelerateScheduled(false) , IsMultiPutMode(false) , IncarnationRecords(info->GetTotalVDisksNum()) , ExpiredVDiskSet(&info->GetTopology()) @@ -556,8 +556,6 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActorGetTotalVDisksNum()) , ExpiredVDiskSet(&info->GetTopology()) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp index 3049ab1a039b..fbf6d5c52cab 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp @@ -82,20 +82,19 @@ void TPutImpl::PrepareReply(NKikimrProto::EReplyStatus status, TLogContext &logC } } -ui64 TPutImpl::GetTimeToAccelerateNs(TLogContext &logCtx) { +ui64 TPutImpl::GetTimeToAccelerateNs(TLogContext &logCtx, ui32 nthWorst) { Y_UNUSED(logCtx); Y_ABORT_UNLESS(!Blackboard.BlobStates.empty()); - TBatchedVec nextToWorstPredictedNsVec(Blackboard.BlobStates.size()); + TBatchedVec nthWorstPredictedNsVec(Blackboard.BlobStates.size()); ui64 idx = 0; for (auto &[_, state] : Blackboard.BlobStates) { - // Find the slowest disk - i32 worstSubgroupIdx = -1; - ui64 worstPredictedNs = 0; - state.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, HandleClassToQueueId(Blackboard.PutHandleClass), - &worstPredictedNs, &nextToWorstPredictedNsVec[idx], &worstSubgroupIdx); - idx++; + // Find the n'th slowest disk + TDiskDelayPredictions worstDisks; + state.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, HandleClassToQueueId(Blackboard.PutHandleClass), nthWorst, + &worstDisks); + nthWorstPredictedNsVec[idx++] = worstDisks[nthWorst].PredictedNs; } - return *MaxElement(nextToWorstPredictedNsVec.begin(), nextToWorstPredictedNsVec.end()); + return *MaxElement(nthWorstPredictedNsVec.begin(), nthWorstPredictedNsVec.end()); } TString TPutImpl::DumpFullState() const { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h index 
b15b864837aa..14887a6c9817 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.h @@ -187,7 +187,7 @@ class TPutImpl { void PrepareOneReply(NKikimrProto::EReplyStatus status, size_t blobIdx, TLogContext &logCtx, TString errorReason, TPutResultVec &outPutResults); - ui64 GetTimeToAccelerateNs(TLogContext &logCtx); + ui64 GetTimeToAccelerateNs(TLogContext &logCtx, ui32 nthWorst); TString DumpFullState() const; @@ -207,7 +207,6 @@ class TPutImpl { TDeque GeneratePutRequests() { TDeque events; - // Group put requests together by VDiskID. std::unordered_multimap puts; for (auto& put : Blackboard.GroupDiskRequests.PutsPending) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h index fa71410a08c2..3b40f5374a4c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h @@ -13,26 +13,27 @@ class TAcceleratePutStrategy : public TStrategyBase { EStrategyOutcome Process(TLogContext &logCtx, TBlobState &state, const TBlobStorageGroupInfo &info, TBlackboard& /*blackboard*/, TGroupDiskRequests &groupDiskRequests) override { // Find the unput part and disk - i32 badDiskIdx = -1; + TStackVec badDiskIdxs; for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { TBlobState::TDisk &disk = state.Disks[diskIdx]; for (size_t partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) { TBlobState::TDiskPart &diskPart = disk.DiskParts[partIdx]; if (diskPart.Situation == TBlobState::ESituation::Sent) { - badDiskIdx = diskIdx; + badDiskIdxs.push_back(diskIdx); } } } - if (badDiskIdx >= 0) { - // Mark the disk 'bad' + if (!badDiskIdxs.empty()) { + // Mark the corresponding disks 'bad' // Prepare part layout if possible TBlobStorageGroupType::TPartLayout layout; - PreparePartLayout(state, info, &layout, badDiskIdx); + PreparePartLayout(state, info, 
&layout, badDiskIdxs); TBlobStorageGroupType::TPartPlacement partPlacement; bool isCorrectable = info.Type.CorrectLayout(layout, partPlacement); - if (isCorrectable && IsPutNeeded(state, partPlacement)) { + bool isPutNeeded = IsPutNeeded(state, partPlacement); + if (isCorrectable && isPutNeeded) { PreparePutsForPartPlacement(logCtx, state, info, groupDiskRequests, partPlacement); } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp index bd3b1c73bbaa..1d4bf102da19 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp @@ -194,7 +194,7 @@ void TStrategyBase::AddGetRequest(TLogContext &logCtx, TGroupDiskRequests &group } void TStrategyBase::PreparePartLayout(const TBlobState &state, const TBlobStorageGroupInfo &info, - TBlobStorageGroupType::TPartLayout *layout, ui32 slowDiskIdx) { + TBlobStorageGroupType::TPartLayout *layout, const TStackVec& slowDiskIdxs) { Y_ABORT_UNLESS(layout); const ui32 totalPartCount = info.Type.TotalPartCount(); const ui32 blobSubringSize = info.Type.BlobSubgroupSize(); @@ -216,26 +216,29 @@ void TStrategyBase::PreparePartLayout(const TBlobState &state, const TBlobStorag if (!isErrorDisk) { for (ui32 partIdx = beginPartIdx; partIdx < endPartIdx; ++partIdx) { TBlobState::ESituation partSituation = disk.DiskParts[partIdx].Situation; + bool isOnSlowDisk = (std::find(slowDiskIdxs.begin(), slowDiskIdxs.end(), diskIdx) != slowDiskIdxs.end()); if (partSituation == TBlobState::ESituation::Present || - (diskIdx != slowDiskIdx && partSituation == TBlobState::ESituation::Sent)) { + (!isOnSlowDisk && partSituation == TBlobState::ESituation::Sent)) { layout->VDiskPartMask[diskIdx] |= (1ul << partIdx); } layout->VDiskMask |= (1ul << diskIdx); } } } - if (slowDiskIdx == InvalidVDiskIdx) { + if (slowDiskIdxs.empty()) { layout->SlowVDiskMask = 0; } else { - Y_DEBUG_ABORT_UNLESS(slowDiskIdx < 
sizeof(layout->SlowVDiskMask) * 8); - layout->SlowVDiskMask = (1ull << slowDiskIdx); + layout->SlowVDiskMask = 0; + for (ui32 slowDiskIdx : slowDiskIdxs) { + Y_DEBUG_ABORT_UNLESS(slowDiskIdx < sizeof(layout->SlowVDiskMask) * 8); + layout->SlowVDiskMask |= (1ull << slowDiskIdx); + } } } bool TStrategyBase::IsPutNeeded(const TBlobState &state, const TBlobStorageGroupType::TPartPlacement &partPlacement) { bool isNeeded = false; - for (ui32 i = 0; i < partPlacement.Records.size(); ++i) { - const TBlobStorageGroupType::TPartPlacement::TVDiskPart& record = partPlacement.Records[i]; + for (const TBlobStorageGroupType::TPartPlacement::TVDiskPart& record : partPlacement.Records) { const TBlobState::TDisk &disk = state.Disks[record.VDiskIdx]; TBlobState::ESituation partSituation = disk.DiskParts[record.PartIdx].Situation; switch (partSituation) { @@ -385,45 +388,44 @@ void TStrategyBase::Prepare3dcPartPlacement(const TBlobState &state, } } -i32 TStrategyBase::MarkSlowSubgroupDisk(TBlobState &state, const TBlobStorageGroupInfo &info, TBlackboard &blackboard, +ui32 TStrategyBase::MakeSlowSubgroupDiskMask(TBlobState &state, const TBlobStorageGroupInfo &info, TBlackboard &blackboard, bool isPut) { // Find the slowest disk switch (blackboard.AccelerationMode) { case TBlackboard::AccelerationModeSkipOneSlowest: { - i32 worstSubgroupIdx = -1; - ui64 worstPredictedNs = 0; - ui64 nextToWorstPredictedNs = 0; + TDiskDelayPredictions worstDisks; state.GetWorstPredictedDelaysNs(info, *blackboard.GroupQueues, (isPut ? 
HandleClassToQueueId(blackboard.PutHandleClass) : - HandleClassToQueueId(blackboard.GetHandleClass)), - &worstPredictedNs, &nextToWorstPredictedNs, &worstSubgroupIdx); + HandleClassToQueueId(blackboard.GetHandleClass)), 1, + &worstDisks); // Check if the slowest disk exceptionally slow, or just not very fast - i32 slowDiskSubgroupIdx = -1; - if (nextToWorstPredictedNs > 0 && worstPredictedNs > nextToWorstPredictedNs * 2) { - slowDiskSubgroupIdx = worstSubgroupIdx; + ui32 slowDiskSubgroupMask = 0; + if (worstDisks[1].PredictedNs > 0 && worstDisks[0].PredictedNs > worstDisks[1].PredictedNs * 2) { + slowDiskSubgroupMask = 1 << worstDisks[0].DiskIdx; } // Mark single slow disk for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { state.Disks[diskIdx].IsSlow = false; } - if (slowDiskSubgroupIdx >= 0) { - state.Disks[slowDiskSubgroupIdx].IsSlow = true; + if (slowDiskSubgroupMask > 0) { + state.Disks[worstDisks[0].DiskIdx].IsSlow = true; } - return slowDiskSubgroupIdx; + return slowDiskSubgroupMask; } case TBlackboard::AccelerationModeSkipMarked: { + ui32 slowDiskSubgroupMask = 0; for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { if (state.Disks[diskIdx].IsSlow) { - return diskIdx; + slowDiskSubgroupMask |= 1 << diskIdx; } } - return -1; + return slowDiskSubgroupMask; } } - return -1; + return 0; } }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h index 929aeb96d0f3..67da2e3fef3e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h @@ -34,7 +34,7 @@ class TStrategyBase : public IStrategy { void AddGetRequest(TLogContext &logCtx, TGroupDiskRequests &groupDiskRequests, TLogoBlobID &fullId, ui32 partIdx, TBlobState::TDisk &disk, TIntervalSet &intervalSet, const char *logMarker); void PreparePartLayout(const TBlobState &state, const TBlobStorageGroupInfo &info, - 
TBlobStorageGroupType::TPartLayout *layout, ui32 slowDiskIdx); + TBlobStorageGroupType::TPartLayout *layout, const TStackVec& slowDiskIdxs); bool IsPutNeeded(const TBlobState &state, const TBlobStorageGroupType::TPartPlacement &partPlacement); void PreparePutsForPartPlacement(TLogContext &logCtx, TBlobState &state, const TBlobStorageGroupInfo &info, TGroupDiskRequests &groupDiskRequests, @@ -52,7 +52,7 @@ class TStrategyBase : public IStrategy { TBlobStorageGroupType::TPartPlacement &outPartPlacement); // Sets IsSlow for the slow disk, resets for other disks. // Returns -1 if there is no slow disk, or subgroupIdx of the slow disk. - i32 MarkSlowSubgroupDisk(TBlobState &state, const TBlobStorageGroupInfo &info, TBlackboard &blackboard, bool isPut); + ui32 MakeSlowSubgroupDiskMask(TBlobState &state, const TBlobStorageGroupInfo &info, TBlackboard &blackboard, bool isPut); }; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_m3dc_basic.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_m3dc_basic.h index 2e4ffac74be9..b6b8798107b2 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_m3dc_basic.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_m3dc_basic.h @@ -73,17 +73,15 @@ namespace NKikimr { // find the slowest disk and mark it switch (blackboard.AccelerationMode) { case TBlackboard::AccelerationModeSkipOneSlowest: { - i32 worstSubgroupIdx = -1; - ui64 worstPredictedNs = 0; - ui64 nextToWorstPredictedNs = 0; + TDiskDelayPredictions worstDisks; state.GetWorstPredictedDelaysNs(info, *blackboard.GroupQueues, - HandleClassToQueueId(blackboard.GetHandleClass), - &worstPredictedNs, &nextToWorstPredictedNs, &worstSubgroupIdx); + HandleClassToQueueId(blackboard.GetHandleClass), 1, + &worstDisks); // Check if the slowest disk exceptionally slow, or just not very fast i32 slowDiskSubgroupIdx = -1; - if (nextToWorstPredictedNs > 0 && worstPredictedNs > nextToWorstPredictedNs * 2) { - slowDiskSubgroupIdx = worstSubgroupIdx; + if 
(worstDisks[1].PredictedNs > 0 && worstDisks[0].PredictedNs > worstDisks[1].PredictedNs * 2) { + slowDiskSubgroupIdx = worstDisks[0].DiskIdx; } // Mark single slow disk diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h index e98d931865e3..5829bd7f35b6 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h @@ -311,8 +311,8 @@ class TMinIopsBlockStrategy : public TStrategyBase { // Try excluding the slow disk bool isDone = false; // TODO: Mark disk that does not answer when accelerating requests - i32 slowDiskSubgroupIdx = MarkSlowSubgroupDisk(state, info, blackboard, false); - if (slowDiskSubgroupIdx >= 0) { + ui32 slowDiskSubgroupMask = MakeSlowSubgroupDiskMask(state, info, blackboard, false); + if (slowDiskSubgroupMask > 0) { TBlobStorageGroupInfo::EBlobState fastPessimisticState = TBlobStorageGroupInfo::EBS_DISINTEGRATED; TBlobStorageGroupInfo::EBlobState fastOptimisticState = TBlobStorageGroupInfo::EBS_DISINTEGRATED; TBlobStorageGroupInfo::EBlobState fastAltruisticState = TBlobStorageGroupInfo::EBS_DISINTEGRATED; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3dc.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3dc.h index 77cc75296057..ed07074072fe 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3dc.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3dc.h @@ -35,15 +35,16 @@ class TPut3dcStrategy : public TStrategyBase { TBlobStorageGroupType::TPartPlacement partPlacement; bool degraded = false; bool isDone = false; - i32 slowDiskSubgroupIdx = MarkSlowSubgroupDisk(state, info, blackboard, true); + ui32 slowDiskSubgroupMask = MakeSlowSubgroupDiskMask(state, info, blackboard, true); do { - if (slowDiskSubgroupIdx < 0) { + if (slowDiskSubgroupMask == 0) { break; // ignore this case } TBlobStorageGroupInfo::TSubgroupVDisks 
success(&info.GetTopology()); TBlobStorageGroupInfo::TSubgroupVDisks error(&info.GetTopology()); Evaluate3dcSituation(state, NumFailRealms, NumFailDomainsPerFailRealm, info, true, success, error, degraded); - TBlobStorageGroupInfo::TSubgroupVDisks slow(&info.GetTopology(), slowDiskSubgroupIdx); + TBlobStorageGroupInfo::TSubgroupVDisks slow = TBlobStorageGroupInfo::TSubgroupVDisks::CreateFromMask( + &info.GetTopology(), slowDiskSubgroupMask); if ((success | error) & slow) { break; // slow disk is already marked as successful or erroneous } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h index 04d4c78f8aa3..d46cee639fd1 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h @@ -126,24 +126,22 @@ class TRestoreStrategy : public TStrategyBase { } // Find the slowest disk - i32 worstSubgroupIdx = -1; - ui64 worstPredictedNs = 0; - ui64 nextToWorstPredictedNs = 0; + TDiskDelayPredictions worstDisks; state.GetWorstPredictedDelaysNs(info, *blackboard.GroupQueues, - HandleClassToQueueId(blackboard.PutHandleClass), - &worstPredictedNs, &nextToWorstPredictedNs, &worstSubgroupIdx); + HandleClassToQueueId(blackboard.PutHandleClass), 1, + &worstDisks); // Check if the slowest disk exceptionally slow, or just not very fast - i32 slowDiskSubgroupIdx = -1; - if (nextToWorstPredictedNs > 0 && worstPredictedNs > nextToWorstPredictedNs * 2) { - slowDiskSubgroupIdx = worstSubgroupIdx; + TStackVec slowDiskSubgroupIdxs; + if (worstDisks[1].PredictedNs > 0 && worstDisks[0].PredictedNs > worstDisks[1].PredictedNs * 2) { + slowDiskSubgroupIdxs.push_back(worstDisks[0].DiskIdx); } bool isDone = false; - if (slowDiskSubgroupIdx >= 0) { + if (!slowDiskSubgroupIdxs.empty()) { // If there is an exceptionally slow disk, try not touching it, mark isDone TBlobStorageGroupType::TPartLayout layout; - PreparePartLayout(state, info, &layout, 
slowDiskSubgroupIdx); + PreparePartLayout(state, info, &layout, slowDiskSubgroupIdxs); TBlobStorageGroupType::TPartPlacement partPlacement; bool isCorrectable = info.Type.CorrectLayout(layout, partPlacement); @@ -157,7 +155,7 @@ class TRestoreStrategy : public TStrategyBase { if (!isDone) { // Fill in the part layout TBlobStorageGroupType::TPartLayout layout; - PreparePartLayout(state, info, &layout, InvalidVDiskIdx); + PreparePartLayout(state, info, &layout, {}); TBlobStorageGroupType::TPartPlacement partPlacement; bool isCorrectable = info.Type.CorrectLayout(layout, partPlacement); Y_ABORT_UNLESS(isCorrectable); diff --git a/ydb/core/blobstorage/ut_blobstorage/acceleration.cpp b/ydb/core/blobstorage/ut_blobstorage/acceleration.cpp new file mode 100644 index 000000000000..69ae209bf1e6 --- /dev/null +++ b/ydb/core/blobstorage/ut_blobstorage/acceleration.cpp @@ -0,0 +1,181 @@ +#include + +#include +#include + +#include "ut_helpers.h" + +#define Ctest Cnull + +Y_UNIT_TEST_SUITE(Acceleration) { + + /* SetupEnv: sets nodeCount to erasure.BlobSubgroupSize(), builds a TEnvironmentSetup with that NodeCount and the given Erasure, calls CreateBoxAndPool(1, 1) and Sim(1 minute), asserts the fetched base config has exactly one group and stores its id in groupId, then sends TEvStatus to the group proxy from an edge actor and asserts the reply status is OK. */ void SetupEnv(const TBlobStorageGroupType& erasure, std::unique_ptr& env, + ui32& nodeCount, ui32& groupId) { + nodeCount = erasure.BlobSubgroupSize(); + + env.reset(new TEnvironmentSetup{{ + .NodeCount = nodeCount, + .Erasure = erasure, + }}); + + + env->CreateBoxAndPool(1, 1); + env->Sim(TDuration::Minutes(1)); + + NKikimrBlobStorage::TBaseConfig base = env->FetchBaseConfig(); + UNIT_ASSERT_VALUES_EQUAL(base.GroupSize(), 1); + groupId = base.GetGroup(0).GetGroupId(); + + TActorId edge = env->Runtime->AllocateEdgeActor(1); + + env->Runtime->WrapInActorContext(edge, [&] { + SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvStatus(TInstant::Max())); + }); + auto res = env->WaitForEdgeActorEvent(edge, true, TInstant::Max()); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + } + + /* TestAcceleratePut: for every fastDisksNum in [0, erasure.BlobSubgroupSize() - 2) builds a fresh environment, sends one TEvPut with the given handleClass, and installs a FilterFunction over TEvVPutResult events: results from the first fastDisksNum distinct VDisks always pass, results from the next slowDisksNum distinct VDisks are rescheduled after delay (2 s), and all other results pass. The test then asserts that an edge event is returned when waiting with Now() + waitFor (1 s) and that its status is OK. NOTE(review): the third argument of WaitForEdgeActorEvent is presumably a deadline - confirm. */ void TestAcceleratePut(const TBlobStorageGroupType& erasure, ui32 slowDisksNum, + NKikimrBlobStorage::EPutHandleClass handleClass) { + for (ui32 fastDisksNum = 0; fastDisksNum < 
erasure.BlobSubgroupSize() - 2; ++fastDisksNum) { + std::unique_ptr env; + ui32 nodeCount; + ui32 groupId; + SetupEnv(erasure, env, nodeCount, groupId); + + constexpr TDuration delay = TDuration::Seconds(2); + constexpr TDuration waitFor = TDuration::Seconds(1); + + Ctest << "fastDisksNum# " << fastDisksNum << Endl; + + TActorId edge = env->Runtime->AllocateEdgeActor(1); + TString data = "Test"; + TLogoBlobID blobId = TLogoBlobID(1, 1, 1, 1, data.size(), 1); + + env->Runtime->WrapInActorContext(edge, [&] { + SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max()), handleClass); + }); + + THashSet fastDisks; + THashSet slowDisks; + + env->Runtime->FilterFunction = [&](ui32/* nodeId*/, std::unique_ptr& ev) { + if (ev->GetTypeRewrite() == TEvBlobStorage::TEvVPutResult::EventType) { + TVDiskID vdiskId = VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID()); + TLogoBlobID partId = LogoBlobIDFromLogoBlobID(ev->Get()->Record.GetBlobID()); + Ctest << TAppData::TimeProvider->Now() << " TEvVPutResult: vdiskId# " << vdiskId.ToString() << + " partId# " << partId.ToString() << ", "; + if (fastDisks.size() < fastDisksNum || fastDisks.count(vdiskId)) { + fastDisks.insert(vdiskId); + Ctest << "pass message" << Endl; + return true; + } else if (!slowDisks.count(vdiskId) && slowDisks.size() >= slowDisksNum) { + Ctest << "pass message" << Endl; + return true; + } else { + /* ev.release() gives up unique_ptr ownership and passes the raw event to Schedule with the 2 s delay. NOTE(review): returning false presumably makes the runtime skip immediate delivery - confirm against the FilterFunction contract. */ Ctest << "delay message for " << delay.ToString() << Endl; + slowDisks.insert(vdiskId); + env->Runtime->WrapInActorContext(edge, [&] { + TActivationContext::Schedule(delay, ev.release()); + }); + + return false; + } + } + return true; + }; + + auto res = env->WaitForEdgeActorEvent(edge, false, TAppData::TimeProvider->Now() + waitFor); + UNIT_ASSERT_C(res, "fastDisksNum# " << fastDisksNum); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + } + } + + /* TestAccelerateGet: for every fastDisksNum in [0, erasure.BlobSubgroupSize() - 2) builds a fresh environment, writes a blob produced by MakeData(1024) with TEvPut and waits for an edge event, then sends TEvGet for the whole blob with the given handleClass. The FilterFunction handles TEvVGetResult events with the same fast / slow / pass-through rule used for TEvVPutResult in TestAcceleratePut (delay 2 s). The test asserts that an edge event is returned when waiting with Now() + waitFor (1 s) and that both the overall status and Responses[0].Status are OK. */ void TestAccelerateGet(const TBlobStorageGroupType& erasure, ui32 slowDisksNum, + NKikimrBlobStorage::EGetHandleClass 
handleClass) { + for (ui32 fastDisksNum = 0; fastDisksNum < erasure.BlobSubgroupSize() - 2; ++fastDisksNum) { + std::unique_ptr env; + ui32 nodeCount; + ui32 groupId; + SetupEnv(erasure, env, nodeCount, groupId); + + constexpr TDuration delay = TDuration::Seconds(2); + constexpr TDuration waitFor = TDuration::Seconds(1); + + Ctest << "fastDisksNum# " << fastDisksNum << Endl; + + TActorId edge = env->Runtime->AllocateEdgeActor(1); + TString data = MakeData(1024); + TLogoBlobID blobId = TLogoBlobID(1, 1, 1, 1, data.size(), 1); + + env->Runtime->WrapInActorContext(edge, [&] { + SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max())); + }); + + /* The result of the preparatory put is waited for but its status is not checked. */ env->WaitForEdgeActorEvent(edge, false, TInstant::Max()); + + env->Runtime->WrapInActorContext(edge, [&] { + SendToBSProxy(edge, groupId, new TEvBlobStorage::TEvGet(blobId, 0, data.size(), TInstant::Max(), handleClass)); + }); + + THashSet slowDisks; + THashSet fastDisks; + + env->Runtime->FilterFunction = [&](ui32/* nodeId*/, std::unique_ptr& ev) { + if (ev->GetTypeRewrite() == TEvBlobStorage::TEvVGetResult::EventType) { + TVDiskID vdiskId = VDiskIDFromVDiskID(ev->Get()->Record.GetVDiskID()); + /* NOTE(review): GetResult(0) is read unconditionally; presumably every TEvVGetResult seen here carries at least one result - confirm. */ TLogoBlobID partId = LogoBlobIDFromLogoBlobID( + ev->Get()->Record.GetResult(0).GetBlobID()); + Ctest << TAppData::TimeProvider->Now() << " TEvVGetResult: " << vdiskId.ToString() << + " partId# " << partId.ToString() << ", "; + if (fastDisks.size() < fastDisksNum || fastDisks.count(vdiskId)) { + fastDisks.insert(vdiskId); + Ctest << "pass message" << Endl; + return true; + } else if (!slowDisks.count(vdiskId) && slowDisks.size() >= slowDisksNum) { + Ctest << "pass message" << Endl; + return true; + } else { + Ctest << "delay message for " << delay.ToString() << Endl; + slowDisks.insert(vdiskId); + env->Runtime->WrapInActorContext(edge, [&] { + TActivationContext::Schedule(delay, ev.release()); + }); + + return false; + } + } + return true; + }; + + auto res = env->WaitForEdgeActorEvent(edge, false, 
TAppData::TimeProvider->Now() + waitFor); + UNIT_ASSERT_C(res, "fastDisksNum# " << fastDisksNum); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Responses[0].Status, NKikimrProto::OK); + Ctest << "TEvGetResult# " << res->Get()->ToString() << Endl; + } + } /* TEST_ACCELERATE(erasure, method, handleClass, slowDisks) below defines a Y_UNIT_TEST named Test{erasure}{method}{handleClass}{slowDisks}Slow that calls TestAccelerate{method} with TBlobStorageGroupType::Erasure{erasure}, slowDisks and NKikimrBlobStorage::{handleClass}. Only the Mirror3dc (1 slow disk) and 4Plus2Block (1 and 2 slow disks) cases are instantiated; the Mirror3of4 cases and the Mirror3dc cases with 2 slow disks are commented out. The macro is undefined again at the end of the suite. */ + + #define TEST_ACCELERATE(erasure, method, handleClass, slowDisks) \ + Y_UNIT_TEST(Test##erasure##method##handleClass##slowDisks##Slow) { \ + TestAccelerate##method(TBlobStorageGroupType::Erasure##erasure, slowDisks, NKikimrBlobStorage::handleClass); \ + } + + TEST_ACCELERATE(Mirror3dc, Put, AsyncBlob, 1); +// TEST_ACCELERATE(Mirror3of4, Put, AsyncBlob, 1); + TEST_ACCELERATE(4Plus2Block, Put, AsyncBlob, 1); + +// TEST_ACCELERATE(Mirror3dc, Put, AsyncBlob, 2); +// TEST_ACCELERATE(Mirror3of4, Put, AsyncBlob, 2); + TEST_ACCELERATE(4Plus2Block, Put, AsyncBlob, 2); + + TEST_ACCELERATE(Mirror3dc, Get, AsyncRead, 1); +// TEST_ACCELERATE(Mirror3of4, Get, AsyncRead, 1); + TEST_ACCELERATE(4Plus2Block, Get, AsyncRead, 1); + +// TEST_ACCELERATE(Mirror3dc, Get, AsyncRead, 2); +// TEST_ACCELERATE(Mirror3of4, Get, AsyncRead, 2); + TEST_ACCELERATE(4Plus2Block, Get, AsyncRead, 2); + + #undef TEST_ACCELERATE +} diff --git a/ydb/core/blobstorage/ut_blobstorage/ya.make b/ydb/core/blobstorage/ut_blobstorage/ya.make index b7aa13f247de..63333224d407 100644 --- a/ydb/core/blobstorage/ut_blobstorage/ya.make +++ b/ydb/core/blobstorage/ut_blobstorage/ya.make @@ -12,6 +12,7 @@ ELSE() ENDIF() SRCS( + acceleration.cpp assimilation.cpp block_race.cpp counting_events.cpp