Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix huge blob keeper FirstLsnToKeep logic #4697

Merged
merged 1 commit into from
May 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,7 @@ namespace NKikimr {
OutOfSpaceState.UpdateLocalLog(ev.StatusFlags);
}
return true;
case NKikimrProto::ERROR:
case NKikimrProto::INVALID_OWNER:
case NKikimrProto::INVALID_ROUND:
// BlobStorage group reconfiguration, just return false and wait until
Expand Down
108 changes: 58 additions & 50 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -205,11 +205,9 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
ui32 chunkId = HugeSlot.GetChunkId();
ui32 offset = HugeSlot.GetOffset();
HugeKeeperCtx->LsmHullGroup.LsmHugeBytesWritten() += partsPtr->ByteSize();
LOG_DEBUG(ctx, BS_HULLHUGE,
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
"Writer: bootstrap: id# %s chunkId# %u offset# %u storedBlobSize# %u "
"writtenSize# %u", HugeSlot.ToString().data(), chunkId, offset,
storedBlobSize, writtenSize));
LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
"Writer: bootstrap: id# %s storedBlobSize# %u writtenSize# %u blobId# %s wId# %" PRIu64,
HugeSlot.ToString().data(), storedBlobSize, writtenSize, Item->LogoBlobId.ToString().data(), WriteId));
Span && Span.Event("Send_TEvChunkWrite", {{"ChunkId", chunkId}, {"Offset", offset}, {"WrittenSize", writtenSize}});
auto ev = std::make_unique<NPDisk::TEvChunkWrite>(HugeKeeperCtx->PDiskCtx->Dsk->Owner,
HugeKeeperCtx->PDiskCtx->Dsk->OwnerRound, chunkId, offset,
Expand Down Expand Up @@ -596,7 +594,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
{}

ui64 FirstLsnToKeep() const {
return Pers->FirstLsnToKeep(LsnFifo.FirstLsnToKeep());
const ui64 pendingLsn = PendingWrites.empty() ? Max<ui64>() : PendingWrites.begin()->first;
return Pers->FirstLsnToKeep(Min(pendingLsn, LsnFifo.FirstLsnToKeep()));
}

TString FirstLsnToKeepDecomposed() const {
Expand Down Expand Up @@ -674,6 +673,15 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
TActiveActors ActiveActors;
std::unordered_set<ui32> AllocatingChunkPerSlotSize;

void CheckLsn(ui64 lsn, const char *action) {
const ui64 firstLsnToKeep = State.FirstLsnToKeep();
Y_VERIFY_S(firstLsnToKeep <= lsn, HugeKeeperCtx->VCtx->VDiskLogPrefix
<< "FirstLsnToKeep# " << firstLsnToKeep
<< " Lsn# " << lsn
<< " Action# " << action
<< " LsnFifo# " << State.LsnFifo.ToString());
}

void PutToWaitQueue(ui32 slotSize, std::unique_ptr<TEvHullWriteHugeBlob::THandle> item) {
State.WaitQueue[slotSize].emplace_back(std::move(item), NWilson::TSpan(TWilson::VDiskTopLevel,
std::move(item->TraceId), "VDisk.HullHugeKeeper.InWaitQueue"));
Expand All @@ -688,11 +696,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
const bool inserted = State.Pers->AllocatedSlots.insert(hugeSlot).second;
Y_ABORT_UNLESS(inserted);
const ui64 lsnInfimum = HugeKeeperCtx->LsnMngr->GetLsn();
CheckLsn(lsnInfimum, "WriteHugeBlob");
const ui64 wId = State.LsnFifo.Push(lsnInfimum);
LOG_DEBUG_S(ctx, NKikimrServices::BS_HULLHUGE, HugeKeeperCtx->VCtx->VDiskLogPrefix
<< "THullHugeKeeper: issuing WriteBlob wId# " << wId
<< " LsnInfimum# " << lsnInfimum
<< " HugeSlot# " << hugeSlot.ToString());
auto aid = ctx.Register(new THullHugeBlobWriter(HugeKeeperCtx, ctx.SelfID, hugeSlot,
std::unique_ptr<TEvHullWriteHugeBlob>(ev->Release().Release()), wId, std::move(traceId)));
ActiveActors.Insert(aid, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE);
Expand Down Expand Up @@ -772,7 +777,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);

// check what if we issue a new huge hull keeper entry point -- would it allow us to
// move the FirstLsnToKeep barrier forward? if so, try to issue an entry point, otherwise exit
const ui64 minInFlightLsn = State.LsnFifo.FirstLsnToKeep();
const ui64 pendingLsn = State.PendingWrites.empty() ? Max<ui64>() : State.PendingWrites.begin()->first;
const ui64 minInFlightLsn = Min(pendingLsn, State.LsnFifo.FirstLsnToKeep());
if (!State.Pers->WouldNewEntryPointAdvanceLog(State.FreeUpToLsn, minInFlightLsn, State.ItemsAfterCommit)) {
// if we issue an entry point now, we will achieve nothing, so return
LOG_DEBUG(ctx, BS_LOGCUTTER,
Expand All @@ -784,7 +790,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);

// allocate LSN for the brand new entry point
ui64 lsn = HugeKeeperCtx->LsnMngr->AllocLsnForLocalUse().Point();
State.Pers->InitiateNewEntryPointCommit(lsn);
State.Pers->InitiateNewEntryPointCommit(lsn, minInFlightLsn);
State.Committing = true;
State.ItemsAfterCommit = 0;
// serialize log record into string
Expand Down Expand Up @@ -813,19 +819,22 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
<< " FirstPendingWrite# " << (State.PendingWrites.empty() ? 0 : State.PendingWrites.begin()->first));

Y_ABORT_UNLESS(writeId);
if (!State.ProcessingPendingWrite) {
State.LsnFifo.Pop(writeId, lsn, true);

const bool canProcessNow = State.PendingWrites.empty() && lsn < State.LsnFifo.FirstLsnToKeep();
if (!canProcessNow) {
const auto [it, inserted] = State.PendingWrites.emplace(lsn, ev.Release());
Y_ABORT_UNLESS(inserted);
State.MaxPendingWrites = Max(State.MaxPendingWrites, State.PendingWrites.size());
ProcessPendingWrites();
return true;
}
if (State.ProcessingPendingWrite) {
return false;
}

CheckLsn(lsn, action);
State.LsnFifo.Pop(writeId, lsn, true);
const bool canProcessNow = State.PendingWrites.empty() && lsn < State.LsnFifo.FirstLsnToKeep();
if (canProcessNow) {
return false;
} else {
const auto [it, inserted] = State.PendingWrites.emplace(lsn, ev.Release());
Y_ABORT_UNLESS(inserted);
State.MaxPendingWrites = Max(State.MaxPendingWrites, State.PendingWrites.size());
ProcessPendingWrites();
return true;
}
return false;
}

void ProcessPendingWrites() {
Expand Down Expand Up @@ -862,7 +871,6 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
Y_ABORT_UNLESS(numErased == 1);
ActiveActors.Erase(ev->Sender);
ProcessQueue(msg->SlotSize, ctx);
TryToCutLog(ctx);
}

void Handle(TEvHullFreeHugeSlots::TPtr &ev, const TActorContext &ctx) {
Expand All @@ -883,8 +891,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
NHuge::TFreeRes freeRes = State.Pers->Heap->Free(x);
LOG_DEBUG(ctx, BS_HULLHUGE,
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
"THullHugeKeeper: TEvHullFreeHugeSlots: one slot: addr# %s freeRes# %s",
x.ToString().data(), freeRes.ToString().data()));
"THullHugeKeeper: TEvHullFreeHugeSlots: one slot: addr# %s",
x.ToString().data()));
++State.ItemsAfterCommit;
}

