From da5623cb5caf5e2ab1bcb9151289ee910f57a9ac Mon Sep 17 00:00:00 2001 From: Alexander Rutkovsky Date: Fri, 4 Oct 2024 15:50:29 +0300 Subject: [PATCH] Harden huge blob placement checks in VDisk, fix HugeKeeper WAL (#10034) --- .../vdisk/huge/blobstorage_hullhuge.cpp | 14 +- .../vdisk/huge/blobstorage_hullhuge_ut.cpp | 2 - .../vdisk/huge/blobstorage_hullhugedefs.h | 4 +- .../vdisk/huge/blobstorage_hullhugeheap.cpp | 63 ++++++--- .../vdisk/huge/blobstorage_hullhugeheap.h | 5 +- .../huge/blobstorage_hullhugeheap_ut.cpp | 4 +- .../huge/blobstorage_hullhugerecovery.cpp | 127 ++++++++++++++---- .../vdisk/huge/blobstorage_hullhugerecovery.h | 11 +- .../localrecovery/localrecovery_logreplay.cpp | 3 + .../localrecovery/localrecovery_public.cpp | 31 +++++ 10 files changed, 204 insertions(+), 60 deletions(-) diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp index ee1597bd88ed..ea8e84b3b8a3 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp @@ -693,8 +693,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); NHuge::THugeSlot hugeSlot; ui32 slotSize; if (State.Pers->Heap->Allocate(msg.Data.GetSize(), &hugeSlot, &slotSize)) { - const bool inserted = State.Pers->AllocatedSlots.insert(hugeSlot).second; - Y_ABORT_UNLESS(inserted); + State.Pers->AddSlotInFlight(hugeSlot); + State.Pers->AddChunkSize(hugeSlot); const ui64 lsnInfimum = HugeKeeperCtx->LsnMngr->GetLsn(); CheckLsn(lsnInfimum, "WriteHugeBlob"); const ui64 wId = State.LsnFifo.Push(lsnInfimum); @@ -889,6 +889,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); for (const auto &x : msg->HugeBlobs) { slotSizes.insert(State.Pers->Heap->SlotSizeOfThisSize(x.Size)); NHuge::TFreeRes freeRes = State.Pers->Heap->Free(x); + State.Pers->DeleteChunkSize(State.Pers->Heap->ConvertDiskPartToHugeSlot(x)); LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, "THullHugeKeeper: TEvHullFreeHugeSlots: one slot: addr# %s", @@ -961,8 +962,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); // manage allocated slots const TDiskPart &hugeBlob = msg->HugeBlob; NHuge::THugeSlot hugeSlot(State.Pers->Heap->ConvertDiskPartToHugeSlot(hugeBlob)); - auto nErased = State.Pers->AllocatedSlots.erase(hugeSlot); - Y_ABORT_UNLESS(nErased == 1); + const bool deleted = State.Pers->DeleteSlotInFlight(hugeSlot); + Y_ABORT_UNLESS(deleted); // depending on SlotIsUsed... if (msg->SlotIsUsed) { Y_VERIFY_S(State.Pers->LogPos.HugeBlobLoggedLsn < msg->RecLsn, @@ -972,6 +973,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); } else { // ...free slot State.Pers->Heap->Free(hugeBlob); + // and remove chunk size record + State.Pers->DeleteChunkSize(hugeSlot); } } @@ -1090,8 +1093,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); : HugeKeeperCtx(std::move(hugeKeeperCtx)) , State(std::move(persState)) { - Y_ABORT_UNLESS(State.Pers->Recovered && - State.Pers->AllocatedSlots.empty()); + Y_ABORT_UNLESS(State.Pers->Recovered && State.Pers->SlotsInFlight.empty()); } void Bootstrap(const TActorContext &ctx) { diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge_ut.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge_ut.cpp index a1b738a5d365..2aa98ea92cc0 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge_ut.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge_ut.cpp @@ -34,8 +34,6 @@ namespace NKikimr { overhead, freeChunksReservation, logf)); state->LogPos = THullHugeRecoveryLogPos(0, 0, 100500, 50000, 70000, 56789, 39482); - NHuge::THugeSlot hugeSlot(453, 0, 234); - state->AllocatedSlots.insert(hugeSlot); TString serialized(state->Serialize()); UNIT_ASSERT(THullHugeKeeperPersState::CheckEntryPoint(serialized)); diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h index e0053822b1c4..ef5ce75f88bc 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h @@ -23,12 +23,14 @@ namespace NKikimr { ui32 ChunkId = 0; TMask Mask; ui32 MaskSize = 0; + bool InLockedChunks = false; TFreeRes() = default; - TFreeRes(ui32 chunkId, TMask mask, ui32 maskSize) + TFreeRes(ui32 chunkId, TMask mask, ui32 maskSize, bool inLockedChunks) : ChunkId(chunkId) , Mask(mask) , MaskSize(maskSize) + , InLockedChunks(inLockedChunks) {} void Output(IOutputStream &str) const; diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.cpp index 8a26098494d2..be27874e3f7e 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.cpp @@ -138,7 +138,7 @@ namespace NKikimr { Y_VERIFY_S(chunkId, VDiskLogPrefix << "chunkId# " << chunkId); TFreeSpace::iterator it; - auto freeFoundSlot = [&] (TFreeSpace &container, const char *containerName) { + auto freeFoundSlot = [&] (TFreeSpace &container, const char *containerName, bool inLockedChunks) { TMask &mask = it->second; Y_VERIFY_S(!mask.Get(slotId), VDiskLogPrefix << "TChain::Free: containerName# " << containerName << " id# " << id.ToString() << " State# " << ToString()); @@ -148,15 +148,15 @@ namespace NKikimr { // free chunk container.erase(it); FreeSlotsInFreeSpace -= SlotsInChunk; - return TFreeRes(chunkId, ConstMask, SlotsInChunk); + return TFreeRes(chunkId, ConstMask, SlotsInChunk, inLockedChunks); } else - return TFreeRes(0, mask, SlotsInChunk); + return TFreeRes(0, mask, SlotsInChunk, false); }; if ((it = FreeSpace.find(chunkId)) != FreeSpace.end()) { - return freeFoundSlot(FreeSpace, "FreeSpace"); + return freeFoundSlot(FreeSpace, "FreeSpace", false); } else if ((it = LockedChunks.find(chunkId)) != LockedChunks.end()) { - return freeFoundSlot(LockedChunks, "LockedChunks"); + return freeFoundSlot(LockedChunks, "LockedChunks", true); } else { // chunk is neither in FreeSpace nor in LockedChunks TDynBitMap mask; @@ -166,13 +166,13 @@ namespace NKikimr { ++FreeSlotsInFreeSpace; FreeSpace.emplace(chunkId, mask); - return TFreeRes(0, mask, SlotsInChunk); // no empty chunk + return TFreeRes(0, mask, SlotsInChunk, false); // no empty chunk } } bool TChain::LockChunkForAllocation(TChunkID chunkId) { - if (TFreeSpace::iterator it = FreeSpace.find(chunkId); it != FreeSpace.end()) { - LockedChunks.insert(FreeSpace.extract(it)); + if (auto nh = FreeSpace.extract(chunkId)) { + LockedChunks.insert(std::move(nh)); return true; } else { // chunk is already freed @@ -181,8 +181,8 @@ namespace NKikimr { } void TChain::UnlockChunk(TChunkID chunkId) { - if (auto it = LockedChunks.find(chunkId); it != LockedChunks.end()) { - FreeSpace.insert(LockedChunks.extract(it)); + if (auto nh = LockedChunks.extract(chunkId)) { + FreeSpace.insert(std::move(nh)); } } @@ -211,15 +211,20 @@ namespace NKikimr { ui32 chunkId = id.GetChunkId(); ui32 slotId = id.GetSlotId(); - TFreeSpace::iterator it = FreeSpace.find(chunkId); - if (it != FreeSpace.end()) { + TFreeSpace *map = &FreeSpace; + TFreeSpace::iterator it = map->find(chunkId); + if (it == map->end()) { + map = &LockedChunks; + it = map->find(chunkId); + } + if (it != map->end()) { TMask &mask = it->second; Y_VERIFY_S(mask.Get(slotId), VDiskLogPrefix << "RecoveryModeAllocate:" << " id# " << id.ToString() << " State# " << ToString()); mask.Reset(slotId); if (mask.Empty()) { - FreeSpace.erase(it); + map->erase(it); } --FreeSlotsInFreeSpace; @@ -230,11 +235,11 @@ namespace NKikimr { } } - void TChain::RecoveryModeAllocate(const NPrivate::TChunkSlot &id, TChunkID chunkId) { - Y_VERIFY_S(id.GetChunkId() == chunkId && FreeSpace.find(chunkId) == FreeSpace.end(), + void TChain::RecoveryModeAllocate(const NPrivate::TChunkSlot &id, TChunkID chunkId, bool inLockedChunks) { + Y_VERIFY_S(id.GetChunkId() == chunkId && !FreeSpace.contains(chunkId) && !LockedChunks.contains(chunkId), VDiskLogPrefix << " id# " << id.ToString() << " chunkId# " << chunkId << " State# " << ToString()); - FreeSpace.emplace(chunkId, ConstMask); + (inLockedChunks ? LockedChunks : FreeSpace).emplace(chunkId, ConstMask); FreeSlotsInFreeSpace += SlotsInChunk; bool res = RecoveryModeAllocate(id); @@ -358,6 +363,10 @@ namespace NKikimr { return NPrivate::TChunkSlot(addr.ChunkIdx, slotId); } + NPrivate::TChunkSlot TChainDelegator::Convert(const THugeSlot &slot) const { + return Convert(slot.GetDiskPart()); + } + void TChainDelegator::Save(IOutputStream *s) const { ::Save(s, *ChainPtr); } @@ -791,7 +800,7 @@ namespace NKikimr { TFreeChunks::iterator it = FreeChunks.find(chunkId); Y_VERIFY_S(it != FreeChunks.end(), VDiskLogPrefix << "addr# " << addr.ToString() << " State# " << ToString()); FreeChunks.erase(it); - chainD->ChainPtr->RecoveryModeAllocate(id, chunkId); + chainD->ChainPtr->RecoveryModeAllocate(id, chunkId, false); } } @@ -807,6 +816,26 @@ namespace NKikimr { } } + bool THeap::ReleaseSlot(THugeSlot slot) { + TChainDelegator* const chain = Chains.GetChain(slot.GetSize()); + Y_VERIFY_S(chain, VDiskLogPrefix << "State# " << ToString() << " slot# " << slot.ToString()); + if (TFreeRes res = chain->ChainPtr->Free(chain->Convert(slot)); res.ChunkId) { + PutChunkIdToFreeChunks(res.ChunkId); + return res.InLockedChunks; + } + return false; + } + + void THeap::OccupySlot(THugeSlot slot, bool inLockedChunks) { + TChainDelegator* const chain = Chains.GetChain(slot.GetSize()); + Y_VERIFY_S(chain, VDiskLogPrefix << "State# " << ToString() << " slot# " << slot.ToString()); + if (!chain->ChainPtr->RecoveryModeAllocate(chain->Convert(slot))) { + const size_t numErased = FreeChunks.erase(slot.GetChunkId()); + Y_VERIFY_S(numErased, VDiskLogPrefix << "State# " << ToString() << " slot# " << slot.ToString()); + chain->ChainPtr->RecoveryModeAllocate(chain->Convert(slot), slot.GetChunkId(), inLockedChunks); + } + } + ////////////////////////////////////////////////////////////////////////////////////////// // THeap: Serialize/Parse/Check ////////////////////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.h index f79aaa78370c..6fbae987e4ba 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.h +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.h @@ -124,7 +124,7 @@ namespace NKikimr { THeapStat GetStat() const; // returns true is allocated, false otherwise bool RecoveryModeAllocate(const NPrivate::TChunkSlot &id); - void RecoveryModeAllocate(const NPrivate::TChunkSlot &id, TChunkID chunkId); + void RecoveryModeAllocate(const NPrivate::TChunkSlot &id, TChunkID chunkId, bool inLockedChunks); void Save(IOutputStream *s) const; void Load(IInputStream *s); bool HaveBeenUsed() const; @@ -158,6 +158,7 @@ namespace NKikimr { TChainDelegator &operator =(const TChainDelegator &) = delete; THugeSlot Convert(const NPrivate::TChunkSlot &id) const; NPrivate::TChunkSlot Convert(const TDiskPart &addr) const; + NPrivate::TChunkSlot Convert(const THugeSlot &slot) const; void Save(IOutputStream *s) const; void Load(IInputStream *s); bool HaveBeenUsed() const; @@ -299,6 +300,8 @@ namespace NKikimr { void RecoveryModeAllocate(const TDiskPart &addr); void RecoveryModeAddChunk(ui32 chunkId); void RecoveryModeRemoveChunks(const TVector &chunkIds); + bool ReleaseSlot(THugeSlot slot); + void OccupySlot(THugeSlot slot, bool inLockedChunks); ////////////////////////////////////////////////////////////////////////////////////////// // Serialize/Parse/Check diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ut.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ut.cpp index 9b39f25a3796..5de29e6f46bc 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ut.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap_ut.cpp @@ -17,13 +17,13 @@ namespace NKikimr { TMask mask; mask.Set(0, 8); mask.Reset(1); - TFreeRes res = {15, mask, 8}; + TFreeRes res = {15, mask, 8, false}; STR << "TFreeRes# " << res.ToString() << "\n"; UNIT_ASSERT_EQUAL(res.ToString(), "{ChunkIdx: 15 Mask# 10111111}"); TMask constMask = TChain::BuildConstMask("", 8); - TFreeRes constRes = {0, constMask, 8}; + TFreeRes constRes = {0, constMask, 8, false}; STR << "constMask# " << constRes.ToString() << "\n"; UNIT_ASSERT_EQUAL(constRes.ToString(), "{ChunkIdx: 0 Mask# 11111111}"); diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp index d9ff45dd08c9..f874b1c5d581 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp @@ -75,7 +75,6 @@ namespace NKikimr { , Heap(new NHuge::THeap(VCtx->VDiskLogPrefix, chunkSize, appendBlockSize, minHugeBlobInBytes, oldMinHugeBlobInBytes, milestoneHugeBlobInBytes, maxBlobInBytes, overhead, freeChunksReservation)) - , AllocatedSlots() , Guid(TAppData::RandomProvider->GenRand64()) { logFunc(VDISKP(VCtx->VDiskLogPrefix, @@ -100,7 +99,6 @@ namespace NKikimr { , Heap(new NHuge::THeap(VCtx->VDiskLogPrefix, chunkSize, appendBlockSize, minHugeBlobInBytes, oldMinHugeBlobInBytes, milestoneHugeBlobInBytes, maxBlobInBytes, overhead, freeChunksReservation)) - , AllocatedSlots() , Guid(TAppData::RandomProvider->GenRand64()) , PersistentLsn(entryPointLsn) { @@ -128,7 +126,6 @@ namespace NKikimr { , Heap(new NHuge::THeap(VCtx->VDiskLogPrefix, chunkSize, appendBlockSize, minHugeBlobInBytes, oldMinHugeBlobInBytes, milestoneHugeBlobInBytes, maxBlobInBytes, overhead, freeChunksReservation)) - , AllocatedSlots() , Guid(TAppData::RandomProvider->GenRand64()) , PersistentLsn(entryPointLsn) { @@ -153,7 +150,18 @@ namespace NKikimr { str.Write(serializedLogPos.data(), THullHugeRecoveryLogPos::SerializedSize); // heap + std::vector inLockedChunks; + inLockedChunks.reserve(SlotsInFlight.size()); + for (const THugeSlot& slot : SlotsInFlight) { + inLockedChunks.push_back(Heap->ReleaseSlot(slot)); // mark this slot as free one for the means of serialization + } TString serializedHeap = Heap->Serialize(); + size_t index = 0; + for (const THugeSlot& slot : SlotsInFlight) { + Y_DEBUG_ABORT_UNLESS(index < inLockedChunks.size()); + Heap->OccupySlot(slot, inLockedChunks[index++]); // restore slot ownership + } + Y_DEBUG_ABORT_UNLESS(index == inLockedChunks.size()); ui32 heapSize = serializedHeap.size(); str.Write(&heapSize, sizeof(ui32)); str.Write(serializedHeap.data(), heapSize); @@ -163,14 +171,9 @@ namespace NKikimr { Y_ABORT_UNLESS(!chunksSize); str.Write(&chunksSize, sizeof(ui32)); - // allocated slots - ui32 slotsSize = AllocatedSlots.size(); + // allocated slots (we really never save them now, they're considered as free ones while serializing Heap) + ui32 slotsSize = 0; str.Write(&slotsSize, sizeof(ui32)); - for (const auto &x : AllocatedSlots) { - x.Serialize(str); - ui64 refPointLsn = 0; // refPointLsn (for backward compatibility, can be removed) - str.Write(&refPointLsn, sizeof(ui64)); - } return str.Str(); } @@ -181,7 +184,7 @@ namespace NKikimr { void THullHugeKeeperPersState::ParseFromArray(const char* data, size_t size) { Y_UNUSED(size); - AllocatedSlots.clear(); + SlotsInFlight.clear(); const char *cur = data; cur += sizeof(ui32); // signature @@ -209,8 +212,7 @@ namespace NKikimr { hugeSlot.Parse(cur, cur + NHuge::THugeSlot::SerializedSize); cur += NHuge::THugeSlot::SerializedSize; cur += sizeof(ui64); // refPointLsn (for backward compatibility, can be removed) - bool inserted = AllocatedSlots.insert(hugeSlot).second; - Y_ABORT_UNLESS(inserted); + AddSlotInFlight(hugeSlot); } } @@ -280,9 +282,9 @@ namespace NKikimr { TString THullHugeKeeperPersState::ToString() const { TStringStream str; str << "LogPos: " << LogPos.ToString(); - str << " AllocatedSlots:"; - if (!AllocatedSlots.empty()) { - for (const auto &x : AllocatedSlots) { + str << " SlotsInFlight:"; + if (!SlotsInFlight.empty()) { + for (const auto &x : SlotsInFlight) { str << " " << x.ToString(); } } else { @@ -294,14 +296,38 @@ namespace NKikimr { void THullHugeKeeperPersState::RenderHtml(IOutputStream &str) const { str << "LogPos: " << LogPos.ToString() << "
"; - str << "AllocatedSlots:"; - if (!AllocatedSlots.empty()) { - for (const auto &x : AllocatedSlots) { + str << "SlotsInFlight:"; + if (!SlotsInFlight.empty()) { + for (const auto &x : SlotsInFlight) { str << " " << x.ToString(); } } else { str << " empty
"; } + HTML(str) { + COLLAPSED_BUTTON_CONTENT("chunkstoslotsizeid", "ChunksToSlotSize") { + TABLE_CLASS ("table table-condensed") { + TABLEHEAD() { + TABLER() { + TABLEH() {str << "ChunkId";} + TABLEH() {str << "RefCount";} + TABLEH() {str << "SlotSize";} + } + } + TABLEBODY() { + for (const auto& [key, value] : ChunkToSlotSize) { + TABLER() { + const auto& [refcount, size] = value; + TABLED() {str << key;} + TABLED() {str << refcount;} + TABLED() {str << size;} + } + } + } + } + } + str << "
"; + } Heap->RenderHtml(str); } @@ -425,8 +451,9 @@ namespace NKikimr { "Recovery(guid# %" PRIu64 " lsn# %" PRIu64 " entryLsn# %" PRIu64 "): " "RmHugeBlobs apply: %s", Guid, lsn, LogPos.EntryPointLsn, rec.ToString().data())); - for (const auto &x : rec) + for (const auto &x : rec) { Heap->RecoveryModeFree(x); + } *logPosDelLsn = lsn; PersistentLsn = Min(PersistentLsn, lsn); @@ -461,9 +488,7 @@ namespace NKikimr { NHuge::THugeSlot hugeSlot(Heap->ConvertDiskPartToHugeSlot(rec.DiskAddr)); if (lsn > LogPos.HugeBlobLoggedLsn) { // apply - TAllocatedSlots::iterator it = AllocatedSlots.find(hugeSlot); - if (it != AllocatedSlots.end()) { - AllocatedSlots.erase(it); + if (DeleteSlotInFlight(hugeSlot)) { LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(VCtx->VDiskLogPrefix, "Recovery(guid# %" PRIu64 " lsn# %" PRIu64 " entryLsn# %" PRIu64 "): " @@ -536,13 +561,11 @@ namespace NKikimr { } void THullHugeKeeperPersState::FinishRecovery(const TActorContext &ctx) { - // handle AllocatedSlots - if (!AllocatedSlots.empty()) { - for (const auto &x : AllocatedSlots) { - Heap->RecoveryModeFree(x.GetDiskPart()); - } - AllocatedSlots.clear(); + // handle SlotsInFlight + for (const auto &x : SlotsInFlight) { + Heap->RecoveryModeFree(x.GetDiskPart()); } + SlotsInFlight.clear(); Recovered = true; LOG_DEBUG(ctx, BS_HULLHUGE, @@ -553,5 +576,51 @@ namespace NKikimr { Heap->GetOwnedChunks(chunks); } + void THullHugeKeeperPersState::AddSlotInFlight(THugeSlot hugeSlot) { + const auto [it, inserted] = SlotsInFlight.insert(hugeSlot); + Y_ABORT_UNLESS(inserted); + } + + bool THullHugeKeeperPersState::DeleteSlotInFlight(THugeSlot hugeSlot) { + if (const auto it = SlotsInFlight.find(hugeSlot); it != SlotsInFlight.end()) { + Y_ABORT_UNLESS(it->GetSize() == hugeSlot.GetSize()); + SlotsInFlight.erase(it); + return true; + } else { + return false; + } + } + + void THullHugeKeeperPersState::AddChunkSize(THugeSlot hugeSlot) { + const auto it = ChunkToSlotSize.emplace(hugeSlot.GetChunkId(), std::make_tuple(0, hugeSlot.GetSize())).first; + auto& [refcount, size] = it->second; + Y_VERIFY_DEBUG_S(size == hugeSlot.GetSize(), VCtx->VDiskLogPrefix << "HugeSlot# " << hugeSlot.ToString() + << " Expected# " << size); + if (size != hugeSlot.GetSize() && TlsActivationContext) { + LOG_CRIT_S(*TlsActivationContext, NKikimrServices::BS_HULLHUGE, VCtx->VDiskLogPrefix + << "HugeSlot# " << hugeSlot.ToString() << " size is not as Expected# " << size); + } + ++refcount; + } + + void THullHugeKeeperPersState::DeleteChunkSize(THugeSlot hugeSlot) { + const auto jt = ChunkToSlotSize.find(hugeSlot.GetChunkId()); + Y_VERIFY_S(jt != ChunkToSlotSize.end(), VCtx->VDiskLogPrefix << "HugeSlot# " << hugeSlot.ToString()); + auto& [refcount, size] = jt->second; + Y_VERIFY_DEBUG_S(size == hugeSlot.GetSize(), VCtx->VDiskLogPrefix << "HugeSlot# " << hugeSlot.ToString() + << " Expected# " << size); + if (size != hugeSlot.GetSize() && TlsActivationContext) { + LOG_CRIT_S(*TlsActivationContext, NKikimrServices::BS_HULLHUGE, VCtx->VDiskLogPrefix + << "HugeSlot# " << hugeSlot.ToString() << " size is not as Expected# " << size); + } + if (!--refcount) { + ChunkToSlotSize.erase(jt); + } + } + + void THullHugeKeeperPersState::RegisterBlob(TDiskPart diskPart) { + AddChunkSize(Heap->ConvertDiskPartToHugeSlot(diskPart)); + } + } // NHuge } // NKikimr diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h index 4f50ba113fbd..f7b04ba7b42b 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h @@ -71,7 +71,6 @@ namespace NKikimr { class THeap; struct THullHugeKeeperPersState { - typedef THashSet TAllocatedSlots; static const ui32 Signature; TIntrusivePtr VCtx; @@ -79,7 +78,8 @@ namespace NKikimr { THullHugeRecoveryLogPos LogPos; std::unique_ptr Heap; // slots that are already allocated, but not written to log - TAllocatedSlots AllocatedSlots; + THashSet SlotsInFlight; + THashMap> ChunkToSlotSize; // guard to avoid using structure before recovery has been completed bool Recovered = false; // guid for this instance of pers state @@ -143,6 +143,13 @@ namespace NKikimr { // finish commit void EntryPointCommitted(ui64 lsn); + void AddSlotInFlight(THugeSlot hugeSlot); + bool DeleteSlotInFlight(THugeSlot hugeSlot); + + void AddChunkSize(THugeSlot hugeSlot); + void DeleteChunkSize(THugeSlot hugeSlot); + void RegisterBlob(TDiskPart diskPart); + enum ESlotDelDbType { LogoBlobsDb, BlocksDb, diff --git a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp index fb7f0875a391..0ff8c5bc8fc4 100644 --- a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp +++ b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_logreplay.cpp @@ -230,6 +230,9 @@ namespace NKikimr { TLogoBlobID genId(id, 0); LocRecCtx->HullDbRecovery->ReplayAddHugeLogoBlobCmd(ctx, genId, ingress, diskAddr, lsn, THullDbRecovery::RECOVERY); + if (diskAddr.ChunkIdx && diskAddr.Size) { + LocRecCtx->RepairedHuge->RegisterBlob(diskAddr); + } } // skip records that already in synclog diff --git a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp index ab40800fd481..107936cde0b9 100644 --- a/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp +++ b/ydb/core/blobstorage/vdisk/localrecovery/localrecovery_public.cpp @@ -190,6 +190,37 @@ namespace NKikimr { Become(&TThis::StateLoadBulkFormedSegments); VDiskMonGroup.VDiskLocalRecoveryState() = TDbMon::TDbLocalRecovery::LoadBulkFormedSegments; + // find all the huge blobs and track their slot size + { + TIntrusivePtr& logoBlobs = LocRecCtx->HullDbRecovery->GetHullDs()->LogoBlobs; + TLevelSlice::TSstIterator iter(logoBlobs->CurSlice.Get(), + logoBlobs->CurSlice->Level0CurSstsNum()); + + for (iter.SeekToFirst(); iter.Valid(); iter.Next()) { + struct TMerger { + TThis* const Self; + + void AddFromSegment(const TMemRecLogoBlob& memRec, const TDiskPart *outbound, + const TKeyLogoBlob& /*key*/, ui64 /*circaLsn*/) { + if (memRec.GetType() == TBlobType::HugeBlob || memRec.GetType() == TBlobType::ManyHugeBlobs) { + TDiskDataExtractor extr; + memRec.GetDiskData(&extr, outbound); + for (const TDiskPart *location = extr.Begin; location != extr.End; ++location) { + if (location->ChunkIdx && location->Size) { + Self->LocRecCtx->RepairedHuge->RegisterBlob(*location); + } + } + } + } + } merger{this}; + + TLevelSegment::TMemIterator blobIter(iter.Get().SstPtr.Get()); + for (blobIter.SeekToFirst(); blobIter.Valid(); blobIter.Next()) { + blobIter.PutToMerger(&merger); + } + } + } + // start loading bulk-formed segments that are already not in index, but still required to recover SyncLog auto aid = ctx.Register(LocRecCtx->HullDbRecovery->GetHullDs()->LogoBlobs->CurSlice->BulkFormedSegments.CreateLoaderActor( LocRecCtx->VCtx, LocRecCtx->PDiskCtx, SyncLogMaxLsnStored, ctx.SelfID));