Skip to content

Commit

Permalink
Fix HugeBlob keeper log cutting logic
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru committed May 17, 2024
1 parent 3516c45 commit 60b2dc6
Show file tree
Hide file tree
Showing 11 changed files with 144 additions and 298 deletions.
2 changes: 2 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,8 @@ struct TEvBlobStorage {
EvStartBalancing,
EvReplCheckProgress,
EvMinHugeBlobSizeUpdate,
EvHugePreCompact,
EvHugePreCompactResult,

EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
EvLogResult,
Expand Down
21 changes: 10 additions & 11 deletions ydb/core/blobstorage/vdisk/common/blobstorage_dblogcutter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace NKikimr {

TInstant LastCutTime;
TDeque<ui64> FreeUpToLsn;
ui64 FreeUpToLsnLastWritten = 0;
ui64 FirstLsnToKeepLastWritten = 0;

const TDuration FirstDuration;
const TDuration RegularDuration;
Expand Down Expand Up @@ -115,33 +115,32 @@ namespace NKikimr {

const ui64 curLsn = Min(HullLsnToKeep, SyncLogLsnToKeep, SyncerLsnToKeep, HugeKeeperLsnToKeep, ScrubLsnToKeep);

// find the maximum of requested LSN's to free up to
TMaybe<ui64> freeUpToLsn;
while (FreeUpToLsn && curLsn >= FreeUpToLsn.front()) {
freeUpToLsn = FreeUpToLsn.front();
FreeUpToLsn.pop_front();
// only issue command if there is a progress in FreeUpToLsn queue
bool progress = false;
for (; FreeUpToLsn && FreeUpToLsn.front() < curLsn; FreeUpToLsn.pop_front()) {
progress = true;
}

if (freeUpToLsn) {
if (progress) {
LastCutTime = TAppData::TimeProvider->Now();

// generate clear log message
NPDisk::TCommitRecord commitRec;
commitRec.FirstLsnToKeep = *freeUpToLsn;
commitRec.FirstLsnToKeep = curLsn;
commitRec.IsStartingPoint = false;
TLsnSeg seg = LogCutterCtx.LsnMngr->AllocLsnForLocalUse();
ui8 signature = TLogSignature::SignatureHullCutLog;
ctx.Send(LogCutterCtx.LoggerId,
new NPDisk::TEvLog(LogCutterCtx.PDiskCtx->Dsk->Owner,
LogCutterCtx.PDiskCtx->Dsk->OwnerRound, signature, commitRec, TRcBuf(), seg, nullptr));
WriteInProgress = true;
FreeUpToLsnLastWritten = *freeUpToLsn;
FirstLsnToKeepLastWritten = curLsn;

LOG_DEBUG(ctx, NKikimrServices::BS_LOGCUTTER,
VDISKP(LogCutterCtx.VCtx->VDiskLogPrefix,
"CUT: Lsn# %" PRIu64 " Hull# %" PRIu64 " SyncLog# %" PRIu64
" Syncer# %" PRIu64 " Huge# %" PRIu64 " Db# LogoBlobs Db# Barriers Db# Blocks",
*freeUpToLsn, HullLsnToKeep, SyncLogLsnToKeep, SyncerLsnToKeep, HugeKeeperLsnToKeep));
curLsn, HullLsnToKeep, SyncLogLsnToKeep, SyncerLsnToKeep, HugeKeeperLsnToKeep));
}
}

Expand All @@ -167,7 +166,7 @@ namespace NKikimr {

str << "FreeUpToLsn: " << FormatList(FreeUpToLsn) << "<br>";

str << "FreeUpToLsnLastWritten: [Lsn=" << FreeUpToLsnLastWritten
str << "FirstLsnToKeepLastWritten: [Lsn=" << FirstLsnToKeepLastWritten
<< ", LastUpdate=" << ToStringLocalTimeUpToSeconds(LastCutTime) << "]<br>";

str << "FirstDuration: " << FirstDuration << "<br>";
Expand Down
34 changes: 28 additions & 6 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
////////////////////////////////////////////////////////////////////////////
class THugeBlobLogLsnFifo {
public:
THugeBlobLogLsnFifo(ui64 seqWriteId = 0)
: SeqWriteId(seqWriteId)
{}

ui64 Push(ui64 lsn) {
Y_VERIFY_S(Fifo.empty() || Fifo.rbegin()->second <= lsn, ErrorReport(SeqWriteId, lsn));
if (NodeCache.empty()) {
Expand Down Expand Up @@ -569,23 +573,24 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
TMaybe<TInstant> LastCommitTime;
std::shared_ptr<THullHugeKeeperPersState> Pers;
THugeBlobLogLsnFifo LogLsnFifo;
THugeBlobLogLsnFifo CompactLsnFifo{1};
ui64 LastReportedFirstLsnToKeep = 0;
ui32 ItemsAfterCommit = 0;

THullHugeKeeperState(std::shared_ptr<THullHugeKeeperPersState> &&pers)
: Pers(std::move(pers))
{}

ui64 FirstLsnToKeep() const {
ui64 persLsn = Pers->FirstLsnToKeep();
ui64 logLsnFifoLastKeepLsn = LogLsnFifo.FirstLsnToKeep();
return Min(persLsn, logLsnFifoLastKeepLsn);
return Pers->FirstLsnToKeep(Min(LogLsnFifo.FirstLsnToKeep(), CompactLsnFifo.FirstLsnToKeep()));
}

TString FirstLsnToKeepDecomposed() const {
TStringStream str;
str << "{FirstLsnToKeep# " << FirstLsnToKeep()
<< " pers# " << Pers->FirstLsnToKeepDecomposed()
<< " LogLsnFifo# " << LogLsnFifo.FirstLsnToKeepDecomposed()
<< " CompactLsnFifo# " << CompactLsnFifo.FirstLsnToKeepDecomposed()
<< "}";
return str.Str();
}
Expand All @@ -595,9 +600,12 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
str << "WaitQueueSize: " << WaitQueueSize << "<br>";
str << "WaitQueueByteSize: " << WaitQueueByteSize << "<br>";
str << "Committing: " << boolToString(Committing) << "<br>";
str << "ItemsAfterCommit: " << ItemsAfterCommit << "<br>";
str << "FreeUpToLsn: " << FreeUpToLsn << "<br>";
str << "LastCommitTime: " << (LastCommitTime ? ToStringLocalTimeUpToSeconds(*LastCommitTime) : "not yet") << "<br>";
str << "FirstLsnToKeep: " << FirstLsnToKeep() << "<br>";
str << "LogLsnFifo.FirstLsnToKeep: " << LogLsnFifo.FirstLsnToKeep() << "<br>";
str << "CompactLsnFifo.FirstLsnToKeep: " << CompactLsnFifo.FirstLsnToKeep() << "<br>";
Pers->RenderHtml(str);
}

Expand Down Expand Up @@ -746,8 +754,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);

// check what if we issue a new huge hull keeper entry point -- would it allow us to
// move the FirstLsnToKeep barrier forward? if so, try to issue an entry point, otherwise exit
const bool inFlightWrites = State.LogLsnFifo.FirstLsnToKeep() != Max<ui64>();
if (!State.Pers->WouldNewEntryPointAdvanceLog(State.FreeUpToLsn, inFlightWrites)) {
const ui64 minInFlightLsn = Min(State.LogLsnFifo.FirstLsnToKeep(), State.CompactLsnFifo.FirstLsnToKeep());
if (!State.Pers->WouldNewEntryPointAdvanceLog(State.FreeUpToLsn, minInFlightLsn, State.ItemsAfterCommit)) {
// if we issue an entry point now, we will achieve nothing, so return
LOG_DEBUG(ctx, BS_LOGCUTTER,
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
Expand All @@ -758,8 +766,9 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);

// allocate LSN for the brand new entry point
ui64 lsn = HugeKeeperCtx->LsnMngr->AllocLsnForLocalUse().Point();
State.Pers->InitiateNewEntryPointCommit(lsn, inFlightWrites);
State.Pers->InitiateNewEntryPointCommit(lsn);
State.Committing = true;
State.ItemsAfterCommit = 0;
// serialize log record into string
TString serialized = State.Pers->Serialize();

Expand Down Expand Up @@ -810,6 +819,12 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
"THullHugeKeeper: TEvHullFreeHugeSlots: one slot: addr# %s freeRes# %s",
x.ToString().data(), freeRes.ToString().data()));
++State.ItemsAfterCommit;
Y_ABORT_UNLESS(msg->WId);
}

if (msg->WId) {
State.CompactLsnFifo.Pop(msg->WId, msg->DeletionLsn, true);
}

auto checkAndSet = [this, msg] (ui64 &dbLsn) {
Expand Down Expand Up @@ -872,6 +887,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
"THullHugeKeeper: TEvHullHugeBlobLogged: %s", msg->ToString().data()));
// manage log requests in flight
State.LogLsnFifo.Pop(msg->WriteId, msg->RecLsn, msg->SlotIsUsed);
State.ItemsAfterCommit += msg->SlotIsUsed;
// manage allocated slots
const TDiskPart &hugeBlob = msg->HugeBlob;
NHuge::THugeSlot hugeSlot(State.Pers->Heap->ConvertDiskPartToHugeSlot(hugeBlob));
Expand All @@ -891,6 +907,11 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
TryToCutLog(ctx);
}

void Handle(TEvHugePreCompact::TPtr ev, const TActorContext& ctx) {
const ui64 wId = State.CompactLsnFifo.Push(ev->Get()->LsnInfimum);
ctx.Send(ev->Sender, new TEvHugePreCompactResult(wId), 0, ev->Cookie);
}

void Handle(TEvHugeLockChunks::TPtr &ev, const TActorContext &ctx) {
const TEvHugeLockChunks *msg = ev->Get();
LOG_DEBUG(ctx, BS_HULLHUGE,
Expand Down Expand Up @@ -971,6 +992,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
HFunc(TEvHullHugeCommitted, Handle)
HFunc(TEvHullHugeWritten, Handle)
HFunc(TEvHullHugeBlobLogged, Handle)
HFunc(TEvHugePreCompact, Handle)
HFunc(TEvHugeLockChunks, Handle)
HFunc(TEvHugeUnlockChunks, Handle)
HFunc(TEvHugeStat, Handle)
Expand Down
20 changes: 15 additions & 5 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,19 @@ namespace NKikimr {
const TDiskPartVec HugeBlobs;
const ui64 DeletionLsn;
const TLogSignature Signature; // identifies database we send update for
const ui64 WId;

TEvHullFreeHugeSlots(TDiskPartVec &&hugeBlobs, ui64 deletionLsn,
TLogSignature signature)
TEvHullFreeHugeSlots(TDiskPartVec &&hugeBlobs, ui64 deletionLsn, TLogSignature signature, ui64 wId)
: HugeBlobs(std::move(hugeBlobs))
, DeletionLsn(deletionLsn)
, Signature(signature)
, WId(wId)
{}

TString ToString() const {
TStringStream str;
str << "{" << Signature.ToString()
<< " DelLsn# " << DeletionLsn << " Slots# "
<< HugeBlobs.ToString() << "}";
str << "{" << Signature.ToString() << " DelLsn# " << DeletionLsn << " Slots# " << HugeBlobs.ToString()
<< " WId# " << WId << "}";
return str.Str();
}
};
Expand Down Expand Up @@ -209,6 +209,16 @@ namespace NKikimr {
NHuge::THeapStat Stat;
};

struct TEvHugePreCompact : TEventLocal<TEvHugePreCompact, TEvBlobStorage::EvHugePreCompact> {
ui64 LsnInfimum; // the resulting LSN of the operation MUST be not less that the provided value
TEvHugePreCompact(ui64 lsnInfimum) : LsnInfimum(lsnInfimum) {}
};

struct TEvHugePreCompactResult : TEventLocal<TEvHugePreCompactResult, TEvBlobStorage::EvHugePreCompactResult> {
ui64 WId; // this is going to be provided in free slots operation
TEvHugePreCompactResult(ui64 wId) : WId(wId) {}
};

////////////////////////////////////////////////////////////////////////////
// THugeKeeperCtx
////////////////////////////////////////////////////////////////////////////
Expand Down
Loading

0 comments on commit 60b2dc6

Please sign in to comment.