From 62b5db35434639fc4e1231a4a0f65775e9f1f16f Mon Sep 17 00:00:00 2001 From: kruall Date: Wed, 7 Feb 2024 12:15:08 +0300 Subject: [PATCH] Fix leaking blobs via using patching (#1639) --- .../blobstorage/dsproxy/dsproxy_patch.cpp | 26 ++++++++++++------- .../blobstorage/vdisk/common/vdisk_events.h | 13 ++++++++-- .../skeleton/blobstorage_skeletonfront.cpp | 4 ++- .../vdisk/skeleton/skeleton_vpatch_actor.cpp | 11 +++++--- 4 files changed, 38 insertions(+), 16 deletions(-) diff --git a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp index 131112c799d5..e3429b5f3de4 100644 --- a/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp +++ b/ydb/core/blobstorage/dsproxy/dsproxy_patch.cpp @@ -68,6 +68,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor ReceivedResponseFlags; TStackVec EmptyResponseFlags; TStackVec ErrorResponseFlags; + TStackVec ForceStopFlags; TBlobStorageGroupInfo::TVDiskIds VDisks; bool UseVPatch = false; @@ -332,8 +333,15 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActorGet()->Record; + + Y_ABORT_UNLESS(record.HasCookie()); + ui8 subgroupIdx = record.GetCookie(); + if (ForceStopFlags[subgroupIdx]) { + return; // ignore force stop response + } + ReceivedResults++; + PullOutStatusFlagsAndFressSpace(record); Y_ABORT_UNLESS(record.HasStatus()); NKikimrProto::EReplyStatus status = record.GetStatus(); @@ -342,9 +350,6 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor> events; - for (ui32 vdiskIdx = 0; vdiskIdx < VDisks.size(); ++vdiskIdx) { - if (!ErrorResponseFlags[vdiskIdx] && !EmptyResponseFlags[vdiskIdx] && ReceivedResponseFlags[vdiskIdx]) { + for (ui32 subgroupIdx = 0; subgroupIdx < VDisks.size(); ++subgroupIdx) { + if (!ErrorResponseFlags[subgroupIdx] && !EmptyResponseFlags[subgroupIdx] && ReceivedResponseFlags[subgroupIdx]) { std::unique_ptr ev = std::make_unique( - OriginalId, PatchedId, VDisks[vdiskIdx], 0, Deadline, vdiskIdx); + OriginalId, PatchedId, VDisks[subgroupIdx], 0, Deadline, subgroupIdx); ev->SetForceEnd(); + ForceStopFlags[subgroupIdx] = true; events.emplace_back(std::move(ev)); PATCH_LOG(PRI_DEBUG, BS_PROXY_PATCH, BPPA19, "Send stop message", - (VDiskIdxInSubgroup, vdiskIdx), - (VDiskId, VDisks[vdiskIdx])); + (VDiskIdxInSubgroup, subgroupIdx), + (VDiskId, VDisks[subgroupIdx])); } } SendToQueues(events, false); @@ -495,6 +501,7 @@ class TBlobStorageGroupPatchRequest : public TBlobStorageGroupRequestActor> events; diff --git a/ydb/core/blobstorage/vdisk/common/vdisk_events.h b/ydb/core/blobstorage/vdisk/common/vdisk_events.h index f33888e534a9..dfc24c096ff9 100644 --- a/ydb/core/blobstorage/vdisk/common/vdisk_events.h +++ b/ydb/core/blobstorage/vdisk/common/vdisk_events.h @@ -366,10 +366,12 @@ namespace NKikimr { : public TEventLocal { TVMsgContext Ctx; std::unique_ptr Event; + bool DoNotResend; - TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr event) + TEvVDiskRequestCompleted(const TVMsgContext &ctx, std::unique_ptr event, bool doNotResend = false) : Ctx(ctx) , Event(std::move(event)) + , DoNotResend(doNotResend) { Y_DEBUG_ABORT_UNLESS(Ctx.ExtQueueId != NKikimrBlobStorage::EVDiskQueueId::Unknown); Y_DEBUG_ABORT_UNLESS(Ctx.IntQueueId != NKikimrBlobStorage::EVDiskInternalQueueId::IntUnknown); @@ -468,6 +470,9 @@ namespace NKikimr { TActorIDPtr SkeletonFrontIDPtr; THPTimer ExecutionTimer; + protected: + bool DoNotResendFromSkeletonFront = false; + public: TEvVResultBaseWithQoSPB() = default; @@ -526,7 +531,7 @@ namespace NKikimr { byteSize, this->ToString().data()); if (SkeletonFrontIDPtr && MsgCtx.IntQueueId != NKikimrBlobStorage::IntUnknown) { - ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev))); + ctx.Send(*SkeletonFrontIDPtr, new TEvVDiskRequestCompleted(MsgCtx, std::move(ev), DoNotResendFromSkeletonFront)); } else { TActivationContext::Send(ev.release()); } @@ -2182,6 +2187,10 @@ namespace NKikimr { Record.SetApproximateFreeSpaceShare(approximateFreeSpaceShare); } + void SetForceEndResponse() { + DoNotResendFromSkeletonFront = true; + } + void MakeError(NKikimrProto::EReplyStatus status, const TString &errorReason, const NKikimrBlobStorage::TEvVPatchDiff &request) { diff --git a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp index 2447d28501f0..b10c280ddf31 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/blobstorage_skeletonfront.cpp @@ -1633,7 +1633,9 @@ namespace NKikimr { TIntQueueClass &intQueue = GetIntQueue(msgCtx.IntQueueId); intQueue.Completed(ctx, msgCtx, *this, id); VCtx->CostTracker->CountPDiskResponse(); - TActivationContext::Send(event.release()); + if (!ev->Get()->DoNotResend) { + TActivationContext::Send(event.release()); + } } void ChangeGeneration(const TVDiskID& vdiskId, const TIntrusivePtr& info, diff --git a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp index 8aaec57b68b4..a3587f75ef24 100644 --- a/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp +++ b/ydb/core/blobstorage/vdisk/skeleton/skeleton_vpatch_actor.cpp @@ -291,7 +291,7 @@ namespace NKikimr::NPrivate { } } - void SendVPatchResult(NKikimrProto::EReplyStatus status) + void SendVPatchResult(NKikimrProto::EReplyStatus status, bool forceEnd = false) { STLOG(PRI_INFO, BS_VDISK_PATCH, BSVSP07, VDiskLogPrefix << " TEvVPatch: send patch result;", @@ -308,6 +308,9 @@ namespace NKikimr::NPrivate { } AddMark((status == NKikimrProto::OK ? "Patch ends with OK" : "Patch ends witn NOT OK")); CurrentEventTrace = nullptr; + if (forceEnd) { + ResultEvent->SetForceEndResponse(); + } SendVDiskResponse(TActivationContext::AsActorContext(), Sender, ResultEvent.release(), Cookie); } @@ -501,7 +504,7 @@ namespace NKikimr::NPrivate { Cookie = ev->Cookie; CurrentEventTrace = ev->Get()->VDiskSkeletonTrace; AddMark("Error: HandleError TEvVPatchDiff"); - SendVPatchResult(NKikimrProto::ERROR); + SendVPatchResult(NKikimrProto::ERROR, ev->Get()->IsForceEnd()); } void HandleForceEnd(TEvBlobStorage::TEvVPatchDiff::TPtr &ev) { @@ -509,7 +512,7 @@ namespace NKikimr::NPrivate { SendVPatchFoundParts(NKikimrProto::ERROR); if (forceEnd) { AddMark("Force end"); - SendVPatchResult(NKikimrProto::OK); + SendVPatchResult(NKikimrProto::OK, true); } else { AddMark("Force end by error"); SendVPatchResult(NKikimrProto::ERROR); @@ -566,7 +569,7 @@ namespace NKikimr::NPrivate { if (forceEnd) { AddMark("Force end"); - SendVPatchResult(NKikimrProto::OK); + SendVPatchResult(NKikimrProto::OK, true); NotifySkeletonAboutDying(); Become(&TThis::ErrorState); return;