Expand All @@ -910,7 +918,6 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
ProcessQueue(slotSize, ctx);
}
FreeChunks(ctx);
TryToCutLog(ctx);
}

void Handle(TEvHullHugeChunkFreed::TPtr& ev, const TActorContext &ctx) {
Expand All @@ -927,9 +934,6 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
ActiveActors.Erase(ev->Sender);
State.LastCommitTime = TAppData::TimeProvider->Now();
State.Pers->EntryPointCommitted(ev->Get()->EntryPointLsn);

// try to cut the log again -- while we were writing the entry point, something may have changed
TryToCutLog(ctx);
}

void Handle(TEvHullHugeWritten::TPtr &ev, const TActorContext &ctx) {
Expand Down Expand Up @@ -969,12 +973,11 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
// ...free slot
State.Pers->Heap->Free(hugeBlob);
}
// if we are not committing entrypoint right now, we can try to update it as the FirstLsnToKeep may have changed
TryToCutLog(ctx);
}

void Handle(TEvHugePreCompact::TPtr ev, const TActorContext& ctx) {
const ui64 lsnInfimum = HugeKeeperCtx->LsnMngr->GetLsn();
CheckLsn(lsnInfimum, "PreCompact");
const ui64 wId = State.LsnFifo.Push(lsnInfimum);
LOG_DEBUG_S(ctx, NKikimrServices::BS_HULLHUGE, HugeKeeperCtx->VCtx->VDiskLogPrefix
<< "THullHugeKeeper: requested PreCompact wId# " << wId
Expand Down Expand Up @@ -1021,7 +1024,6 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
"THullHugeKeeper: TEvCutLog: %s", ev->Get()->ToString().data()));
State.FreeUpToLsn = ev->Get()->FreeUpToLsn;
TryToCutLog(ctx);
}

