Skip to content

Commit

Permalink
Harden huge blob placement checks in VDisk, fix HugeKeeper WAL (#10034)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Oct 4, 2024
1 parent ce8c6eb commit da5623c
Show file tree
Hide file tree
Showing 10 changed files with 204 additions and 60 deletions.
14 changes: 8 additions & 6 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -693,8 +693,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
NHuge::THugeSlot hugeSlot;
ui32 slotSize;
if (State.Pers->Heap->Allocate(msg.Data.GetSize(), &hugeSlot, &slotSize)) {
const bool inserted = State.Pers->AllocatedSlots.insert(hugeSlot).second;
Y_ABORT_UNLESS(inserted);
State.Pers->AddSlotInFlight(hugeSlot);
State.Pers->AddChunkSize(hugeSlot);
const ui64 lsnInfimum = HugeKeeperCtx->LsnMngr->GetLsn();
CheckLsn(lsnInfimum, "WriteHugeBlob");
const ui64 wId = State.LsnFifo.Push(lsnInfimum);
Expand Down Expand Up @@ -889,6 +889,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
for (const auto &x : msg->HugeBlobs) {
slotSizes.insert(State.Pers->Heap->SlotSizeOfThisSize(x.Size));
NHuge::TFreeRes freeRes = State.Pers->Heap->Free(x);
State.Pers->DeleteChunkSize(State.Pers->Heap->ConvertDiskPartToHugeSlot(x));
LOG_DEBUG(ctx, BS_HULLHUGE,
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
"THullHugeKeeper: TEvHullFreeHugeSlots: one slot: addr# %s",
Expand Down Expand Up @@ -961,8 +962,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
// manage allocated slots
const TDiskPart &hugeBlob = msg->HugeBlob;
NHuge::THugeSlot hugeSlot(State.Pers->Heap->ConvertDiskPartToHugeSlot(hugeBlob));
auto nErased = State.Pers->AllocatedSlots.erase(hugeSlot);
Y_ABORT_UNLESS(nErased == 1);
const bool deleted = State.Pers->DeleteSlotInFlight(hugeSlot);
Y_ABORT_UNLESS(deleted);
// depending on SlotIsUsed...
if (msg->SlotIsUsed) {
Y_VERIFY_S(State.Pers->LogPos.HugeBlobLoggedLsn < msg->RecLsn,
Expand All @@ -972,6 +973,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
} else {
// ...free slot
State.Pers->Heap->Free(hugeBlob);
// and remove chunk size record
State.Pers->DeleteChunkSize(hugeSlot);
}
}

Expand Down Expand Up @@ -1090,8 +1093,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
: HugeKeeperCtx(std::move(hugeKeeperCtx))
, State(std::move(persState))
{
Y_ABORT_UNLESS(State.Pers->Recovered &&
State.Pers->AllocatedSlots.empty());
Y_ABORT_UNLESS(State.Pers->Recovered && State.Pers->SlotsInFlight.empty());
}

void Bootstrap(const TActorContext &ctx) {
Expand Down
2 changes: 0 additions & 2 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ namespace NKikimr {
overhead, freeChunksReservation, logf));

state->LogPos = THullHugeRecoveryLogPos(0, 0, 100500, 50000, 70000, 56789, 39482);
NHuge::THugeSlot hugeSlot(453, 0, 234);
state->AllocatedSlots.insert(hugeSlot);

TString serialized(state->Serialize());
UNIT_ASSERT(THullHugeKeeperPersState::CheckEntryPoint(serialized));
Expand Down
4 changes: 3 additions & 1 deletion ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@ namespace NKikimr {
ui32 ChunkId = 0;
TMask Mask;
ui32 MaskSize = 0;
bool InLockedChunks = false;

TFreeRes() = default;
TFreeRes(ui32 chunkId, TMask mask, ui32 maskSize)
TFreeRes(ui32 chunkId, TMask mask, ui32 maskSize, bool inLockedChunks)
: ChunkId(chunkId)
, Mask(mask)
, MaskSize(maskSize)
, InLockedChunks(inLockedChunks)
{}

void Output(IOutputStream &str) const;
Expand Down
63 changes: 46 additions & 17 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ namespace NKikimr {
Y_VERIFY_S(chunkId, VDiskLogPrefix << "chunkId# " << chunkId);
TFreeSpace::iterator it;

auto freeFoundSlot = [&] (TFreeSpace &container, const char *containerName) {
auto freeFoundSlot = [&] (TFreeSpace &container, const char *containerName, bool inLockedChunks) {
TMask &mask = it->second;
Y_VERIFY_S(!mask.Get(slotId), VDiskLogPrefix << "TChain::Free: containerName# " << containerName
<< " id# " << id.ToString() << " State# " << ToString());
Expand All @@ -148,15 +148,15 @@ namespace NKikimr {
// free chunk
container.erase(it);
FreeSlotsInFreeSpace -= SlotsInChunk;
return TFreeRes(chunkId, ConstMask, SlotsInChunk);
return TFreeRes(chunkId, ConstMask, SlotsInChunk, inLockedChunks);
} else
return TFreeRes(0, mask, SlotsInChunk);
return TFreeRes(0, mask, SlotsInChunk, false);
};

if ((it = FreeSpace.find(chunkId)) != FreeSpace.end()) {
return freeFoundSlot(FreeSpace, "FreeSpace");
return freeFoundSlot(FreeSpace, "FreeSpace", false);
} else if ((it = LockedChunks.find(chunkId)) != LockedChunks.end()) {
return freeFoundSlot(LockedChunks, "LockedChunks");
return freeFoundSlot(LockedChunks, "LockedChunks", true);
} else {
// chunk is neither in FreeSpace nor in LockedChunks
TDynBitMap mask;
Expand All @@ -166,13 +166,13 @@ namespace NKikimr {
++FreeSlotsInFreeSpace;

FreeSpace.emplace(chunkId, mask);
return TFreeRes(0, mask, SlotsInChunk); // no empty chunk
return TFreeRes(0, mask, SlotsInChunk, false); // no empty chunk
}
}

bool TChain::LockChunkForAllocation(TChunkID chunkId) {
if (TFreeSpace::iterator it = FreeSpace.find(chunkId); it != FreeSpace.end()) {
LockedChunks.insert(FreeSpace.extract(it));
if (auto nh = FreeSpace.extract(chunkId)) {
LockedChunks.insert(std::move(nh));
return true;
} else {
// chunk is already freed
Expand All @@ -181,8 +181,8 @@ namespace NKikimr {
}

void TChain::UnlockChunk(TChunkID chunkId) {
if (auto it = LockedChunks.find(chunkId); it != LockedChunks.end()) {
FreeSpace.insert(LockedChunks.extract(it));
if (auto nh = LockedChunks.extract(chunkId)) {
FreeSpace.insert(std::move(nh));
}
}

Expand Down Expand Up @@ -211,15 +211,20 @@ namespace NKikimr {
ui32 chunkId = id.GetChunkId();
ui32 slotId = id.GetSlotId();

TFreeSpace::iterator it = FreeSpace.find(chunkId);
if (it != FreeSpace.end()) {
TFreeSpace *map = &FreeSpace;
TFreeSpace::iterator it = map->find(chunkId);
if (it == map->end()) {
map = &LockedChunks;
it = map->find(chunkId);
}
if (it != map->end()) {
TMask &mask = it->second;
Y_VERIFY_S(mask.Get(slotId), VDiskLogPrefix << "RecoveryModeAllocate:"
<< " id# " << id.ToString() << " State# " << ToString());
mask.Reset(slotId);

if (mask.Empty()) {
FreeSpace.erase(it);
map->erase(it);
}

--FreeSlotsInFreeSpace;
Expand All @@ -230,11 +235,11 @@ namespace NKikimr {
}
}

void TChain::RecoveryModeAllocate(const NPrivate::TChunkSlot &id, TChunkID chunkId) {
Y_VERIFY_S(id.GetChunkId() == chunkId && FreeSpace.find(chunkId) == FreeSpace.end(),
void TChain::RecoveryModeAllocate(const NPrivate::TChunkSlot &id, TChunkID chunkId, bool inLockedChunks) {
Y_VERIFY_S(id.GetChunkId() == chunkId && !FreeSpace.contains(chunkId) && !LockedChunks.contains(chunkId),
VDiskLogPrefix << " id# " << id.ToString() << " chunkId# " << chunkId << " State# " << ToString());

FreeSpace.emplace(chunkId, ConstMask);
(inLockedChunks ? LockedChunks : FreeSpace).emplace(chunkId, ConstMask);
FreeSlotsInFreeSpace += SlotsInChunk;
bool res = RecoveryModeAllocate(id);

Expand Down Expand Up @@ -358,6 +363,10 @@ namespace NKikimr {
return NPrivate::TChunkSlot(addr.ChunkIdx, slotId);
}

NPrivate::TChunkSlot TChainDelegator::Convert(const THugeSlot &slot) const {
return Convert(slot.GetDiskPart());
}

void TChainDelegator::Save(IOutputStream *s) const {
::Save(s, *ChainPtr);
}
Expand Down Expand Up @@ -791,7 +800,7 @@ namespace NKikimr {
TFreeChunks::iterator it = FreeChunks.find(chunkId);
Y_VERIFY_S(it != FreeChunks.end(), VDiskLogPrefix << "addr# " << addr.ToString() << " State# " << ToString());
FreeChunks.erase(it);
chainD->ChainPtr->RecoveryModeAllocate(id, chunkId);
chainD->ChainPtr->RecoveryModeAllocate(id, chunkId, false);
}
}

Expand All @@ -807,6 +816,26 @@ namespace NKikimr {
}
}

bool THeap::ReleaseSlot(THugeSlot slot) {
TChainDelegator* const chain = Chains.GetChain(slot.GetSize());
Y_VERIFY_S(chain, VDiskLogPrefix << "State# " << ToString() << " slot# " << slot.ToString());
if (TFreeRes res = chain->ChainPtr->Free(chain->Convert(slot)); res.ChunkId) {
PutChunkIdToFreeChunks(res.ChunkId);
return res.InLockedChunks;
}
return false;
}

void THeap::OccupySlot(THugeSlot slot, bool inLockedChunks) {
TChainDelegator* const chain = Chains.GetChain(slot.GetSize());
Y_VERIFY_S(chain, VDiskLogPrefix << "State# " << ToString() << " slot# " << slot.ToString());
if (!chain->ChainPtr->RecoveryModeAllocate(chain->Convert(slot))) {
const size_t numErased = FreeChunks.erase(slot.GetChunkId());
Y_VERIFY_S(numErased, VDiskLogPrefix << "State# " << ToString() << " slot# " << slot.ToString());
chain->ChainPtr->RecoveryModeAllocate(chain->Convert(slot), slot.GetChunkId(), inLockedChunks);
}
}

//////////////////////////////////////////////////////////////////////////////////////////
// THeap: Serialize/Parse/Check
//////////////////////////////////////////////////////////////////////////////////////////
Expand Down
5 changes: 4 additions & 1 deletion ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.h
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ namespace NKikimr {
THeapStat GetStat() const;
// returns true is allocated, false otherwise
bool RecoveryModeAllocate(const NPrivate::TChunkSlot &id);
void RecoveryModeAllocate(const NPrivate::TChunkSlot &id, TChunkID chunkId);
void RecoveryModeAllocate(const NPrivate::TChunkSlot &id, TChunkID chunkId, bool inLockedChunks);
void Save(IOutputStream *s) const;
void Load(IInputStream *s);
bool HaveBeenUsed() const;
Expand Down Expand Up @@ -158,6 +158,7 @@ namespace NKikimr {
TChainDelegator &operator =(const TChainDelegator &) = delete;
THugeSlot Convert(const NPrivate::TChunkSlot &id) const;
NPrivate::TChunkSlot Convert(const TDiskPart &addr) const;
NPrivate::TChunkSlot Convert(const THugeSlot &slot) const;
void Save(IOutputStream *s) const;
void Load(IInputStream *s);
bool HaveBeenUsed() const;
Expand Down Expand Up @@ -299,6 +300,8 @@ namespace NKikimr {
void RecoveryModeAllocate(const TDiskPart &addr);
void RecoveryModeAddChunk(ui32 chunkId);
void RecoveryModeRemoveChunks(const TVector<ui32> &chunkIds);
bool ReleaseSlot(THugeSlot slot);
void OccupySlot(THugeSlot slot, bool inLockedChunks);

//////////////////////////////////////////////////////////////////////////////////////////
// Serialize/Parse/Check
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ namespace NKikimr {
TMask mask;
mask.Set(0, 8);
mask.Reset(1);
TFreeRes res = {15, mask, 8};
TFreeRes res = {15, mask, 8, false};

STR << "TFreeRes# " << res.ToString() << "\n";
UNIT_ASSERT_EQUAL(res.ToString(), "{ChunkIdx: 15 Mask# 10111111}");

TMask constMask = TChain::BuildConstMask("", 8);
TFreeRes constRes = {0, constMask, 8};
TFreeRes constRes = {0, constMask, 8, false};
STR << "constMask# " << constRes.ToString() << "\n";
UNIT_ASSERT_EQUAL(constRes.ToString(), "{ChunkIdx: 0 Mask# 11111111}");

Expand Down
Loading

0 comments on commit da5623c

Please sign in to comment.