From e7ab6eab68bd8ed3ccdf9340999cd264471cba66 Mon Sep 17 00:00:00 2001 From: Sergey Belyakov Date: Mon, 16 Sep 2024 17:33:36 +0300 Subject: [PATCH] Fix acceleration in mirror-3-dc groups (#7931) --- ydb/core/blobstorage/dsproxy/dsproxy.h | 11 +- .../dsproxy/dsproxy_blackboard.cpp | 46 +++++- .../blobstorage/dsproxy/dsproxy_blackboard.h | 11 +- ydb/core/blobstorage/dsproxy/dsproxy_get.cpp | 12 +- .../blobstorage/dsproxy/dsproxy_get_impl.cpp | 12 +- ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp | 2 + ydb/core/blobstorage/dsproxy/dsproxy_impl.h | 3 + ydb/core/blobstorage/dsproxy/dsproxy_put.cpp | 4 +- .../blobstorage/dsproxy/dsproxy_put_impl.cpp | 4 +- .../blobstorage/dsproxy/dsproxy_state.cpp | 1 + .../dsproxy/dsproxy_strategy_accelerate_put.h | 8 +- .../dsproxy_strategy_accelerate_put_m3dc.h | 78 +++++----- .../dsproxy/dsproxy_strategy_base.cpp | 83 ++++------- .../dsproxy/dsproxy_strategy_base.h | 12 +- .../dsproxy/dsproxy_strategy_get_m3dc_basic.h | 37 ++--- .../dsproxy_strategy_get_min_iops_block.h | 3 +- .../dsproxy/dsproxy_strategy_put_m3dc.h | 13 +- .../dsproxy/dsproxy_strategy_restore.h | 21 +-- .../nodewarden/node_warden_impl.cpp | 1 + .../blobstorage/nodewarden/node_warden_impl.h | 2 + .../nodewarden/node_warden_proxy.cpp | 3 + .../ut_blobstorage/acceleration.cpp | 133 ++++++++++++------ .../blobstorage/ut_blobstorage/lib/common.h | 6 +- ydb/core/blobstorage/ut_blobstorage/lib/env.h | 3 + ydb/core/protos/config.proto | 9 +- 25 files changed, 280 insertions(+), 238 deletions(-) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy.h b/ydb/core/blobstorage/dsproxy/dsproxy.h index 4f9121de8228..b545f7849c95 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy.h @@ -55,9 +55,10 @@ const ui32 MaskSizeBits = 32; constexpr bool DefaultEnablePutBatching = true; constexpr bool DefaultEnableVPatch = false; -constexpr float DefaultSlowDiskThreshold = 2; -constexpr float DefaultPredictedDelayMultiplier = 1; +constexpr double DefaultSlowDiskThreshold = 2; +constexpr double DefaultPredictedDelayMultiplier = 1; constexpr TDuration DefaultLongRequestThreshold = TDuration::Seconds(50); +constexpr ui32 DefaultMaxNumOfSlowDisks = 2; constexpr bool WithMovingPatchRequestToStaticNode = true; @@ -193,8 +194,9 @@ inline void SetExecutionRelay(IEventBase& ev, std::shared_ptr { @@ -520,6 +522,7 @@ struct TBlobStorageProxyParameters { const TControlWrapper& SlowDiskThreshold; const TControlWrapper& PredictedDelayMultiplier; const TControlWrapper& LongRequestThresholdMs = TControlWrapper(DefaultLongRequestThreshold.MilliSeconds(), 1, 1'000'000); + const TControlWrapper& MaxNumOfSlowDisks = TControlWrapper(DefaultMaxNumOfSlowDisks, 1, 2); }; IActor* CreateBlobStorageGroupProxyConfigured(TIntrusivePtr&& info, diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp index 1de899664b5c..c705c9a04e2a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.cpp @@ -179,15 +179,16 @@ ui64 TBlobState::GetPredictedDelayNs(const TBlobStorageGroupInfo &info, TGroupQu void TBlobState::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst, - double multiplier) const { + const TAccelerationParams& accelerationParams) const { outNWorst->resize(Disks.size()); for (ui32 diskIdx = 0; diskIdx < Disks.size(); ++diskIdx) { + ui64 predictedDelayNs = GetPredictedDelayNs(info, groupQueues, diskIdx, queueId); (*outNWorst)[diskIdx] = { - static_cast(GetPredictedDelayNs(info, groupQueues, diskIdx, queueId) * multiplier), + static_cast(predictedDelayNs * accelerationParams.PredictedDelayMultiplier), diskIdx }; } - ui32 sortedPrefixSize = std::min(3u, (ui32)Disks.size()); + ui32 sortedPrefixSize = std::min(accelerationParams.MaxNumOfSlowDisks + 1, (ui32)Disks.size()); std::partial_sort(outNWorst->begin(), outNWorst->begin() + sortedPrefixSize, outNWorst->end()); } @@ -466,16 +467,18 @@ void TBlackboard::ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapInde void TBlackboard::GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst, - double multiplier) const { + const TAccelerationParams& accelerationParams) const { ui32 totalVDisks = info.GetTotalVDisksNum(); outNWorst->resize(totalVDisks); for (ui32 orderNumber = 0; orderNumber < totalVDisks; ++orderNumber) { + ui64 predictedDelayNs = groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId); (*outNWorst)[orderNumber] = { - static_cast(groupQueues.GetPredictedDelayNsByOrderNumber(orderNumber, queueId) * multiplier), + static_cast(predictedDelayNs * accelerationParams.PredictedDelayMultiplier), orderNumber }; } - std::partial_sort(outNWorst->begin(), outNWorst->begin() + std::min(3u, totalVDisks), outNWorst->end()); + ui32 sortedPrefixSize = std::min(accelerationParams.MaxNumOfSlowDisks + 1, totalVDisks); + std::partial_sort(outNWorst->begin(), outNWorst->begin() + sortedPrefixSize, outNWorst->end()); } void TBlackboard::RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx) { @@ -542,4 +545,35 @@ void TBlackboard::InvalidatePartStates(ui32 orderNumber) { } } +void TBlackboard::MarkSlowDisks(TBlobState& state, bool isPut, const TAccelerationParams& accelerationParams) { + // by default all disks are considered fast + for (TBlobState::TDisk& disk : state.Disks) { + disk.IsSlow = false; + } + + ui32 maxNumSlow = accelerationParams.MaxNumOfSlowDisks; + if (Info->GetTotalVDisksNum() <= maxNumSlow) { + // all disks cannot be slow + return; + } + + TDiskDelayPredictions worstDisks; + state.GetWorstPredictedDelaysNs(*Info, *GroupQueues, + (isPut ? HandleClassToQueueId(PutHandleClass) : HandleClassToQueueId(GetHandleClass)), + &worstDisks, accelerationParams); + + ui64 slowThreshold = worstDisks[maxNumSlow].PredictedNs * accelerationParams.SlowDiskThreshold; + if (slowThreshold == 0) { + // invalid or non-initialized predicted ns, consider all disks not slow + return; + } + + for (ui32 idx = 0; idx < maxNumSlow; ++idx) { + if (worstDisks[idx].PredictedNs > slowThreshold) { + ui32 orderNumber = worstDisks[idx].DiskIdx; + state.Disks[orderNumber].IsSlow = true; + } + } +} + }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h index 8507cc7a9399..fc729ac39bef 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_blackboard.h @@ -99,10 +99,9 @@ struct TBlobState { ui32 diskIdxInSubring, NKikimrBlobStorage::EVDiskQueueId queueId) const; void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst, - double multipler = 1) const; + const TAccelerationParams& accelerationParams) const; TString ToString() const; bool HasWrittenQuorum(const TBlobStorageGroupInfo& info, const TBlobStorageGroupInfo::TGroupVDisks& expired) const; - static TString SituationToString(ESituation situation); }; @@ -166,7 +165,7 @@ class IStrategy { struct TBlackboard { enum EAccelerationMode { - AccelerationModeSkipOneSlowest, + AccelerationModeSkipNSlowest, AccelerationModeSkipMarked }; @@ -190,7 +189,7 @@ struct TBlackboard { NKikimrBlobStorage::EPutHandleClass putHandleClass, NKikimrBlobStorage::EGetHandleClass getHandleClass) : Info(info) , GroupQueues(groupQueues) - , AccelerationMode(AccelerationModeSkipOneSlowest) + , AccelerationMode(AccelerationModeSkipNSlowest) , PutHandleClass(putHandleClass) , GetHandleClass(getHandleClass) {} @@ -213,7 +212,7 @@ struct TBlackboard { void ReportPartMapStatus(const TLogoBlobID &id, ssize_t partMapIndex, ui32 responseIndex, NKikimrProto::EReplyStatus status); void GetWorstPredictedDelaysNs(const TBlobStorageGroupInfo &info, TGroupQueues &groupQueues, NKikimrBlobStorage::EVDiskQueueId queueId, TDiskDelayPredictions *outNWorst, - double multiplier = 1) const; + const TAccelerationParams& accelerationParams) const; TString ToString() const; void ChangeAll() { @@ -226,6 +225,8 @@ struct TBlackboard { void RegisterBlobForPut(const TLogoBlobID& id, size_t blobIdx); + void MarkSlowDisks(TBlobState& state, bool isPut, const TAccelerationParams& accelerationParams); + TBlobState& operator [](const TLogoBlobID& id); }; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp index 10041098765f..49f77b80606a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get.cpp @@ -326,9 +326,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor { } void TryScheduleGetAcceleration() { - if (!IsGetAccelerateScheduled && GetsAccelerated < 2) { - // Count VDisks that have requests in flight, if there is no more than 2 such VDisks, Accelerate - if (CountDisksWithActiveRequests() <= 2) { + if (!IsGetAccelerateScheduled && GetsAccelerated < AccelerationParams.MaxNumOfSlowDisks) { + // Count VDisks with requests in flight, if there are <= the maximum number of slow VDisks, Accelerate + if (CountDisksWithActiveRequests() <= AccelerationParams.MaxNumOfSlowDisks) { ui64 timeToAccelerateUs = GetImpl.GetTimeToAccelerateGetNs(LogCtx) / 1000; TDuration timeToAccelerate = TDuration::MicroSeconds(timeToAccelerateUs); TMonotonic now = TActivationContext::Monotonic(); @@ -345,9 +345,9 @@ class TBlobStorageGroupGetRequest : public TBlobStorageGroupRequestActor { } void TrySchedulePutAcceleration() { - if (!IsPutAccelerateScheduled && PutsAccelerated < 2) { - // Count VDisks that have requests in flight, if there is no more than 2 such VDisks, Accelerate - if (CountDisksWithActiveRequests() <= 2) { + if (!IsPutAccelerateScheduled && PutsAccelerated < AccelerationParams.MaxNumOfSlowDisks) { + // Count VDisks with requests in flight, if there are <= the maximum number of slow VDisks, Accelerate + if (CountDisksWithActiveRequests() <= AccelerationParams.MaxNumOfSlowDisks) { ui64 timeToAccelerateUs = GetImpl.GetTimeToAcceleratePutNs(LogCtx) / 1000; TDuration timeToAccelerate = TDuration::MicroSeconds(timeToAccelerateUs); TMonotonic now = TActivationContext::Monotonic(); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp index e1b9a5e0a39a..2a614fbf0e31 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_get_impl.cpp @@ -146,15 +146,13 @@ ui64 TGetImpl::GetTimeToAccelerateNs(TLogContext &logCtx, NKikimrBlobStorage::EV // Find the slowest disk TDiskDelayPredictions worstDisks; if (Blackboard.BlobStates.size() == 1) { - Blackboard.BlobStates.begin()->second.GetWorstPredictedDelaysNs( - *Info, *Blackboard.GroupQueues, queueId, &worstDisks, - AccelerationParams.PredictedDelayMultiplier); + Blackboard.BlobStates.begin()->second.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, + queueId, &worstDisks, AccelerationParams); } else { - Blackboard.GetWorstPredictedDelaysNs( - *Info, *Blackboard.GroupQueues, queueId, &worstDisks, - AccelerationParams.PredictedDelayMultiplier); + Blackboard.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, queueId, &worstDisks, + AccelerationParams); } - return worstDisks[std::min(3u, (ui32)worstDisks.size() - 1)].PredictedNs; + return worstDisks[std::min(AccelerationParams.MaxNumOfSlowDisks, (ui32)worstDisks.size() - 1)].PredictedNs; } ui64 TGetImpl::GetTimeToAccelerateGetNs(TLogContext &logCtx) { diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp index ba506383ea11..081d6208302c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.cpp @@ -19,6 +19,7 @@ namespace NKikimr { , EnableVPatch(params.EnableVPatch) , SlowDiskThreshold(params.SlowDiskThreshold) , PredictedDelayMultiplier(params.PredictedDelayMultiplier) + , MaxNumOfSlowDisks(params.MaxNumOfSlowDisks) , LongRequestThresholdMs(params.LongRequestThresholdMs) {} @@ -33,6 +34,7 @@ namespace NKikimr { , EnableVPatch(params.EnableVPatch) , SlowDiskThreshold(params.SlowDiskThreshold) , PredictedDelayMultiplier(params.PredictedDelayMultiplier) + , MaxNumOfSlowDisks(params.MaxNumOfSlowDisks) , LongRequestThresholdMs(params.LongRequestThresholdMs) {} diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h index 50ba65672107..0cb9362f78eb 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_impl.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_impl.h @@ -121,8 +121,11 @@ class TBlobStorageGroupProxy : public TActorBootstrapped bool HasInvalidGroupId() const { return GroupId.GetRawId() == Max(); } void ProcessInitQueue(); + // Acceleration parameters TMemorizableControlWrapper SlowDiskThreshold; TMemorizableControlWrapper PredictedDelayMultiplier; + TMemorizableControlWrapper MaxNumOfSlowDisks; + TMemorizableControlWrapper LongRequestThresholdMs; TAccelerationParams GetAccelerationParams(); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp index adc90e54a9b3..34f9aac1c76e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put.cpp @@ -416,8 +416,8 @@ class TBlobStorageGroupPutRequest : public TBlobStorageGroupRequestActor { } void TryToScheduleNextAcceleration() { - if (!IsAccelerateScheduled && AccelerateRequestsSent < 2) { - if (WaitingVDiskCount > 0 && WaitingVDiskCount <= 2 && RequestsSent > 1) { + if (!IsAccelerateScheduled && AccelerateRequestsSent < AccelerationParams.MaxNumOfSlowDisks) { + if (WaitingVDiskCount > 0 && WaitingVDiskCount <= AccelerationParams.MaxNumOfSlowDisks && RequestsSent > 1) { ui64 timeToAccelerateUs = Max(1, PutImpl.GetTimeToAccelerateNs(LogCtx) / 1000); if (RequestsPendingBeforeAcceleration == 1 && AccelerateRequestsSent == 1) { // if there is only one request pending, but first accelerate is unsuccessful, make a pause diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp index 1e7b0047eb51..d9d7dddd8068 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_put_impl.cpp @@ -91,8 +91,8 @@ ui64 TPutImpl::GetTimeToAccelerateNs(TLogContext &logCtx) { // Find the n'th slowest disk TDiskDelayPredictions worstDisks; state.GetWorstPredictedDelaysNs(*Info, *Blackboard.GroupQueues, HandleClassToQueueId(Blackboard.PutHandleClass), - &worstDisks, AccelerationParams.PredictedDelayMultiplier); - nthWorstPredictedNsVec[idx++] = worstDisks[2].PredictedNs; + &worstDisks, AccelerationParams); + nthWorstPredictedNsVec[idx++] = worstDisks[AccelerationParams.MaxNumOfSlowDisks].PredictedNs; } return *MaxElement(nthWorstPredictedNsVec.begin(), nthWorstPredictedNsVec.end()); } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp index 4b0b10450dfc..d0676f7c226a 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_state.cpp @@ -327,6 +327,7 @@ namespace NKikimr { return TAccelerationParams{ .SlowDiskThreshold = .001f * SlowDiskThreshold.Update(TActivationContext::Now()), .PredictedDelayMultiplier = .001f * PredictedDelayMultiplier.Update(TActivationContext::Now()), + .MaxNumOfSlowDisks = (ui32)MaxNumOfSlowDisks.Update(TActivationContext::Now()), }; } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h index 4256081130b3..9e78e5c73248 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put.h @@ -15,22 +15,22 @@ class TAcceleratePutStrategy : public TStrategyBase { const TAccelerationParams& accelerationParams) override { Y_UNUSED(accelerationParams); // Find the unput part and disk - TStackVec badDiskIdxs; + ui32 badDisksMask = 0; for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { TBlobState::TDisk &disk = state.Disks[diskIdx]; for (size_t partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) { TBlobState::TDiskPart &diskPart = disk.DiskParts[partIdx]; if (diskPart.Situation == TBlobState::ESituation::Sent) { - badDiskIdxs.push_back(diskIdx); + badDisksMask |= (1 << diskIdx); } } } - if (!badDiskIdxs.empty()) { + if (badDisksMask > 0) { // Mark the corresponding disks 'bad' // Prepare part layout if possible TBlobStorageGroupType::TPartLayout layout; - PreparePartLayout(state, info, &layout, badDiskIdxs); + PreparePartLayout(state, info, &layout, badDisksMask); TBlobStorageGroupType::TPartPlacement partPlacement; bool isCorrectable = info.Type.CorrectLayout(layout, partPlacement); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put_m3dc.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put_m3dc.h index 69804e441002..806155f54d9e 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put_m3dc.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_accelerate_put_m3dc.h @@ -30,50 +30,56 @@ class TAcceleratePut3dcStrategy : public TStrategyBase { } EStrategyOutcome Process(TLogContext &logCtx, TBlobState &state, const TBlobStorageGroupInfo &info, - TBlackboard& /*blackboard*/, TGroupDiskRequests &groupDiskRequests, + TBlackboard& blackboard, TGroupDiskRequests &groupDiskRequests, const TAccelerationParams& accelerationParams) override { Y_UNUSED(accelerationParams); // Find the unput parts and disks - ui32 badDiskMask = 0; - for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { + bool unresponsiveDisk = false; + for (size_t diskIdx = 0; diskIdx < state.Disks.size() && !unresponsiveDisk; ++diskIdx) { TBlobState::TDisk &disk = state.Disks[diskIdx]; - for (size_t partIdx = 0; partIdx < disk.DiskParts.size(); ++partIdx) { - TBlobState::TDiskPart &diskPart = disk.DiskParts[partIdx]; + for (TBlobState::TDiskPart &diskPart : disk.DiskParts) { if (diskPart.Situation == TBlobState::ESituation::Sent) { - badDiskMask |= (1 << diskIdx); + unresponsiveDisk = true; + break; } } } - if (badDiskMask > 0) { - // Mark the 'bad' disk as the single slow disk - for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { - state.Disks[diskIdx].IsSlow = badDiskMask & (1 << diskIdx); - } - - // Prepare part placement if possible - TBlobStorageGroupType::TPartPlacement partPlacement; - bool degraded = false; - - // check if we are in degraded mode -- that means that we have one fully failed realm - TBlobStorageGroupInfo::TSubgroupVDisks success(&info.GetTopology()); - TBlobStorageGroupInfo::TSubgroupVDisks error(&info.GetTopology()); - Evaluate3dcSituation(state, NumFailRealms, NumFailDomainsPerFailRealm, info, true, success, error, degraded); - - // check for failure tolerance; we issue ERROR in case when it is not possible to achieve success condition in - // any way; also check if we have already finished writing replicas - const auto& checker = info.GetQuorumChecker(); - if (checker.CheckFailModelForSubgroup(error)) { - if (checker.CheckQuorumForSubgroup(success)) { - // OK - return EStrategyOutcome::DONE; - } - - // now check every realm and check if we have to issue some write requests to it - Prepare3dcPartPlacement(state, NumFailRealms, NumFailDomainsPerFailRealm, - PreferredReplicasPerRealm(degraded), true, partPlacement); - - if (IsPutNeeded(state, partPlacement)) { - PreparePutsForPartPlacement(logCtx, state, info, groupDiskRequests, partPlacement); + if (unresponsiveDisk) { + blackboard.MarkSlowDisks(state, true, accelerationParams); + + for (bool considerSlowAsError : {true, false}) { + // Prepare part placement if possible + TBlobStorageGroupType::TPartPlacement partPlacement; + bool degraded = false; + + // check if we are in degraded mode -- that means that we have one fully failed realm + TBlobStorageGroupInfo::TSubgroupVDisks success(&info.GetTopology()); + TBlobStorageGroupInfo::TSubgroupVDisks error(&info.GetTopology()); + Evaluate3dcSituation(state, NumFailRealms, NumFailDomainsPerFailRealm, info, considerSlowAsError, + success, error, degraded); + // check for failure tolerance; we issue ERROR in case when it is not possible to achieve success condition in + // any way; also check if we have already finished writing replicas + const auto& checker = info.GetQuorumChecker(); + if (checker.CheckFailModelForSubgroup(error)) { + if (checker.CheckQuorumForSubgroup(success)) { + // OK + return EStrategyOutcome::DONE; + } + + // now check every realm and check if we have to issue some write requests to it + bool fullPlacement; + Prepare3dcPartPlacement(state, NumFailRealms, NumFailDomainsPerFailRealm, + PreferredReplicasPerRealm(degraded), considerSlowAsError, true, partPlacement, fullPlacement); + + if (considerSlowAsError && !fullPlacement) { + // unable to place all parts to fast disks, retry + continue; + } + + if (IsPutNeeded(state, partPlacement)) { + PreparePutsForPartPlacement(logCtx, state, info, groupDiskRequests, partPlacement); + } + break; } } } diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp index d6c65a51a435..be0ed97bea9c 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.cpp @@ -194,7 +194,7 @@ void TStrategyBase::AddGetRequest(TLogContext &logCtx, TGroupDiskRequests &group } void TStrategyBase::PreparePartLayout(const TBlobState &state, const TBlobStorageGroupInfo &info, - TBlobStorageGroupType::TPartLayout *layout, const TStackVec& slowDiskIdxs) { + TBlobStorageGroupType::TPartLayout *layout, ui32 slowDiskSubgroupMask) { Y_ABORT_UNLESS(layout); const ui32 totalPartCount = info.Type.TotalPartCount(); const ui32 blobSubringSize = info.Type.BlobSubgroupSize(); @@ -216,7 +216,7 @@ void TStrategyBase::PreparePartLayout(const TBlobState &state, const TBlobStorag if (!isErrorDisk) { for (ui32 partIdx = beginPartIdx; partIdx < endPartIdx; ++partIdx) { TBlobState::ESituation partSituation = disk.DiskParts[partIdx].Situation; - bool isOnSlowDisk = (std::find(slowDiskIdxs.begin(), slowDiskIdxs.end(), diskIdx) != slowDiskIdxs.end()); + bool isOnSlowDisk = (slowDiskSubgroupMask & (1 << diskIdx)); if (partSituation == TBlobState::ESituation::Present || (!isOnSlowDisk && partSituation == TBlobState::ESituation::Sent)) { layout->VDiskPartMask[diskIdx] |= (1ul << partIdx); @@ -225,15 +225,7 @@ void TStrategyBase::PreparePartLayout(const TBlobState &state, const TBlobStorag } } } - if (slowDiskIdxs.empty()) { - layout->SlowVDiskMask = 0; - } else { - layout->SlowVDiskMask = 0; - for (ui32 slowDiskIdx : slowDiskIdxs) { - Y_DEBUG_ABORT_UNLESS(slowDiskIdx < sizeof(layout->SlowVDiskMask) * 8); - layout->SlowVDiskMask |= (1ull << slowDiskIdx); - } - } + layout->SlowVDiskMask = slowDiskSubgroupMask; } bool TStrategyBase::IsPutNeeded(const TBlobState &state, const TBlobStorageGroupType::TPartPlacement &partPlacement) { @@ -363,10 +355,10 @@ void TStrategyBase::Evaluate3dcSituation(const TBlobState &state, } } -void TStrategyBase::Prepare3dcPartPlacement(const TBlobState &state, - size_t numFailRealms, size_t numFailDomainsPerFailRealm, - ui8 preferredReplicasPerRealm, bool considerSlowAsError, - TBlobStorageGroupType::TPartPlacement &outPartPlacement) { +void TStrategyBase::Prepare3dcPartPlacement(const TBlobState& state, size_t numFailRealms, size_t numFailDomainsPerFailRealm, + ui8 preferredReplicasPerRealm, bool considerSlowAsError, bool replaceUnresponsive, + TBlobStorageGroupType::TPartPlacement& outPartPlacement, bool& fullPlacement) { + fullPlacement = true; for (size_t realm = 0; realm < numFailRealms; ++realm) { ui8 placed = 0; for (size_t domain = 0; placed < preferredReplicasPerRealm @@ -377,6 +369,10 @@ void TStrategyBase::Prepare3dcPartPlacement(const TBlobState &state, if (situation != TBlobState::ESituation::Error) { if (situation == TBlobState::ESituation::Present) { placed++; + } else if (situation == TBlobState::ESituation::Sent) { + if (!replaceUnresponsive) { + placed++; + } } else if (!considerSlowAsError || !disk.IsSlow) { if (situation != TBlobState::ESituation::Sent) { outPartPlacement.Records.emplace_back(subgroupIdx, realm); @@ -385,52 +381,29 @@ void TStrategyBase::Prepare3dcPartPlacement(const TBlobState &state, } } } + if (placed < preferredReplicasPerRealm) { + fullPlacement = false; + } } } -ui32 TStrategyBase::MakeSlowSubgroupDiskMask(TBlobState &state, const TBlobStorageGroupInfo &info, TBlackboard &blackboard, - bool isPut, const TAccelerationParams& accelerationParams) { - if (info.GetTotalVDisksNum() == 1) { - // when there is only one disk, we consider it not slow - return 0; - } - // Find the slowest disk +ui32 TStrategyBase::MakeSlowSubgroupDiskMask(TBlobState &state, TBlackboard &blackboard, bool isPut, + const TAccelerationParams& accelerationParams) { + // Find slow disks switch (blackboard.AccelerationMode) { - case TBlackboard::AccelerationModeSkipOneSlowest: { - TDiskDelayPredictions worstDisks; - state.GetWorstPredictedDelaysNs(info, *blackboard.GroupQueues, - (isPut ? HandleClassToQueueId(blackboard.PutHandleClass) : - HandleClassToQueueId(blackboard.GetHandleClass)), - &worstDisks, accelerationParams.PredictedDelayMultiplier); - - // Check if the slowest disk exceptionally slow, or just not very fast - ui32 slowDiskSubgroupMask = 0; - if (worstDisks[1].PredictedNs > 0 && worstDisks[0].PredictedNs > worstDisks[1].PredictedNs * - accelerationParams.SlowDiskThreshold) { - slowDiskSubgroupMask = 1 << worstDisks[0].DiskIdx; - } - - // Mark single slow disk - for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { - state.Disks[diskIdx].IsSlow = false; - } - if (slowDiskSubgroupMask > 0) { - state.Disks[worstDisks[0].DiskIdx].IsSlow = true; - } - - return slowDiskSubgroupMask; - } - case TBlackboard::AccelerationModeSkipMarked: { - ui32 slowDiskSubgroupMask = 0; - for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { - if (state.Disks[diskIdx].IsSlow) { - slowDiskSubgroupMask |= 1 << diskIdx; - } - } - return slowDiskSubgroupMask; + case TBlackboard::AccelerationModeSkipNSlowest: + blackboard.MarkSlowDisks(state, isPut, accelerationParams); + break; + case TBlackboard::AccelerationModeSkipMarked: + break; + } + ui32 slowDiskSubgroupMask = 0; + for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { + if (state.Disks[diskIdx].IsSlow) { + slowDiskSubgroupMask |= 1 << diskIdx; } } - return 0; + return slowDiskSubgroupMask; } }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h index ce03192fc93d..3e90c6641aa5 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_base.h @@ -34,7 +34,7 @@ class TStrategyBase : public IStrategy { void AddGetRequest(TLogContext &logCtx, TGroupDiskRequests &groupDiskRequests, TLogoBlobID &fullId, ui32 partIdx, TBlobState::TDisk &disk, TIntervalSet &intervalSet, const char *logMarker); void PreparePartLayout(const TBlobState &state, const TBlobStorageGroupInfo &info, - TBlobStorageGroupType::TPartLayout *layout, const TStackVec& slowDiskIdxs); + TBlobStorageGroupType::TPartLayout *layout, ui32 slowDiskSubgroupMask); bool IsPutNeeded(const TBlobState &state, const TBlobStorageGroupType::TPartPlacement &partPlacement); void PreparePutsForPartPlacement(TLogContext &logCtx, TBlobState &state, const TBlobStorageGroupInfo &info, TGroupDiskRequests &groupDiskRequests, @@ -47,14 +47,12 @@ class TStrategyBase : public IStrategy { TBlobStorageGroupInfo::TSubgroupVDisks &inOutSuccess, TBlobStorageGroupInfo::TSubgroupVDisks &inOutError, bool &outIsDegraded); - void Prepare3dcPartPlacement(const TBlobState &state, size_t numFailRealms, size_t numFailDomainsPerFailRealm, - ui8 preferredReplicasPerRealm, bool considerSlowAsError, - TBlobStorageGroupType::TPartPlacement &outPartPlacement); + void Prepare3dcPartPlacement(const TBlobState& state, size_t numFailRealms, size_t numFailDomainsPerFailRealm, + ui8 preferredReplicasPerRealm, bool considerSlowAsError, bool replaceUnresponsive, + TBlobStorageGroupType::TPartPlacement& outPartPlacement, bool& fullPlacement); // Sets IsSlow for the slow disk, resets for other disks. // returns bit mask with 1 on positions of slow disks - ui32 MakeSlowSubgroupDiskMask(TBlobState &state, const TBlobStorageGroupInfo &info, TBlackboard &blackboard, bool isPut, - const TAccelerationParams& accelerationParams); + ui32 MakeSlowSubgroupDiskMask(TBlobState &state, TBlackboard &blackboard, bool isPut, const TAccelerationParams& accelerationParams); }; - }//NKikimr diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_m3dc_basic.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_m3dc_basic.h index b35a34f79075..4587edc554a1 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_m3dc_basic.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_m3dc_basic.h @@ -71,35 +71,14 @@ namespace NKikimr { // issue request for a specific disk; returns true if the request was issued and not yet completed, otherwise // false - if (info.GetTotalVDisksNum() > 1) { - // find the slowest disk and mark it - switch (blackboard.AccelerationMode) { - case TBlackboard::AccelerationModeSkipOneSlowest: { - TDiskDelayPredictions worstDisks; - state.GetWorstPredictedDelaysNs(info, *blackboard.GroupQueues, - HandleClassToQueueId(blackboard.GetHandleClass), - &worstDisks, accelerationParams.PredictedDelayMultiplier); - - // Check if the slowest disk exceptionally slow, or just not very fast - i32 slowDiskSubgroupIdx = -1; - if (worstDisks[1].PredictedNs > 0 && worstDisks[0].PredictedNs > - worstDisks[1].PredictedNs * accelerationParams.SlowDiskThreshold) { - slowDiskSubgroupIdx = worstDisks[0].DiskIdx; - } - - // Mark single slow disk - for (size_t diskIdx = 0; diskIdx < state.Disks.size(); ++diskIdx) { - state.Disks[diskIdx].IsSlow = false; - } - if (slowDiskSubgroupIdx >= 0) { - state.Disks[slowDiskSubgroupIdx].IsSlow = true; - } - break; - } - case TBlackboard::AccelerationModeSkipMarked: - // The slowest disk is already marked! - break; - } + // mark slow disks + switch (blackboard.AccelerationMode) { + case TBlackboard::AccelerationModeSkipNSlowest: + blackboard.MarkSlowDisks(state, false, accelerationParams); + break; + case TBlackboard::AccelerationModeSkipMarked: + // Slow disks are already marked! + break; } // create an array defining order in which we traverse the disks diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h index 2d0efd8f8615..bf61ce78044f 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_get_min_iops_block.h @@ -312,8 +312,7 @@ class TMinIopsBlockStrategy : public TStrategyBase { // Try excluding the slow disk bool isDone = false; // TODO: Mark disk that does not answer when accelerating requests - ui32 slowDiskSubgroupMask = MakeSlowSubgroupDiskMask(state, info, blackboard, false, - accelerationParams); + ui32 slowDiskSubgroupMask = MakeSlowSubgroupDiskMask(state, blackboard, false, accelerationParams); if (slowDiskSubgroupMask >= 0) { TBlobStorageGroupInfo::EBlobState fastPessimisticState = TBlobStorageGroupInfo::EBS_DISINTEGRATED; TBlobStorageGroupInfo::EBlobState fastOptimisticState = TBlobStorageGroupInfo::EBS_DISINTEGRATED; diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3dc.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3dc.h index e6e9a376f720..b7b52fe37422 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3dc.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_put_m3dc.h @@ -36,14 +36,14 @@ class TPut3dcStrategy : public TStrategyBase { TBlobStorageGroupType::TPartPlacement partPlacement; bool degraded = false; bool isDone = false; - ui32 slowDiskSubgroupMask = MakeSlowSubgroupDiskMask(state, info, blackboard, true, accelerationParams); + ui32 slowDiskSubgroupMask = MakeSlowSubgroupDiskMask(state, blackboard, true, accelerationParams); do { if (slowDiskSubgroupMask == 0) { break; // ignore this case } TBlobStorageGroupInfo::TSubgroupVDisks success(&info.GetTopology()); TBlobStorageGroupInfo::TSubgroupVDisks error(&info.GetTopology()); - Evaluate3dcSituation(state, NumFailRealms, NumFailDomainsPerFailRealm, info, true, success, error, degraded); + Evaluate3dcSituation(state, NumFailRealms, NumFailDomainsPerFailRealm, info, false, success, error, degraded); TBlobStorageGroupInfo::TSubgroupVDisks slow = TBlobStorageGroupInfo::TSubgroupVDisks::CreateFromMask( &info.GetTopology(), slowDiskSubgroupMask); if ((success | error) & slow) { @@ -61,9 +61,7 @@ class TPut3dcStrategy : public TStrategyBase { // now check every realm and check if we have to issue some write requests to it Prepare3dcPartPlacement(state, NumFailRealms, NumFailDomainsPerFailRealm, - PreferredReplicasPerRealm(degraded), - true, partPlacement); - isDone = true; + PreferredReplicasPerRealm(degraded), true, false, partPlacement, isDone); } } while (false); if (!isDone) { @@ -81,9 +79,10 @@ class TPut3dcStrategy : public TStrategyBase { } // now check every realm and check if we have to issue some write requests to it + partPlacement.Records.clear(); + bool fullPlacement; Prepare3dcPartPlacement(state, NumFailRealms, NumFailDomainsPerFailRealm, - PreferredReplicasPerRealm(degraded), - false, partPlacement); + PreferredReplicasPerRealm(degraded), false, false, partPlacement, fullPlacement); } if (IsPutNeeded(state, partPlacement)) { PreparePutsForPartPlacement(logCtx, state, info, groupDiskRequests, partPlacement); diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h index 4416cfbf9f60..2399fdf8ab99 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h +++ b/ydb/core/blobstorage/dsproxy/dsproxy_strategy_restore.h @@ -126,26 +126,13 @@ class TRestoreStrategy : public TStrategyBase { return *res; } - TStackVec slowDiskSubgroupIdxs; - if (info.GetTotalVDisksNum() > 1) { - // Find the slowest disk, if there are more than 1 - TDiskDelayPredictions worstDisks; - state.GetWorstPredictedDelaysNs(info, *blackboard.GroupQueues, - HandleClassToQueueId(blackboard.PutHandleClass), - &worstDisks, accelerationParams.PredictedDelayMultiplier); - - // Check if the slowest disk exceptionally slow, or just not very fast - if (worstDisks[1].PredictedNs > 0 && worstDisks[0].PredictedNs > worstDisks[1].PredictedNs * - accelerationParams.SlowDiskThreshold) { - slowDiskSubgroupIdxs.push_back(worstDisks[0].DiskIdx); - } - } + ui32 slowDiskSubgroupMask = MakeSlowSubgroupDiskMask(state, blackboard, true, accelerationParams); bool isDone = false; - if (!slowDiskSubgroupIdxs.empty()) { + if (slowDiskSubgroupMask != 0) { // If there is an exceptionally slow disk, try not touching it, mark isDone TBlobStorageGroupType::TPartLayout layout; - PreparePartLayout(state, info, &layout, slowDiskSubgroupIdxs); + PreparePartLayout(state, info, &layout, slowDiskSubgroupMask); TBlobStorageGroupType::TPartPlacement partPlacement; bool isCorrectable = info.Type.CorrectLayout(layout, partPlacement); @@ -159,7 +146,7 @@ class TRestoreStrategy : public TStrategyBase { if (!isDone) { // Fill in the part layout TBlobStorageGroupType::TPartLayout layout; - PreparePartLayout(state, info, &layout, {}); + PreparePartLayout(state, info, &layout, 0); TBlobStorageGroupType::TPartPlacement partPlacement; bool isCorrectable = info.Type.CorrectLayout(layout, partPlacement); Y_ABORT_UNLESS(isCorrectable); diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp index ce150761a734..1d0bcd2adb57 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.cpp @@ -206,6 +206,7 @@ void TNodeWarden::Bootstrap() { icb->RegisterSharedControl(PredictedDelayMultiplier, "DSProxyControls.PredictedDelayMultiplier"); icb->RegisterSharedControl(LongRequestThresholdMs, "DSProxyControls.LongRequestThresholdMs"); icb->RegisterSharedControl(LongRequestReportingDelayMs, "DSProxyControls.LongRequestReportingDelayMs"); + icb->RegisterSharedControl(MaxNumOfSlowDisks, "DSProxyControls.MaxNumOfSlowDisks"); } // start replication broker diff --git a/ydb/core/blobstorage/nodewarden/node_warden_impl.h b/ydb/core/blobstorage/nodewarden/node_warden_impl.h index c6b037adf039..dbdf7409f7c3 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_impl.h +++ b/ydb/core/blobstorage/nodewarden/node_warden_impl.h @@ -170,6 +170,7 @@ namespace NKikimr::NStorage { TControlWrapper PredictedDelayMultiplier; TControlWrapper LongRequestThresholdMs; TControlWrapper LongRequestReportingDelayMs; + TControlWrapper MaxNumOfSlowDisks; public: struct TGroupRecord; @@ -200,6 +201,7 @@ namespace NKikimr::NStorage { , PredictedDelayMultiplier(1'000, 1, 1000) , LongRequestThresholdMs(50'000, 1, 1'000'000) , LongRequestReportingDelayMs(60'000, 1, 1'000'000) + , MaxNumOfSlowDisks(2, 1, 2) { Y_ABORT_UNLESS(Cfg->BlobStorageConfig.GetServiceSet().AvailabilityDomainsSize() <= 1); AvailDomainId = 1; diff --git a/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp b/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp index 02552e701a41..953821ea942d 100644 --- a/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp +++ b/ydb/core/blobstorage/nodewarden/node_warden_proxy.cpp @@ -42,6 +42,7 @@ void TNodeWarden::StartLocalProxy(ui32 groupId) { .EnableVPatch = EnableVPatch, .SlowDiskThreshold = SlowDiskThreshold, .PredictedDelayMultiplier = PredictedDelayMultiplier, + .MaxNumOfSlowDisks = MaxNumOfSlowDisks, }), TMailboxType::ReadAsFilled, AppData()->SystemPoolId); [[fallthrough]]; case NKikimrBlobStorage::TGroupDecommitStatus::DONE: @@ -62,6 +63,7 @@ void TNodeWarden::StartLocalProxy(ui32 groupId) { .EnableVPatch = EnableVPatch, .SlowDiskThreshold = SlowDiskThreshold, .PredictedDelayMultiplier = PredictedDelayMultiplier, + .MaxNumOfSlowDisks = MaxNumOfSlowDisks, } ) ); @@ -74,6 +76,7 @@ void TNodeWarden::StartLocalProxy(ui32 groupId) { .EnableVPatch = EnableVPatch, .SlowDiskThreshold = SlowDiskThreshold, .PredictedDelayMultiplier = PredictedDelayMultiplier, + .MaxNumOfSlowDisks = MaxNumOfSlowDisks, })); } diff --git a/ydb/core/blobstorage/ut_blobstorage/acceleration.cpp b/ydb/core/blobstorage/ut_blobstorage/acceleration.cpp index 4e88bc051459..7af07d01efe3 100644 --- a/ydb/core/blobstorage/ut_blobstorage/acceleration.cpp +++ b/ydb/core/blobstorage/ut_blobstorage/acceleration.cpp @@ -139,7 +139,8 @@ Y_UNIT_TEST_SUITE(Acceleration) { }; struct TestCtx { - TestCtx(const TBlobStorageGroupType& erasure, float slowDiskThreshold, float delayMultiplier) + TestCtx(const TBlobStorageGroupType& erasure, float slowDiskThreshold, float delayMultiplier, + ui32 maxSlowCount = 2) : NodeCount(erasure.BlobSubgroupSize() + 1) , Erasure(erasure) , Env(new TEnvironmentSetup({ @@ -148,6 +149,7 @@ Y_UNIT_TEST_SUITE(Acceleration) { .LocationGenerator = [this](ui32 nodeId) { return LocationGenerator(nodeId); }, .SlowDiskThreshold = slowDiskThreshold, .VDiskPredictedDelayMultiplier = delayMultiplier, + .MaxNumOfSlowDisks = maxSlowCount, })) , VDiskDelayEmulator(new TVDiskDelayEmulator(Env)) {} @@ -242,31 +244,43 @@ Y_UNIT_TEST_SUITE(Acceleration) { void TestAcceleratePut(const TBlobStorageGroupType& erasure, ui32 slowDisksNum, NKikimrBlobStorage::EPutHandleClass handleClass, TDuration fastDelay, TDuration slowDelay, TDuration initDelay, TDuration waitTime, - float delayMultiplier) { + float delayMultiplier, ui32 maxSlowCount = 2) { ui32 initialRequests = 100; float slowDiskThreshold = 2; TDiskDelay fastDiskDelay = TDiskDelay(fastDelay); TDiskDelay slowDiskDelay = TDiskDelay(slowDelay); TDiskDelay initDiskDelay = TDiskDelay(initDelay); - for (ui32 fastDisksNum = 0; fastDisksNum < erasure.BlobSubgroupSize() - 2; ++fastDisksNum) { + ui32 requests = (erasure.GetErasure() == TBlobStorageGroupType::ErasureMirror3dc) ? 3 : 6; + + for (ui32 fastDisksNum = 0; fastDisksNum < requests - 1; ++fastDisksNum) { Ctest << "fastDisksNum# " << fastDisksNum << Endl; - TestCtx ctx(erasure, slowDiskThreshold, delayMultiplier); + TestCtx ctx(erasure, slowDiskThreshold, delayMultiplier, maxSlowCount); ctx.VDiskDelayEmulator->DefaultDelay = initDiskDelay; ctx.Initialize(); + bool verboseHandlers = false; TString data = MakeData(1024); - auto put = [&](TLogoBlobID blobId) { + auto put = [&](TLogoBlobID blobId, bool timeout) { ctx.Env->Runtime->WrapInActorContext(ctx.Edge, [&] { - SendToBSProxy(ctx.Edge, ctx.GroupId, new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max()), handleClass); + TEvBlobStorage::TEvPut* ev = new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max()); + if (verboseHandlers) { + Ctest << TAppData::TimeProvider->Now() << " Send TEvPut# " << ev->ToString() << Endl; + } + SendToBSProxy(ctx.Edge, ctx.GroupId, ev, handleClass); }); auto res = ctx.Env->WaitForEdgeActorEvent( - ctx.Edge, false, TAppData::TimeProvider->Now() + waitTime); - UNIT_ASSERT_C(res, "fastDisksNum# " << fastDisksNum); - UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + ctx.Edge, false, timeout ? (TAppData::TimeProvider->Now() + waitTime) : TInstant::Max()); + if (timeout) { + if (slowDisksNum <= maxSlowCount) { + UNIT_ASSERT_C(res, "fastDisksNum# " << fastDisksNum); + UNIT_ASSERT_VALUES_EQUAL(res->Get()->Status, NKikimrProto::OK); + } else { + UNIT_ASSERT_C(!res, "fastDisksNum# " << fastDisksNum); + } + } }; - bool verboseHandlers = false; ctx.VDiskDelayEmulator->AddHandler(TEvBlobStorage::TEvVPutResult::EventType, [&](std::unique_ptr& ev) { ui32 nodeId = ev->Sender.NodeId(); if (nodeId < ctx.NodeCount) { @@ -283,14 +297,14 @@ Y_UNIT_TEST_SUITE(Acceleration) { }); for (ui32 i = 0; i < initialRequests; ++i) { - put(TLogoBlobID(1, 1, 1, 1, data.size(), 123 + i)); + put(TLogoBlobID(1, 1, 1, 1, data.size(), 123 + i), false); } ctx.Env->Sim(slowDelay); std::deque delayByResponseOrder; for (ui32 i = 0; i < erasure.BlobSubgroupSize(); ++i) { - if (i >= fastDisksNum && i < fastDisksNum + slowDisksNum) { + if (i >= fastDisksNum && i < fastDisksNum + slowDisksNum) { delayByResponseOrder.push_back(slowDiskDelay); } else { delayByResponseOrder.push_back(fastDiskDelay); @@ -301,7 +315,8 @@ Y_UNIT_TEST_SUITE(Acceleration) { ctx.VDiskDelayEmulator->LogUnwrap = true; verboseHandlers = true; ADD_DSPROXY_MESSAGE_PRINTER(TEvBlobStorage::TEvVPut); - put(TLogoBlobID(1, 1, 1, 1, data.size(), 1)); + ADD_DSPROXY_MESSAGE_PRINTER(TEvBlobStorage::TEvPutResult); + put(TLogoBlobID(1, 1, 1, 1, data.size(), 1), true); } } @@ -309,16 +324,18 @@ Y_UNIT_TEST_SUITE(Acceleration) { void TestAccelerateGet(const TBlobStorageGroupType& erasure, ui32 slowDisksNum, NKikimrBlobStorage::EGetHandleClass handleClass, TDuration fastDelay, TDuration slowDelay, TDuration initDelay, TDuration waitTime, - float delayMultiplier) { + float delayMultiplier, ui32 maxSlowCount = 2) { ui32 initialRequests = 100; float slowDiskThreshold = 2; TDiskDelay fastDiskDelay = TDiskDelay(fastDelay); TDiskDelay slowDiskDelay = TDiskDelay(slowDelay); TDiskDelay initDiskDelay = TDiskDelay(initDelay); - for (ui32 fastDisksNum = 0; fastDisksNum < erasure.BlobSubgroupSize() - 2; ++fastDisksNum) { + ui32 requests = 3; + + for (ui32 fastDisksNum = 0; fastDisksNum < requests - 1; ++fastDisksNum) { Ctest << "fastDisksNum# " << fastDisksNum << Endl; - TestCtx ctx(erasure, slowDiskThreshold, delayMultiplier); + TestCtx ctx(erasure, slowDiskThreshold, delayMultiplier, maxSlowCount); ctx.VDiskDelayEmulator->DefaultDelay = initDiskDelay; ctx.Initialize(); @@ -340,7 +357,7 @@ Y_UNIT_TEST_SUITE(Acceleration) { }); TString data = MakeData(1024); - auto putAndGet = [&](TLogoBlobID blobId) { + auto putAndGet = [&](TLogoBlobID blobId, bool timeout) { ctx.Env->Runtime->WrapInActorContext(ctx.Edge, [&] { SendToBSProxy(ctx.Edge, ctx.GroupId, new TEvBlobStorage::TEvPut(blobId, data, TInstant::Max())); }); @@ -350,14 +367,21 @@ Y_UNIT_TEST_SUITE(Acceleration) { ctx.Env->Runtime->WrapInActorContext(ctx.Edge, [&] { SendToBSProxy(ctx.Edge, ctx.GroupId, new TEvBlobStorage::TEvGet(blobId, 0, data.size(), TInstant::Max(), handleClass)); }); - auto getRes = ctx.Env->WaitForEdgeActorEvent(ctx.Edge, false, TAppData::TimeProvider->Now() + waitTime); - UNIT_ASSERT_C(getRes, "fastDisksNum# " << fastDisksNum); - UNIT_ASSERT_VALUES_EQUAL(getRes->Get()->Status, NKikimrProto::OK); - UNIT_ASSERT_VALUES_EQUAL(getRes->Get()->Responses[0].Status, NKikimrProto::OK); + auto getRes = ctx.Env->WaitForEdgeActorEvent(ctx.Edge, false, + timeout ? (TAppData::TimeProvider->Now() + waitTime) : TInstant::Max()); + if (timeout) { + if (slowDisksNum <= maxSlowCount) { + UNIT_ASSERT_C(getRes, "fastDisksNum# " << fastDisksNum); + UNIT_ASSERT_VALUES_EQUAL(getRes->Get()->Status, NKikimrProto::OK); + UNIT_ASSERT_VALUES_EQUAL(getRes->Get()->Responses[0].Status, NKikimrProto::OK); + } else { + UNIT_ASSERT_C(!getRes, "fastDisksNum# " << fastDisksNum); + } + } }; for (ui32 i = 0; i < initialRequests; ++i) { - putAndGet(TLogoBlobID(1, 1, 1, 1, data.size(), 123 + i)); + putAndGet(TLogoBlobID(1, 1, 1, 1, data.size(), 123 + i), false); } ctx.Env->Sim(slowDelay); @@ -374,7 +398,8 @@ Y_UNIT_TEST_SUITE(Acceleration) { ctx.VDiskDelayEmulator->LogUnwrap = true; verboseHandlers = true; ADD_DSPROXY_MESSAGE_PRINTER(TEvBlobStorage::TEvVGet); - putAndGet(TLogoBlobID(1, 1, 1, 1, data.size(), 2)); + ADD_DSPROXY_MESSAGE_PRINTER(TEvBlobStorage::TEvGetResult); + putAndGet(TLogoBlobID(1, 1, 1, 1, data.size(), 2), true); } } @@ -424,7 +449,7 @@ Y_UNIT_TEST_SUITE(Acceleration) { } void TestThreshold(const TBlobStorageGroupType& erasure, ui32 slowDisks, bool delayPuts, bool delayGets, - TTestThresholdRequestSender sendRequests) { + TTestThresholdRequestSender sendRequests, float maxRatio) { float delayMultiplier = 1; float slowDiskThreshold = 1.2; TDiskDelay fastDiskDelay = TDiskDelay(TDuration::Seconds(0.1), 10, TDuration::Seconds(1), 1, "fast"); @@ -437,7 +462,7 @@ Y_UNIT_TEST_SUITE(Acceleration) { ui32 groupSize = erasure.BlobSubgroupSize(); std::vector nodeIsSlow(groupSize, true); - std::vector vputsByNode(groupSize, 0); + std::vector vrequestsByNode(groupSize, 0); for (ui32 i = 0; i < groupSize; ++i) { bool isSlow = (i % 3 == 0 && i / 3 < slowDisks); @@ -457,7 +482,7 @@ Y_UNIT_TEST_SUITE(Acceleration) { TDuration delay = ctx.VDiskDelayEmulator->DelayMsg(ev); Ctest << TAppData::TimeProvider->Now() << " TEvVPutResult: vdiskId# " << vdiskId.ToString() << " partId# " << partId.ToString() << " nodeId# " << nodeId << ", delay " << delay << Endl; - ++vputsByNode[nodeId - 1]; + ++vrequestsByNode[nodeId - 1]; return false; } return true; @@ -475,7 +500,7 @@ Y_UNIT_TEST_SUITE(Acceleration) { TDuration delay = ctx.VDiskDelayEmulator->DelayMsg(ev); Ctest << TAppData::TimeProvider->Now() << " TEvVGetResult: vdiskId# " << vdiskId.ToString() << " partId# " << partId.ToString() << " nodeId# " << nodeId << ", delay " << delay << Endl; - ++vputsByNode[nodeId - 1]; + ++vrequestsByNode[nodeId - 1]; return false; } return true; @@ -492,15 +517,15 @@ Y_UNIT_TEST_SUITE(Acceleration) { TStringStream str; - str << "VPUTS BY NODE: "; + str << "VRequests by node: "; for (ui32 i = 0; i < groupSize; ++i) { - str << "{ nodeId# " << i << " isSlow# " << nodeIsSlow[i] << ' ' << vputsByNode[i] << "}, "; + str << "{ nodeId# " << i << " isSlow# " << nodeIsSlow[i] << ' ' << vrequestsByNode[i] << "}, "; if (nodeIsSlow[i]) { ++slowNodesCount; - slowNodesRequests += vputsByNode[i]; + slowNodesRequests += vrequestsByNode[i]; } else { ++fastNodesCount; - fastNodesRequests += vputsByNode[i]; + fastNodesRequests += vrequestsByNode[i]; } } Ctest << str.Str() << Endl; @@ -508,15 +533,17 @@ Y_UNIT_TEST_SUITE(Acceleration) { double slowNodeRequestsAvg = 1. * slowNodesRequests / slowNodesCount; double fastNodeRequestsAvg = 1. * fastNodesRequests / fastNodesCount; - UNIT_ASSERT_LE_C(slowNodeRequestsAvg, fastNodeRequestsAvg / 3, str.Str()); + double ratio = fastNodeRequestsAvg / slowNodeRequestsAvg; + Ctest << "Fast to slow ratio# " << ratio << Endl; + UNIT_ASSERT_GE_C(ratio, maxRatio, "ratio# " << ratio << " " << str.Str()); } void TestThresholdPut(const TBlobStorageGroupType& erasure, ui32 slowDisks) { - TestThreshold(erasure, slowDisks, true, false, TestThresholdSendPutRequests); + TestThreshold(erasure, slowDisks, true, false, TestThresholdSendPutRequests, 5); } void TestThresholdGet(const TBlobStorageGroupType& erasure, ui32 slowDisks) { - TestThreshold(erasure, slowDisks, false, true, TestThresholdSendGetRequests); + TestThreshold(erasure, slowDisks, false, true, TestThresholdSendGetRequests, 1.5); } void TestDelayMultiplierPut(const TBlobStorageGroupType& erasure, ui32 slowDisks) { @@ -526,8 +553,17 @@ Y_UNIT_TEST_SUITE(Acceleration) { void TestDelayMultiplierGet(const TBlobStorageGroupType& erasure, ui32 slowDisks) { TestAccelerateGet(erasure, slowDisks, NKikimrBlobStorage::AsyncRead, TDuration::Seconds(0.9), - TDuration::Seconds(2 - ), TDuration::Seconds(1), TDuration::Seconds(1.95), 0.8); + TDuration::Seconds(2), TDuration::Seconds(1), TDuration::Seconds(1.95), 0.8); + } + + void TestMaxNumOfSlowDisksPut(const TBlobStorageGroupType& erasure, ui32 slowDisks) { + TestAcceleratePut(erasure, slowDisks, NKikimrBlobStorage::AsyncBlob, TDuration::Seconds(1), + TDuration::Seconds(5), TDuration::Seconds(1), TDuration::Seconds(4), 1, 1); + } + + void TestMaxNumOfSlowDisksGet(const TBlobStorageGroupType& erasure, ui32 slowDisks) { + TestAccelerateGet(erasure, slowDisks, NKikimrBlobStorage::AsyncRead, TDuration::Seconds(1), + TDuration::Seconds(5), TDuration::Seconds(1), TDuration::Seconds(4), 1, 1); } #define TEST_ACCELERATE(erasure, method, handleClass, slowDisks) \ @@ -536,6 +572,8 @@ Y_UNIT_TEST_SUITE(Acceleration) { TDuration::Seconds(1), TDuration::Seconds(5), TDuration::Seconds(1), TDuration::Seconds(4), 1); \ } + // TODO fix Acceleration in mirror-3-of-4 + TEST_ACCELERATE(Mirror3dc, Put, AsyncBlob, 1); // TEST_ACCELERATE(Mirror3of4, Put, AsyncBlob, 1); TEST_ACCELERATE(4Plus2Block, Put, AsyncBlob, 1); @@ -557,19 +595,17 @@ Y_UNIT_TEST_SUITE(Acceleration) { Test##param##method(TBlobStorageGroupType::Erasure##erasure, slowDisks); \ } -// TEST_ACCELERATE_PARAMS(Threshold, Put, Mirror3dc, 1); + TEST_ACCELERATE_PARAMS(Threshold, Put, Mirror3dc, 1); TEST_ACCELERATE_PARAMS(Threshold, Put, 4Plus2Block, 1); -// TEST_ACCELERATE_PARAMS(Threshold, Put, Mirror3dc, 2); -// TEST_ACCELERATE_PARAMS(Threshold, Put, 4Plus2Block, 2); + TEST_ACCELERATE_PARAMS(Threshold, Put, Mirror3dc, 2); + TEST_ACCELERATE_PARAMS(Threshold, Put, 4Plus2Block, 2); -// TEST_ACCELERATE_PARAMS(Threshold, Get, Mirror3dc, 1); + TEST_ACCELERATE_PARAMS(Threshold, Get, Mirror3dc, 1); TEST_ACCELERATE_PARAMS(Threshold, Get, 4Plus2Block, 1); -// TEST_ACCELERATE_PARAMS(Threshold, Get, Mirror3dc, 2); -// TEST_ACCELERATE_PARAMS(Threshold, Get, 4Plus2Block, 2); - - // TODO(serg-belyakov): fix all muted tests + TEST_ACCELERATE_PARAMS(Threshold, Get, Mirror3dc, 2); + TEST_ACCELERATE_PARAMS(Threshold, Get, 4Plus2Block, 2); TEST_ACCELERATE_PARAMS(DelayMultiplier, Put, Mirror3dc, 1); TEST_ACCELERATE_PARAMS(DelayMultiplier, Put, 4Plus2Block, 1); @@ -583,6 +619,15 @@ Y_UNIT_TEST_SUITE(Acceleration) { TEST_ACCELERATE_PARAMS(DelayMultiplier, Get, Mirror3dc, 2); TEST_ACCELERATE_PARAMS(DelayMultiplier, Get, 4Plus2Block, 2); + TEST_ACCELERATE_PARAMS(MaxNumOfSlowDisks, Get, Mirror3dc, 1); + TEST_ACCELERATE_PARAMS(MaxNumOfSlowDisks, Get, 4Plus2Block, 1); + + TEST_ACCELERATE_PARAMS(MaxNumOfSlowDisks, Put, Mirror3dc, 1); + TEST_ACCELERATE_PARAMS(MaxNumOfSlowDisks, Put, 4Plus2Block, 1); + + TEST_ACCELERATE_PARAMS(MaxNumOfSlowDisks, Put, Mirror3dc, 2); + TEST_ACCELERATE_PARAMS(MaxNumOfSlowDisks, Put, 4Plus2Block, 2); + #undef TEST_ACCELERATE #undef TEST_ACCELERATE_PARAMS #undef PRINT_DSPROXY_MESSAGE diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/common.h b/ydb/core/blobstorage/ut_blobstorage/lib/common.h index 462d6f67861e..3ed6d98cbc61 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/common.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/common.h @@ -62,7 +62,7 @@ class TWeightedRandom { public: TWeightedRandom(ui64 seed = 0) : PrefixSum({ 0 }) - , Mt64(seed) + , Mt64(new TMersenne(seed)) {} TWeightedRandom(const TWeightedRandom&) = default; @@ -77,7 +77,7 @@ class TWeightedRandom { T GetRandom() { Y_ABORT_UNLESS(WeightSum() != 0); - return Get(Mt64() % WeightSum()); + return Get((*Mt64)() % WeightSum()); } T Get(ui64 w) { @@ -95,5 +95,5 @@ class TWeightedRandom { private: std::vector Values; std::vector PrefixSum; - TMersenne Mt64; + std::shared_ptr> Mt64; }; diff --git a/ydb/core/blobstorage/ut_blobstorage/lib/env.h b/ydb/core/blobstorage/ut_blobstorage/lib/env.h index d1ed93c1d083..a86951f0c66d 100644 --- a/ydb/core/blobstorage/ut_blobstorage/lib/env.h +++ b/ydb/core/blobstorage/ut_blobstorage/lib/env.h @@ -51,6 +51,7 @@ struct TEnvironmentSetup { const float SlowDiskThreshold = 2; const float VDiskPredictedDelayMultiplier = 1; const bool UseActorSystemTimeInBSQueue = true; + const ui32 MaxNumOfSlowDisks = 2; }; const TSettings Settings; @@ -403,6 +404,8 @@ struct TEnvironmentSetup { ADD_ICB_CONTROL("DSProxyControls.SlowDiskThreshold", 2'000, 1, 1'000'000, std::round(Settings.SlowDiskThreshold * 1'000)); ADD_ICB_CONTROL("DSProxyControls.PredictedDelayMultiplier", 1'000, 1, 1'000'000, std::round(Settings.VDiskPredictedDelayMultiplier * 1'000)); + ADD_ICB_CONTROL("DSProxyControls.MaxNumOfSlowDisks", 2, 1, 2, Settings.MaxNumOfSlowDisks); + #undef ADD_ICB_CONTROL { diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index c16ad21888a6..32511b4206ae 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1319,12 +1319,12 @@ message TImmediateControlsConfig { message TDSProxyControls { optional uint64 SlowDiskThreshold = 1 [(ControlOptions) = { - Description: "The minimum ratio of slowest and second slowest disks, required to accelerate, actual value is divided by 1000", + Description: "The minimum ratio of slowest and second slowest disks, required to accelerate, promille", MinValue: 1, MaxValue: 1000000, DefaultValue: 2000 }]; optional uint64 PredictedDelayMultiplier = 2 [(ControlOptions) = { - Description: "Predicted time of VDisk's response is multiplied by this value divided by 1000", + Description: "Predicted time of VDisk's response multiplier, promille", MinValue: 0, MaxValue: 1000000, DefaultValue: 1000 }]; @@ -1338,6 +1338,11 @@ message TImmediateControlsConfig { MinValue: 1, MaxValue: 1000000, DefaultValue: 60000 }]; + optional uint64 MaxNumOfSlowDisks = 5 [(ControlOptions) = { + Description: "Maximum number of slow disks, which DSProxy can skip with Accelerations", + MinValue: 1, + MaxValue: 2, + DefaultValue: 2 }]; } message TPDiskControls {