diff --git a/ydb/core/base/blobstorage.h b/ydb/core/base/blobstorage.h index 143f8d7169d2..012326411cb9 100644 --- a/ydb/core/base/blobstorage.h +++ b/ydb/core/base/blobstorage.h @@ -709,6 +709,8 @@ struct TEvBlobStorage { EvStartBalancing, EvReplCheckProgress, EvMinHugeBlobSizeUpdate, + EvHugePreCompact, + EvHugePreCompactResult, EvYardInitResult = EvPut + 9 * 512, /// 268 636 672 EvLogResult, diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp index 0329c645cb9d..1f1337469005 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp @@ -27,6 +27,10 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); //////////////////////////////////////////////////////////////////////////// class THugeBlobLogLsnFifo { public: + THugeBlobLogLsnFifo(ui64 seqWriteId = 0) + : SeqWriteId(seqWriteId) + {} + ui64 Push(ui64 lsn) { Y_VERIFY_S(Fifo.empty() || Fifo.rbegin()->second <= lsn, ErrorReport(SeqWriteId, lsn)); if (NodeCache.empty()) { @@ -569,16 +573,16 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); TMaybe LastCommitTime; std::shared_ptr Pers; THugeBlobLogLsnFifo LogLsnFifo; + THugeBlobLogLsnFifo CompactLsnFifo{1}; ui64 LastReportedFirstLsnToKeep = 0; + ui32 ItemsAfterCommit = 0; THullHugeKeeperState(std::shared_ptr &&pers) : Pers(std::move(pers)) {} ui64 FirstLsnToKeep() const { - ui64 persLsn = Pers->FirstLsnToKeep(); - ui64 logLsnFifoLastKeepLsn = LogLsnFifo.FirstLsnToKeep(); - return Min(persLsn, logLsnFifoLastKeepLsn); + return Pers->FirstLsnToKeep(Min(LogLsnFifo.FirstLsnToKeep(), CompactLsnFifo.FirstLsnToKeep())); } TString FirstLsnToKeepDecomposed() const { @@ -586,6 +590,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); str << "{FirstLsnToKeep# " << FirstLsnToKeep() << " pers# " << Pers->FirstLsnToKeepDecomposed() << " LogLsnFifo# " << LogLsnFifo.FirstLsnToKeepDecomposed() + << " CompactLsnFifo# " << CompactLsnFifo.FirstLsnToKeepDecomposed() << "}"; return str.Str(); } @@ -595,9 +600,12 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); str << "WaitQueueSize: " << WaitQueueSize << "
"; str << "WaitQueueByteSize: " << WaitQueueByteSize << "
"; str << "Committing: " << boolToString(Committing) << "
"; + str << "ItemsAfterCommit: " << ItemsAfterCommit << "
"; str << "FreeUpToLsn: " << FreeUpToLsn << "
"; str << "LastCommitTime: " << (LastCommitTime ? ToStringLocalTimeUpToSeconds(*LastCommitTime) : "not yet") << "
"; str << "FirstLsnToKeep: " << FirstLsnToKeep() << "
"; + str << "LogLsnFifo.FirstLsnToKeep: " << LogLsnFifo.FirstLsnToKeep() << "
"; + str << "CompactLsnFifo.FirstLsnToKeep: " << CompactLsnFifo.FirstLsnToKeep() << "
"; Pers->RenderHtml(str); } @@ -746,8 +754,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); // check what if we issue a new huge hull keeper entry point -- would it allow us to // move the FirstLsnToKeep barrier forward? if so, try to issue an entry point, otherwise exit - const bool inFlightWrites = State.LogLsnFifo.FirstLsnToKeep() != Max(); - if (!State.Pers->WouldNewEntryPointAdvanceLog(State.FreeUpToLsn, inFlightWrites)) { + const ui64 minInFlightLsn = Min(State.LogLsnFifo.FirstLsnToKeep(), State.CompactLsnFifo.FirstLsnToKeep()); + if (!State.Pers->WouldNewEntryPointAdvanceLog(State.FreeUpToLsn, minInFlightLsn, State.ItemsAfterCommit)) { // if we issue an entry point now, we will achieve nothing, so return LOG_DEBUG(ctx, BS_LOGCUTTER, VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, @@ -758,8 +766,9 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); // allocate LSN for the brand new entry point ui64 lsn = HugeKeeperCtx->LsnMngr->AllocLsnForLocalUse().Point(); - State.Pers->InitiateNewEntryPointCommit(lsn, inFlightWrites); + State.Pers->InitiateNewEntryPointCommit(lsn); State.Committing = true; + State.ItemsAfterCommit = 0; // serialize log record into string TString serialized = State.Pers->Serialize(); @@ -810,6 +819,12 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix, "THullHugeKeeper: TEvHullFreeHugeSlots: one slot: addr# %s freeRes# %s", x.ToString().data(), freeRes.ToString().data())); + ++State.ItemsAfterCommit; + Y_ABORT_UNLESS(msg->WId); + } + + if (msg->WId) { + State.CompactLsnFifo.Pop(msg->WId, msg->DeletionLsn, true); } auto checkAndSet = [this, msg] (ui64 &dbLsn) { @@ -872,6 +887,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); "THullHugeKeeper: TEvHullHugeBlobLogged: %s", msg->ToString().data())); // manage log requests in flight State.LogLsnFifo.Pop(msg->WriteId, msg->RecLsn, msg->SlotIsUsed); + State.ItemsAfterCommit += msg->SlotIsUsed; // manage allocated slots const TDiskPart &hugeBlob = msg->HugeBlob; NHuge::THugeSlot hugeSlot(State.Pers->Heap->ConvertDiskPartToHugeSlot(hugeBlob)); @@ -891,6 +907,11 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); TryToCutLog(ctx); } + void Handle(TEvHugePreCompact::TPtr ev, const TActorContext& ctx) { + const ui64 wId = State.CompactLsnFifo.Push(ev->Get()->LsnInfimum); + ctx.Send(ev->Sender, new TEvHugePreCompactResult(wId), 0, ev->Cookie); + } + void Handle(TEvHugeLockChunks::TPtr &ev, const TActorContext &ctx) { const TEvHugeLockChunks *msg = ev->Get(); LOG_DEBUG(ctx, BS_HULLHUGE, @@ -971,6 +992,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER); HFunc(TEvHullHugeCommitted, Handle) HFunc(TEvHullHugeWritten, Handle) HFunc(TEvHullHugeBlobLogged, Handle) + HFunc(TEvHugePreCompact, Handle) HFunc(TEvHugeLockChunks, Handle) HFunc(TEvHugeUnlockChunks, Handle) HFunc(TEvHugeStat, Handle) diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h index 9de3615fc690..af071a05aa78 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h @@ -133,19 +133,19 @@ namespace NKikimr { const TDiskPartVec HugeBlobs; const ui64 DeletionLsn; const TLogSignature Signature; // identifies database we send update for + const ui64 WId; - TEvHullFreeHugeSlots(TDiskPartVec &&hugeBlobs, ui64 deletionLsn, - TLogSignature signature) + TEvHullFreeHugeSlots(TDiskPartVec &&hugeBlobs, ui64 deletionLsn, TLogSignature signature, ui64 wId) : HugeBlobs(std::move(hugeBlobs)) , DeletionLsn(deletionLsn) , Signature(signature) + , WId(wId) {} TString ToString() const { TStringStream str; - str << "{" << Signature.ToString() - << " DelLsn# " << DeletionLsn << " Slots# " - << HugeBlobs.ToString() << "}"; + str << "{" << Signature.ToString() << " DelLsn# " << DeletionLsn << " Slots# " << HugeBlobs.ToString() + << " WId# " << WId << "}"; return str.Str(); } }; @@ -209,6 +209,16 @@ namespace NKikimr { NHuge::THeapStat Stat; }; + struct TEvHugePreCompact : TEventLocal { + ui64 LsnInfimum; // the resulting LSN of the operation MUST be not less that the provided value + TEvHugePreCompact(ui64 lsnInfimum) : LsnInfimum(lsnInfimum) {} + }; + + struct TEvHugePreCompactResult : TEventLocal { + ui64 WId; // this is going to be provided in free slots operation + TEvHugePreCompactResult(ui64 wId) : WId(wId) {} + }; + //////////////////////////////////////////////////////////////////////////// // THugeKeeperCtx //////////////////////////////////////////////////////////////////////////// diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp index 067a0dbdbb85..0efbc08de2b9 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.cpp @@ -11,31 +11,6 @@ namespace NKikimr { //////////////////////////////////////////////////////////////////////////// // THullHugeRecoveryLogPos //////////////////////////////////////////////////////////////////////////// - ui64 THullHugeRecoveryLogPos::FirstLsnToKeep() const { - return Min(LogoBlobsDbSlotDelLsn, BlocksDbSlotDelLsn, BarriersDbSlotDelLsn, - EntryPointLsn, HugeBlobLoggedLsn); - - // NOTE: we do use HugeBlobLoggedLsn for LastKeepLsn calculation, because in - // run time we may have some in-fly messages to log, that are not reflected - // in HugeBlobLoggedLsn yet - - // NOTE: LogoBlobsDbSlotDelLsn, BlocksDbSlotDelLsn and BarriersDbSlotDelLsn are - // continiously increasing even if nobody writes to the database, because - // the hull component regularly writes its state into the log. This allows us - // to calculate LastKeepLsn based on these values and also progress continiously. - } - - TString THullHugeRecoveryLogPos::FirstLsnToKeepDecomposed() const { - TStringStream str; - str << "{LogoBlobsDbSlotDelLsn# " << LogoBlobsDbSlotDelLsn - << " BlocksDbSlotDelLsn# " << BlocksDbSlotDelLsn - << " BarriersDbSlotDelLsn# " << BarriersDbSlotDelLsn - << " EntryPointLsn# " << EntryPointLsn - << " HugeBlobLoggedLsn# " << HugeBlobLoggedLsn - << "}"; - return str.Str(); - } - TString THullHugeRecoveryLogPos::ToString() const { TStringStream str; str << "{ChunkAllocationLsn# " << ChunkAllocationLsn @@ -80,76 +55,6 @@ namespace NKikimr { return serialized.size() == SerializedSize; } - - //////////////////////////////////////////////////////////////////////////// - // TLogTracker - //////////////////////////////////////////////////////////////////////////// - TLogTracker::TPosition::TPosition(const THullHugeRecoveryLogPos &logPos) - : EntryPointLsn(logPos.EntryPointLsn) - , HugeBlobLoggedLsn(logPos.HugeBlobLoggedLsn) - {} - - void TLogTracker::TPosition::Output(IOutputStream &str) const { - str << "{EntryPointLsn# " << EntryPointLsn << " HugeBlobLoggedLsn# " << HugeBlobLoggedLsn << "}"; - } - - TString TLogTracker::TPosition::ToString() const { - TStringStream str; - Output(str); - return str.Str(); - } - - void TLogTracker::EntryPointFromRecoveryLog(TPosition pos) { - PrivateNewLsn(pos); - } - - void TLogTracker::FinishRecovery(ui64 entryPointLsn) { - Y_ABORT_UNLESS(entryPointLsn == 0 || Cur->EntryPointLsn == entryPointLsn); - } - - // Prepare to commit - void TLogTracker::InitiateNewEntryPointCommit(TPosition pos) { - Y_ABORT_UNLESS(InProgress.Empty()); - InProgress = pos; - } - - // Committed - void TLogTracker::EntryPointCommitted(ui64 lsn) { - Y_ABORT_UNLESS(InProgress.Defined() && InProgress->EntryPointLsn == lsn); - Prev = Cur; - Cur = InProgress; - InProgress.Clear(); - } - - ui64 TLogTracker::FirstLsnToKeep() const { - return Prev.Empty() ? 0 : Min(Prev->EntryPointLsn, Prev->HugeBlobLoggedLsn); - } - - TString TLogTracker::FirstLsnToKeepDecomposed() const { - TStringStream str; - str << "{Prev# " << Prev << " Cur# " << Cur << "}"; - return str.Str(); - } - - bool TLogTracker::WouldNewEntryPointAdvanceLog(ui64 freeUpToLsn, bool inFlightWrites) const { - Y_UNUSED(inFlightWrites); - Y_ABORT_UNLESS(InProgress.Empty()); - return FirstLsnToKeep() < freeUpToLsn; - } - - void TLogTracker::PrivateNewLsn(TPosition pos) { - Y_ABORT_UNLESS(pos.EntryPointLsn != 0 && - ((Prev.Empty() && Cur.Empty()) || - (Prev.Empty() && Cur.Defined()) || - (Prev.Defined() && Cur.Defined() && - Prev->EntryPointLsn < Cur->EntryPointLsn)) && - (Cur.Empty() || pos.EntryPointLsn > Cur->EntryPointLsn)); - - Prev = Cur; - Cur = pos; - } - - //////////////////////////////////////////////////////////////////////////// // THullHugeKeeperPersState //////////////////////////////////////////////////////////////////////////// @@ -167,7 +72,6 @@ namespace NKikimr { std::function logFunc) : VCtx(std::move(vctx)) , LogPos(THullHugeRecoveryLogPos::Default()) - , CommittedLogPos(LogPos) , Heap(new NHuge::THeap(VCtx->VDiskLogPrefix, chunkSize, appendBlockSize, minHugeBlobInBytes, oldMinHugeBlobInBytes, milestoneHugeBlobInBytes, maxBlobInBytes, overhead, freeChunksReservation)) @@ -193,7 +97,6 @@ namespace NKikimr { std::function logFunc) : VCtx(std::move(vctx)) , LogPos(THullHugeRecoveryLogPos::Default()) - , CommittedLogPos(LogPos) , Heap(new NHuge::THeap(VCtx->VDiskLogPrefix, chunkSize, appendBlockSize, minHugeBlobInBytes, oldMinHugeBlobInBytes, milestoneHugeBlobInBytes, maxBlobInBytes, overhead, freeChunksReservation)) @@ -205,7 +108,6 @@ namespace NKikimr { logFunc(VDISKP(VCtx->VDiskLogPrefix, "Recovery started (guid# %" PRIu64 " entryLsn# %" PRIu64 "): State# %s", Guid, entryPointLsn, ToString().data())); - CommittedLogPos = LogPos; } THullHugeKeeperPersState::THullHugeKeeperPersState(TIntrusivePtr vctx, @@ -222,7 +124,6 @@ namespace NKikimr { std::function logFunc) : VCtx(std::move(vctx)) , LogPos(THullHugeRecoveryLogPos::Default()) - , CommittedLogPos(LogPos) , Heap(new NHuge::THeap(VCtx->VDiskLogPrefix, chunkSize, appendBlockSize, minHugeBlobInBytes, oldMinHugeBlobInBytes, milestoneHugeBlobInBytes, maxBlobInBytes, overhead, freeChunksReservation)) @@ -234,7 +135,6 @@ namespace NKikimr { logFunc(VDISKP(VCtx->VDiskLogPrefix, "Recovery started (guid# %" PRIu64 " entryLsn# %" PRIu64 "): State# %s", Guid, entryPointLsn, ToString().data())); - CommittedLogPos = LogPos; } THullHugeKeeperPersState::~THullHugeKeeperPersState() { @@ -391,7 +291,7 @@ namespace NKikimr { } void THullHugeKeeperPersState::RenderHtml(IOutputStream &str) const { - str << "LogPos: " << LogPos.ToString() << "
"; + str << "LogPos: " << LogPos.ToString() << "
"; str << "AllocatedSlots:"; if (!AllocatedSlots.empty()) { for (const auto &x : AllocatedSlots) { @@ -403,49 +303,36 @@ namespace NKikimr { Heap->RenderHtml(str); } - ui64 THullHugeKeeperPersState::FirstLsnToKeep() const { - return Min(LogPos.FirstLsnToKeep(), LogTracker.FirstLsnToKeep(), - // special case if these LSM tree entrypoints with deletions would be applied by the recovery code - CommittedLogPos.LogoBlobsDbSlotDelLsn, - CommittedLogPos.BlocksDbSlotDelLsn, - CommittedLogPos.BarriersDbSlotDelLsn); + ui64 THullHugeKeeperPersState::FirstLsnToKeep(ui64 minInFlightLsn) const { + const ui64 res = Min(minInFlightLsn, LogPos.EntryPointLsn); + + Y_VERIFY_S(FirstLsnToKeepReported <= res, "FirstLsnToKeepReported# " << FirstLsnToKeepReported + << " res# " << res << " state# " << FirstLsnToKeepDecomposed() << " minInFlightLsn# " << minInFlightLsn); + FirstLsnToKeepReported = res; + + return res; } TString THullHugeKeeperPersState::FirstLsnToKeepDecomposed() const { TStringStream str; - str << "{LogPos# " << LogPos.FirstLsnToKeepDecomposed() - << " CommittedLogPos# " << CommittedLogPos.FirstLsnToKeepDecomposed() - << " LogTracker# " << LogTracker.FirstLsnToKeepDecomposed() - << "}"; + str << "{LogPos# " << LogPos.EntryPointLsn << "}"; return str.Str(); } - bool THullHugeKeeperPersState::WouldNewEntryPointAdvanceLog(ui64 freeUpToLsn, bool inFlightWrites) const { - return LogTracker.WouldNewEntryPointAdvanceLog(freeUpToLsn, inFlightWrites); + bool THullHugeKeeperPersState::WouldNewEntryPointAdvanceLog(ui64 freeUpToLsn, ui64 minInFlightLsn, + ui32 itemsAfterCommit) const { + return freeUpToLsn < minInFlightLsn && (LogPos.EntryPointLsn <= freeUpToLsn || itemsAfterCommit > 10000); } // initiate commit - void THullHugeKeeperPersState::InitiateNewEntryPointCommit(ui64 lsn, bool inFlightWrites) { + void THullHugeKeeperPersState::InitiateNewEntryPointCommit(ui64 lsn) { Y_ABORT_UNLESS(lsn > LogPos.EntryPointLsn); - // set up previous entry point position to prevent log from being occasionally cut and update new entry - // point position in persistent state LogPos.EntryPointLsn = lsn; - if (!inFlightWrites) { - // no active writes are going on, we can promote HugeBlobLoggedLsn - LogPos.HugeBlobLoggedLsn = lsn; - } - - TLogTracker::TPosition pos; - pos.EntryPointLsn = LogPos.EntryPointLsn; - pos.HugeBlobLoggedLsn = LogPos.HugeBlobLoggedLsn; - LogTracker.InitiateNewEntryPointCommit(pos); - CommittedLogPos = LogPos; } // finish commit void THullHugeKeeperPersState::EntryPointCommitted(ui64 entryPointLsn) { Y_ABORT_UNLESS(entryPointLsn == LogPos.EntryPointLsn); - LogTracker.EntryPointCommitted(entryPointLsn); } // chunk allocation @@ -606,7 +493,6 @@ namespace NKikimr { auto logPos = THullHugeRecoveryLogPos::Default(); logPos.ParseFromString(logPosSerialized); Y_ABORT_UNLESS(logPos.EntryPointLsn == lsn); - LogTracker.EntryPointFromRecoveryLog(logPos); LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(VCtx->VDiskLogPrefix, @@ -630,7 +516,6 @@ namespace NKikimr { auto logPos = THullHugeRecoveryLogPos::Default(); logPos.ParseFromArray(logPosSerialized.GetData(), logPosSerialized.GetSize()); Y_ABORT_UNLESS(logPos.EntryPointLsn == lsn); - LogTracker.EntryPointFromRecoveryLog(logPos); LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(VCtx->VDiskLogPrefix, @@ -651,7 +536,6 @@ namespace NKikimr { AllocatedSlots.clear(); } - LogTracker.FinishRecovery(LogPos.EntryPointLsn); Recovered = true; LOG_DEBUG(ctx, BS_HULLHUGE, VDISKP(VCtx->VDiskLogPrefix, "Recovery(guid# %" PRIu64 ") finished", Guid)); @@ -663,7 +547,3 @@ namespace NKikimr { } // NHuge } // NKikimr - -Y_DECLARE_OUT_SPEC(, NKikimr::NHuge::TLogTracker::TPosition, stream, value) { - value.Output(stream); -} diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h index abc1145c59d2..b9b5385ec601 100644 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h +++ b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery.h @@ -43,8 +43,6 @@ namespace NKikimr { return THullHugeRecoveryLogPos(0, 0, 0, 0, 0, 0, 0); } - ui64 FirstLsnToKeep() const; - TString FirstLsnToKeepDecomposed() const; TString ToString() const; TString Serialize() const; void ParseFromString(const TString &serialized); @@ -67,45 +65,6 @@ namespace NKikimr { TRlas &operator=(const TRlas &) = default; }; - //////////////////////////////////////////////////////////////////////////// - // TLogTracker - // This class tracks two entry points: current and previous - // It alse correctly implements recovery procedure. - // The idea behind: we MUST keep all log records between Prev and Cur - // entry points - //////////////////////////////////////////////////////////////////////////// - class TLogTracker { - public: - struct TPosition { - ui64 EntryPointLsn = 0; - ui64 HugeBlobLoggedLsn = 0; - TPosition(const THullHugeRecoveryLogPos &logPos); - TPosition() = default; - void Output(IOutputStream &str) const; - TString ToString() const; - }; - - // RecoveryMode: call on every entry point seen - void EntryPointFromRecoveryLog(TPosition pos); - // RecoveryMode: call when recovery is finished - void FinishRecovery(ui64 entryPointLsn); - - // Prepare to commit - void InitiateNewEntryPointCommit(TPosition pos); - // Committed - void EntryPointCommitted(ui64 lsn); - // Calculate FirstLsnToKeep - ui64 FirstLsnToKeep() const; - TString FirstLsnToKeepDecomposed() const; - bool WouldNewEntryPointAdvanceLog(ui64 freeUpToLsn, bool inFlightWrites) const; - private: - TMaybe Prev = {}; - TMaybe Cur = {}; - TMaybe InProgress = {}; - - void PrivateNewLsn(TPosition pos); - }; - //////////////////////////////////////////////////////////////////////////// // THullHugeKeeperPersState //////////////////////////////////////////////////////////////////////////// @@ -118,17 +77,15 @@ namespace NKikimr { TIntrusivePtr VCtx; // current pos THullHugeRecoveryLogPos LogPos; - // last committed log pos - THullHugeRecoveryLogPos CommittedLogPos; std::unique_ptr Heap; // slots that are already allocated, but not written to log TAllocatedSlots AllocatedSlots; // guard to avoid using structure before recovery has been completed bool Recovered = false; - // manage last two entry points - TLogTracker LogTracker; // guid for this instance of pers state const ui64 Guid; + // last reported FirstLsnToKeep; can't decrease + mutable ui64 FirstLsnToKeepReported = 0; THullHugeKeeperPersState(TIntrusivePtr vctx, const ui32 chunkSize, @@ -176,12 +133,12 @@ namespace NKikimr { TString ToString() const; void RenderHtml(IOutputStream &str) const; ui32 GetMinREALHugeBlobInBytes() const; - ui64 FirstLsnToKeep() const; + ui64 FirstLsnToKeep(ui64 minInFlightLsn = Max()) const; TString FirstLsnToKeepDecomposed() const; - bool WouldNewEntryPointAdvanceLog(ui64 freeUpToLsn, bool inFlightWrites) const; + bool WouldNewEntryPointAdvanceLog(ui64 freeUpToLsn, ui64 minInFlightLsn, ui32 itemsAfterCommit) const; // initiate commit - void InitiateNewEntryPointCommit(ui64 lsn, bool inFlightWrites); + void InitiateNewEntryPointCommit(ui64 lsn); // finish commit void EntryPointCommitted(ui64 lsn); diff --git a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery_ut.cpp b/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery_ut.cpp deleted file mode 100644 index 809f9e5ca268..000000000000 --- a/ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugerecovery_ut.cpp +++ /dev/null @@ -1,68 +0,0 @@ -#include "blobstorage_hullhugerecovery.h" -#include - -#include - - -// change to Cerr if you want logging -#define STR Cnull - - -namespace NKikimr { - - using namespace NHuge; - - Y_UNIT_TEST_SUITE(TBlobStorageHullHugeRecovery) { - - Y_UNIT_TEST(LogPos) { - ui64 chunkAllocationLsn = 48265416; - ui64 chunkFreeingLsn = 11198354; - ui64 hugeBlobLoggedLsn = 48767829; - ui64 logoBlobsDbSlotDelLsn = 48613932; - ui64 blocksDbSlotDelLsn = 45042322; - ui64 barriersDbSlotDelLsn = 45043017; - ui64 entryPointLsn = 48767829; - THullHugeRecoveryLogPos logPos( - chunkAllocationLsn, - chunkFreeingLsn, - hugeBlobLoggedLsn, - logoBlobsDbSlotDelLsn, - blocksDbSlotDelLsn, - barriersDbSlotDelLsn, - entryPointLsn); - UNIT_ASSERT(logPos.FirstLsnToKeep() == 45042322); - } - - Y_UNIT_TEST(LogTracker) { - TLogTracker lt; - - TLogTracker::TPosition prev; - prev.EntryPointLsn = 100; - prev.HugeBlobLoggedLsn = 100; - lt.EntryPointFromRecoveryLog(prev); - - prev.EntryPointLsn = 200; - prev.HugeBlobLoggedLsn = 200; - lt.EntryPointFromRecoveryLog(prev); - - TLogTracker::TPosition cur; - cur.EntryPointLsn = 300; - cur.HugeBlobLoggedLsn = 300; - lt.EntryPointFromRecoveryLog(cur); - - - lt.FinishRecovery(300); - UNIT_ASSERT(lt.FirstLsnToKeep() == 200); - - - - TLogTracker::TPosition newPos; - newPos.EntryPointLsn = 400; - newPos.HugeBlobLoggedLsn = 400; - lt.InitiateNewEntryPointCommit(newPos); - lt.EntryPointCommitted(400); - UNIT_ASSERT(lt.FirstLsnToKeep() == 300); - } - } - -} // NKikimr diff --git a/ydb/core/blobstorage/vdisk/huge/ut/ya.make b/ydb/core/blobstorage/vdisk/huge/ut/ya.make index 09ee8a70534b..ef3c4f7755a9 100644 --- a/ydb/core/blobstorage/vdisk/huge/ut/ya.make +++ b/ydb/core/blobstorage/vdisk/huge/ut/ya.make @@ -19,7 +19,6 @@ SRCS( blobstorage_hullhugedefs_ut.cpp blobstorage_hullhugeheap_ctx_ut.cpp blobstorage_hullhugeheap_ut.cpp - blobstorage_hullhugerecovery_ut.cpp blobstorage_hullhuge_ut.cpp top_ut.cpp ) diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp index e260a8f33680..1670b65633fc 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullactor.cpp @@ -284,12 +284,23 @@ namespace NKikimr { } case NHullComp::ActDeleteSsts: { Y_ABORT_UNLESS(CompactionTask->GetSstsToAdd().Empty() && !CompactionTask->GetSstsToDelete().Empty()); - ApplyCompactionResult(ctx, {}, {}); + if (CompactionTask->GetHugeBlobsToDelete().Empty()) { + ApplyCompactionResult(ctx, {}, {}, 0); + } else { + const ui64 cookie = NextPreCompactCookie++; + ctx.Send(HullLogCtx->HugeKeeperId, new TEvHugePreCompact(RTCtx->LsnMngr->GetLsn()), 0, cookie); + PreCompactCallbacks.emplace(cookie, [this, ev](ui64 wId, const TActorContext& ctx) mutable { + Y_ABORT_UNLESS(wId); + ApplyCompactionResult(ctx, {}, {}, wId); + RTCtx->LevelIndex->UpdateLevelStat(LevelStat); + }); + return; + } break; } case NHullComp::ActMoveSsts: { Y_ABORT_UNLESS(!CompactionTask->GetSstsToAdd().Empty() && !CompactionTask->GetSstsToDelete().Empty()); - ApplyCompactionResult(ctx, {}, {}); + ApplyCompactionResult(ctx, {}, {}, 0); break; } case NHullComp::ActCompactSsts: { @@ -348,10 +359,8 @@ namespace NKikimr { } } - void ApplyCompactionResult( - const TActorContext &ctx, - TVector chunksAdded, - TVector reservedChunksLeft) + void ApplyCompactionResult(const TActorContext &ctx, TVector chunksAdded, TVector reservedChunksLeft, + ui64 wId) { // create new slice RTCtx->LevelIndex->SetCompState(TLevelIndexBase::StateWaitCommit); @@ -402,7 +411,7 @@ namespace NKikimr { // run level committer TDiskPartVec removedHugeBlobs(CompactionTask->ExtractHugeBlobsToDelete()); auto committer = std::make_unique(HullLogCtx, HullDbCommitterCtx, RTCtx->LevelIndex, - ctx.SelfID, std::move(chunksAdded), std::move(deleteChunks), std::move(removedHugeBlobs)); + ctx.SelfID, std::move(chunksAdded), std::move(deleteChunks), std::move(removedHugeBlobs), wId); TActorId committerID = ctx.RegisterWithSameMailbox(committer.release()); ActiveActors.Insert(committerID, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); @@ -422,10 +431,22 @@ namespace NKikimr { } } - void Handle(typename THullChange::TPtr &ev, const TActorContext &ctx) { - ActiveActors.Erase(ev->Sender); + void Handle(typename THullChange::TPtr &ev, const TActorContext &ctx, ui64 wId = 0) { + if (!wId) { + ActiveActors.Erase(ev->Sender); + } THullChange *msg = ev->Get(); + if (!msg->FreedHugeBlobs.Empty() && !wId && !msg->Aborted) { + const ui64 cookie = NextPreCompactCookie++; + ctx.Send(HullLogCtx->HugeKeeperId, new TEvHugePreCompact(RTCtx->LsnMngr->GetLsn()), 0, cookie); + PreCompactCallbacks.emplace(cookie, [this, ev](ui64 wId, const TActorContext& ctx) mutable { + Y_ABORT_UNLESS(wId); + Handle(ev, ctx, wId); + }); + return; + } + // NOTE: when we run committer (Fresh or Level) we allocate Lsn and // perform LevelIndex serialization in this handler to _guarantee_ order // of log messages @@ -461,7 +482,7 @@ namespace NKikimr { // run fresh committer auto committer = std::make_unique(HullLogCtx, HullDbCommitterCtx, RTCtx->LevelIndex, ctx.SelfID, std::move(msg->CommitChunks), std::move(msg->ReservedChunks), - std::move(msg->FreedHugeBlobs), dbg.Str()); + std::move(msg->FreedHugeBlobs), dbg.Str(), wId); auto aid = ctx.RegisterWithSameMailbox(committer.release()); ActiveActors.Insert(aid, __FILE__, __LINE__, ctx, NKikimrServices::BLOBSTORAGE); } else { @@ -480,12 +501,21 @@ namespace NKikimr { Y_ABORT_UNLESS(!CompactionTask->GetSstsToDelete().Empty()); } - ApplyCompactionResult(ctx, std::move(msg->CommitChunks), std::move(msg->ReservedChunks)); + ApplyCompactionResult(ctx, std::move(msg->CommitChunks), std::move(msg->ReservedChunks), wId); } RTCtx->LevelIndex->UpdateLevelStat(LevelStat); } + THashMap> PreCompactCallbacks; + ui64 NextPreCompactCookie = 1; + + void Handle(TEvHugePreCompactResult::TPtr ev, const TActorContext& ctx) { + const auto it = PreCompactCallbacks.find(ev->Cookie); + Y_ABORT_UNLESS(it != PreCompactCallbacks.end()); + it->second(ev->Get()->WId, ctx); + } + void Handle(typename TFreshAppendixCompactionDone::TPtr& ev, const TActorContext& ctx) { auto newJob = ev->Get()->Job.ApplyCompactionResult(); if (!newJob.Empty()) { @@ -644,7 +674,8 @@ namespace NKikimr { HTemplFunc(TEvAddBulkSst, Handle) HTemplFunc(TSelected, Handle) HFunc(TEvents::TEvPoisonPill, HandlePoison) - CFunc(TEvBlobStorage::EvPermitGarbageCollection, HandlePermitGarbageCollection); + CFunc(TEvBlobStorage::EvPermitGarbageCollection, HandlePermitGarbageCollection) + HFunc(TEvHugePreCompactResult, Handle) ) public: diff --git a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h index 78a037f894b5..2f1fd8e480e8 100644 --- a/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h +++ b/ydb/core/blobstorage/vdisk/hullop/blobstorage_hullcommit.h @@ -102,6 +102,7 @@ namespace NKikimr { NPDisk::TCommitRecord CommitRecord; TStringStream DebugMessage; TString CallerInfo; + const ui64 WId; void Bootstrap(const TActorContext& ctx) { TThis::Become(&TThis::StateFunc); @@ -136,11 +137,13 @@ namespace NKikimr { void Handle(NPDisk::TEvLogResult::TPtr& ev, const TActorContext& ctx) { CHECK_PDISK_RESPONSE(Ctx->HullCtx->VCtx, ev, ctx); + Y_VERIFY_S(!WId == Metadata.RemovedHugeBlobs.Empty(), "WId# " << WId << " RemovedHugeBlobs# " << Metadata.RemovedHugeBlobs.Size()); + // notify delayed deleter when log record is actually written; we MUST ensure that updates are coming in // order of increasing LSN's; this is achieved automatically as all actors reside on the same mailbox LevelIndex->DelayedCompactionDeleterInfo->Update(LsnSeg.Last, std::move(Metadata.RemovedHugeBlobs), CommitRecord.DeleteToDecommitted ? CommitRecord.DeleteChunks : TVector(), - PDiskSignatureForHullDbKey(), ctx, Ctx->HugeKeeperId, Ctx->SkeletonId, Ctx->PDiskCtx, + PDiskSignatureForHullDbKey(), WId, ctx, Ctx->HugeKeeperId, Ctx->SkeletonId, Ctx->PDiskCtx, Ctx->HullCtx->VCtx); NPDisk::TEvLogResult* msg = ev->Get(); @@ -284,7 +287,8 @@ namespace NKikimr { const TActorId& notifyID, const TActorId& secondNotifyID, THullCommitMeta&& metadata, - const TString &callerInfo) + const TString &callerInfo, + ui64 wId) : HullLogCtx(std::move(hullLogCtx)) , Ctx(std::move(ctx)) , LevelIndex(std::move(levelIndex)) @@ -292,7 +296,9 @@ namespace NKikimr { , SecondNotifyID(secondNotifyID) , Metadata(std::move(metadata)) , CallerInfo(callerInfo) + , WId(wId) { + Y_ABORT_UNLESS(!WId == Metadata.RemovedHugeBlobs.Empty()); // we create commit message in the constructor to avoid race condition GenerateCommitMessage(); } @@ -320,7 +326,8 @@ namespace NKikimr { notifyID, TActorId(), {TVector(), TVector(), TDiskPartVec(), false}, - callerInfo) + callerInfo, + 0) {} }; @@ -346,14 +353,16 @@ namespace NKikimr { TVector&& chunksAdded, TVector&& chunksDeleted, TDiskPartVec&& removedHugeBlobs, - const TString &callerInfo) + const TString &callerInfo, + ui64 wId) : TBase(std::move(hullLogCtx), std::move(ctx), std::move(levelIndex), notifyID, TActorId(), {std::move(chunksAdded), std::move(chunksDeleted), std::move(removedHugeBlobs), false}, - callerInfo) + callerInfo, + wId) {} }; @@ -375,14 +384,16 @@ namespace NKikimr { const TActorId& notifyID, TVector&& chunksAdded, TVector&& chunksDeleted, - TDiskPartVec&& removedHugeBlobs) + TDiskPartVec&& removedHugeBlobs, + ui64 wId) : TBase(std::move(hullLogCtx), std::move(ctx), std::move(levelIndex), notifyID, TActorId(), {std::move(chunksAdded), std::move(chunksDeleted), std::move(removedHugeBlobs), true}, - TString()) + TString(), + wId) {} }; @@ -414,7 +425,8 @@ namespace NKikimr { notifyID, secondNotifyID, {std::move(chunksAdded), std::move(chunksDeleted), std::move(replSst), numRecoveredBlobs}, - TString()) + TString(), + 0) {} }; diff --git a/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h b/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h index a333f3ef6223..676e6d6b5dcd 100644 --- a/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h +++ b/ydb/core/blobstorage/vdisk/hullop/hullcompdelete/blobstorage_hullcompdelete.h @@ -58,13 +58,15 @@ namespace NKikimr { TDiskPartVec RemovedHugeBlobs; TVector ChunksToForget; TLogSignature Signature; + ui64 WId; TReleaseQueueItem(ui64 recordLsn, TDiskPartVec&& removedHugeBlobs, TVector chunksToForget, - TLogSignature signature) + TLogSignature signature, ui64 wId) : RecordLsn(recordLsn) , RemovedHugeBlobs(std::move(removedHugeBlobs)) , ChunksToForget(std::move(chunksToForget)) , Signature(signature) + , WId(wId) {} }; TDeque ReleaseQueue; @@ -89,11 +91,11 @@ namespace NKikimr { // this function is called every time when compaction is about to commit new entrypoint containing at least // one removed huge blob; recordLsn is allocated LSN of this entrypoint void Update(ui64 recordLsn, TDiskPartVec&& removedHugeBlobs, TVector chunksToForget, TLogSignature signature, - const TActorContext& ctx, const TActorId& hugeKeeperId, const TActorId& skeletonId, const TPDiskCtxPtr& pdiskCtx, - const TVDiskContextPtr& vctx) { + ui64 wId, const TActorContext& ctx, const TActorId& hugeKeeperId, const TActorId& skeletonId, + const TPDiskCtxPtr& pdiskCtx, const TVDiskContextPtr& vctx) { Y_ABORT_UNLESS(recordLsn > LastDeletionLsn); LastDeletionLsn = recordLsn; - ReleaseQueue.emplace_back(recordLsn, std::move(removedHugeBlobs), std::move(chunksToForget), signature); + ReleaseQueue.emplace_back(recordLsn, std::move(removedHugeBlobs), std::move(chunksToForget), signature, wId); ProcessReleaseQueue(ctx, hugeKeeperId, skeletonId, pdiskCtx, vctx); } @@ -216,7 +218,7 @@ namespace NKikimr { if (CurrentSnapshots.empty() || (item.RecordLsn <= CurrentSnapshots.begin()->first)) { // matching record -- commit it to huge hull keeper and throw out of the queue ctx.Send(hugeKeeperId, new TEvHullFreeHugeSlots(std::move(item.RemovedHugeBlobs), - item.RecordLsn, item.Signature)); + item.RecordLsn, item.Signature, item.WId)); if (item.ChunksToForget) { LOG_DEBUG(ctx, NKikimrServices::BS_VDISK_CHUNKS, VDISKP(vctx->VDiskLogPrefix, "FORGET: PDiskId# %s ChunksToForget# %s", pdiskCtx->PDiskIdString.data(),