void Handle(NMon::TEvHttpInfo::TPtr &ev, const TActorContext &ctx) {
Expand Down Expand Up @@ -1054,22 +1056,28 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);

//////////// Event Handlers ////////////////////////////////////

STRICT_STFUNC(StateFunc,
HFunc(TEvHullWriteHugeBlob, Handle)
HFunc(TEvHullHugeChunkAllocated, Handle)
HFunc(TEvHullFreeHugeSlots, Handle)
HFunc(TEvHullHugeChunkFreed, Handle)
HFunc(TEvHullHugeCommitted, Handle)
HFunc(TEvHullHugeWritten, Handle)
HFunc(TEvHullHugeBlobLogged, Handle)
HFunc(TEvHugePreCompact, Handle)
HFunc(TEvHugeLockChunks, Handle)
HFunc(TEvHugeUnlockChunks, Handle)
HFunc(TEvHugeStat, Handle)
HFunc(NPDisk::TEvCutLog, Handle)
HFunc(NMon::TEvHttpInfo, Handle)
HFunc(TEvents::TEvPoisonPill, Handle)
)
STFUNC(StateFunc) {
const bool poison = ev->GetTypeRewrite() == TEvents::TSystem::Poison;
STRICT_STFUNC_BODY(
HFunc(TEvHullWriteHugeBlob, Handle)
HFunc(TEvHullHugeChunkAllocated, Handle)
HFunc(TEvHullFreeHugeSlots, Handle)
HFunc(TEvHullHugeChunkFreed, Handle)
HFunc(TEvHullHugeCommitted, Handle)
HFunc(TEvHullHugeWritten, Handle)
HFunc(TEvHullHugeBlobLogged, Handle)
HFunc(TEvHugePreCompact, Handle)
HFunc(TEvHugeLockChunks, Handle)
HFunc(TEvHugeUnlockChunks, Handle)
HFunc(TEvHugeStat, Handle)
HFunc(NPDisk::TEvCutLog, Handle)
HFunc(NMon::TEvHttpInfo, Handle)
HFunc(TEvents::TEvPoisonPill, Handle)
)
if (!poison) {
TryToCutLog(TActivationContext::AsActorContext());
}
}

public:
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugedefs.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,11 +129,7 @@ namespace NKikimr {
}

ui64 Hash() const {
ui64 x = 0;
x |= (ui64)ChunkId;
x <<= 32u;
x |= (ui64)Offset;
return x;
return MultiHash(ChunkId, Offset);
}

void Serialize(IOutputStream &str) const {
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ namespace NKikimr {
maxBlobInBytes, overhead, freeChunksReservation))
, AllocatedSlots()
, Guid(TAppData::RandomProvider->GenRand64())
, PersistentLsn(entryPointLsn)
{
ParseFromString(entryPointData);
Y_ABORT_UNLESS(entryPointLsn == LogPos.EntryPointLsn);
Expand Down Expand Up @@ -129,6 +130,7 @@ namespace NKikimr {
maxBlobInBytes, overhead, freeChunksReservation))
, AllocatedSlots()
, Guid(TAppData::RandomProvider->GenRand64())
, PersistentLsn(entryPointLsn)
{
ParseFromArray(entryPointData.GetData(), entryPointData.GetSize());
Y_ABORT_UNLESS(entryPointLsn == LogPos.EntryPointLsn);
Expand Down Expand Up @@ -304,7 +306,7 @@ namespace NKikimr {
}

ui64 THullHugeKeeperPersState::FirstLsnToKeep(ui64 minInFlightLsn) const {
const ui64 res = Min(minInFlightLsn, LogPos.EntryPointLsn);
const ui64 res = Min(minInFlightLsn, PersistentLsn);

Y_VERIFY_S(FirstLsnToKeepReported <= res, "FirstLsnToKeepReported# " << FirstLsnToKeepReported
<< " res# " << res << " state# " << FirstLsnToKeepDecomposed() << " minInFlightLsn# " << minInFlightLsn);
Expand All @@ -321,13 +323,14 @@ namespace NKikimr {

bool THullHugeKeeperPersState::WouldNewEntryPointAdvanceLog(ui64 freeUpToLsn, ui64 minInFlightLsn,
ui32 itemsAfterCommit) const {
return freeUpToLsn < minInFlightLsn && (LogPos.EntryPointLsn <= freeUpToLsn || itemsAfterCommit > 10000);
return freeUpToLsn < minInFlightLsn && (PersistentLsn <= freeUpToLsn || itemsAfterCommit > 10000);
}

// initiate commit
void THullHugeKeeperPersState::InitiateNewEntryPointCommit(ui64 lsn) {
void THullHugeKeeperPersState::InitiateNewEntryPointCommit(ui64 lsn, ui64 minInFlightLsn) {
Y_ABORT_UNLESS(lsn > LogPos.EntryPointLsn);
LogPos.EntryPointLsn = lsn;
PersistentLsn = Min(lsn, minInFlightLsn);

// these metabases never have huge blobs and we never care about them actually
LogPos.BlocksDbSlotDelLsn = lsn;
Expand All @@ -353,6 +356,7 @@ namespace NKikimr {
Guid, lsn, LogPos.EntryPointLsn, rec.ToString().data()));
Heap->RecoveryModeAddChunk(rec.ChunkId);
LogPos.ChunkAllocationLsn = lsn;
PersistentLsn = Min(PersistentLsn, lsn);
return TRlas(true, false);
} else {
// skip
Expand Down Expand Up @@ -380,6 +384,7 @@ namespace NKikimr {
Guid, lsn, LogPos.EntryPointLsn, rec.ToString().data()));
Heap->RecoveryModeRemoveChunks(rec.ChunkIds);
LogPos.ChunkFreeingLsn = lsn;
PersistentLsn = Min(PersistentLsn, lsn);
return TRlas(true, false);
} else {
// skip
Expand Down Expand Up @@ -424,6 +429,7 @@ namespace NKikimr {
Heap->RecoveryModeFree(x);

*logPosDelLsn = lsn;
PersistentLsn = Min(PersistentLsn, lsn);
return TRlas(true, false);
} else {
// skip
Expand Down Expand Up @@ -472,6 +478,7 @@ namespace NKikimr {
Heap->RecoveryModeAllocate(rec.DiskAddr);
}
LogPos.HugeBlobLoggedLsn = lsn;
PersistentLsn = Min(PersistentLsn, lsn);
return TRlas(true, false);
} else {
// skip
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ namespace NKikimr {
const ui64 Guid;
// last reported FirstLsnToKeep; can't decrease
mutable ui64 FirstLsnToKeepReported = 0;
ui64 PersistentLsn = 0;

THullHugeKeeperPersState(TIntrusivePtr<TVDiskContext> vctx,
const ui32 chunkSize,
Expand Down Expand Up @@ -138,7 +139,7 @@ namespace NKikimr {
bool WouldNewEntryPointAdvanceLog(ui64 freeUpToLsn, ui64 minInFlightLsn, ui32 itemsAfterCommit) const;

// initiate commit
void InitiateNewEntryPointCommit(ui64 lsn);
void InitiateNewEntryPointCommit(ui64 lsn, ui64 minInFlightLsn);
// finish commit
void EntryPointCommitted(ui64 lsn);

Expand Down
Loading