Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix HugeBlob keeper log cutting logic #4606

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions ydb/core/base/blobstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -709,6 +709,8 @@ struct TEvBlobStorage {
EvStartBalancing,
EvReplCheckProgress,
EvMinHugeBlobSizeUpdate,
EvHugePreCompact,
EvHugePreCompactResult,

EvYardInitResult = EvPut + 9 * 512, /// 268 636 672
EvLogResult,
Expand Down
21 changes: 10 additions & 11 deletions ydb/core/blobstorage/vdisk/common/blobstorage_dblogcutter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ namespace NKikimr {

TInstant LastCutTime;
TDeque<ui64> FreeUpToLsn;
ui64 FreeUpToLsnLastWritten = 0;
ui64 FirstLsnToKeepLastWritten = 0;

const TDuration FirstDuration;
const TDuration RegularDuration;
Expand Down Expand Up @@ -115,33 +115,32 @@ namespace NKikimr {

const ui64 curLsn = Min(HullLsnToKeep, SyncLogLsnToKeep, SyncerLsnToKeep, HugeKeeperLsnToKeep, ScrubLsnToKeep);

// find the maximum of requested LSN's to free up to
TMaybe<ui64> freeUpToLsn;
while (FreeUpToLsn && curLsn >= FreeUpToLsn.front()) {
freeUpToLsn = FreeUpToLsn.front();
FreeUpToLsn.pop_front();
// only issue command if there is a progress in FreeUpToLsn queue
bool progress = false;
for (; FreeUpToLsn && FreeUpToLsn.front() < curLsn; FreeUpToLsn.pop_front()) {
progress = true;
}

if (freeUpToLsn) {
if (progress) {
LastCutTime = TAppData::TimeProvider->Now();

// generate clear log message
NPDisk::TCommitRecord commitRec;
commitRec.FirstLsnToKeep = *freeUpToLsn;
commitRec.FirstLsnToKeep = curLsn;
commitRec.IsStartingPoint = false;
TLsnSeg seg = LogCutterCtx.LsnMngr->AllocLsnForLocalUse();
ui8 signature = TLogSignature::SignatureHullCutLog;
ctx.Send(LogCutterCtx.LoggerId,
new NPDisk::TEvLog(LogCutterCtx.PDiskCtx->Dsk->Owner,
LogCutterCtx.PDiskCtx->Dsk->OwnerRound, signature, commitRec, TRcBuf(), seg, nullptr));
WriteInProgress = true;
FreeUpToLsnLastWritten = *freeUpToLsn;
FirstLsnToKeepLastWritten = curLsn;

LOG_DEBUG(ctx, NKikimrServices::BS_LOGCUTTER,
VDISKP(LogCutterCtx.VCtx->VDiskLogPrefix,
"CUT: Lsn# %" PRIu64 " Hull# %" PRIu64 " SyncLog# %" PRIu64
" Syncer# %" PRIu64 " Huge# %" PRIu64 " Db# LogoBlobs Db# Barriers Db# Blocks",
*freeUpToLsn, HullLsnToKeep, SyncLogLsnToKeep, SyncerLsnToKeep, HugeKeeperLsnToKeep));
curLsn, HullLsnToKeep, SyncLogLsnToKeep, SyncerLsnToKeep, HugeKeeperLsnToKeep));
}
}

Expand All @@ -167,7 +166,7 @@ namespace NKikimr {

str << "FreeUpToLsn: " << FormatList(FreeUpToLsn) << "<br>";

str << "FreeUpToLsnLastWritten: [Lsn=" << FreeUpToLsnLastWritten
str << "FirstLsnToKeepLastWritten: [Lsn=" << FirstLsnToKeepLastWritten
<< ", LastUpdate=" << ToStringLocalTimeUpToSeconds(LastCutTime) << "]<br>";

str << "FirstDuration: " << FirstDuration << "<br>";
Expand Down
34 changes: 28 additions & 6 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
////////////////////////////////////////////////////////////////////////////
class THugeBlobLogLsnFifo {
public:
// Initializes SeqWriteId from seqWriteId (0 when the argument is omitted).
// NOTE(review): presumably this is the first write id handed out by Push(), so a
// non-zero seed keeps 0 free as a "no id" marker -- confirm against the rest of the class.
THugeBlobLogLsnFifo(ui64 seqWriteId = 0)
: SeqWriteId(seqWriteId)
{}

ui64 Push(ui64 lsn) {
Y_VERIFY_S(Fifo.empty() || Fifo.rbegin()->second <= lsn, ErrorReport(SeqWriteId, lsn));
if (NodeCache.empty()) {
Expand Down Expand Up @@ -569,23 +573,24 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
TMaybe<TInstant> LastCommitTime;
std::shared_ptr<THullHugeKeeperPersState> Pers;
THugeBlobLogLsnFifo LogLsnFifo;
THugeBlobLogLsnFifo CompactLsnFifo{1};
ui64 LastReportedFirstLsnToKeep = 0;
ui32 ItemsAfterCommit = 0;

THullHugeKeeperState(std::shared_ptr<THullHugeKeeperPersState> &&pers)
: Pers(std::move(pers))
{}

ui64 FirstLsnToKeep() const {
ui64 persLsn = Pers->FirstLsnToKeep();
ui64 logLsnFifoLastKeepLsn = LogLsnFifo.FirstLsnToKeep();
return Min(persLsn, logLsnFifoLastKeepLsn);
return Pers->FirstLsnToKeep(Min(LogLsnFifo.FirstLsnToKeep(), CompactLsnFifo.FirstLsnToKeep()));
}

// Builds a one-line diagnostic string with the aggregate FirstLsnToKeep value
// followed by the decomposed values reported by each contributor
// (persistent state, LogLsnFifo, CompactLsnFifo).
TString FirstLsnToKeepDecomposed() const {
    TStringStream out;
    // aggregate value first
    out << "{FirstLsnToKeep# " << FirstLsnToKeep();
    // then every component, in a fixed order
    out << " pers# " << Pers->FirstLsnToKeepDecomposed();
    out << " LogLsnFifo# " << LogLsnFifo.FirstLsnToKeepDecomposed();
    out << " CompactLsnFifo# " << CompactLsnFifo.FirstLsnToKeepDecomposed();
    out << "}";
    return out.Str();
}
Expand All @@ -595,9 +600,12 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
str << "WaitQueueSize: " << WaitQueueSize << "<br>";
str << "WaitQueueByteSize: " << WaitQueueByteSize << "<br>";
str << "Committing: " << boolToString(Committing) << "<br>";
str << "ItemsAfterCommit: " << ItemsAfterCommit << "<br>";
str << "FreeUpToLsn: " << FreeUpToLsn << "<br>";
str << "LastCommitTime: " << (LastCommitTime ? ToStringLocalTimeUpToSeconds(*LastCommitTime) : "not yet") << "<br>";
str << "FirstLsnToKeep: " << FirstLsnToKeep() << "<br>";
str << "LogLsnFifo.FirstLsnToKeep: " << LogLsnFifo.FirstLsnToKeep() << "<br>";
str << "CompactLsnFifo.FirstLsnToKeep: " << CompactLsnFifo.FirstLsnToKeep() << "<br>";
Pers->RenderHtml(str);
}

Expand Down Expand Up @@ -746,8 +754,8 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);

// check what if we issue a new huge hull keeper entry point -- would it allow us to
// move the FirstLsnToKeep barrier forward? if so, try to issue an entry point, otherwise exit
const bool inFlightWrites = State.LogLsnFifo.FirstLsnToKeep() != Max<ui64>();
if (!State.Pers->WouldNewEntryPointAdvanceLog(State.FreeUpToLsn, inFlightWrites)) {
const ui64 minInFlightLsn = Min(State.LogLsnFifo.FirstLsnToKeep(), State.CompactLsnFifo.FirstLsnToKeep());
if (!State.Pers->WouldNewEntryPointAdvanceLog(State.FreeUpToLsn, minInFlightLsn, State.ItemsAfterCommit)) {
// if we issue an entry point now, we will achieve nothing, so return
LOG_DEBUG(ctx, BS_LOGCUTTER,
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
Expand All @@ -758,8 +766,9 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);

// allocate LSN for the brand new entry point
ui64 lsn = HugeKeeperCtx->LsnMngr->AllocLsnForLocalUse().Point();
State.Pers->InitiateNewEntryPointCommit(lsn, inFlightWrites);
State.Pers->InitiateNewEntryPointCommit(lsn);
State.Committing = true;
State.ItemsAfterCommit = 0;
// serialize log record into string
TString serialized = State.Pers->Serialize();

Expand Down Expand Up @@ -810,6 +819,12 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
VDISKP(HugeKeeperCtx->VCtx->VDiskLogPrefix,
"THullHugeKeeper: TEvHullFreeHugeSlots: one slot: addr# %s freeRes# %s",
x.ToString().data(), freeRes.ToString().data()));
++State.ItemsAfterCommit;
Y_ABORT_UNLESS(msg->WId);
}

if (msg->WId) {
State.CompactLsnFifo.Pop(msg->WId, msg->DeletionLsn, true);
}

auto checkAndSet = [this, msg] (ui64 &dbLsn) {
Expand Down Expand Up @@ -872,6 +887,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
"THullHugeKeeper: TEvHullHugeBlobLogged: %s", msg->ToString().data()));
// manage log requests in flight
State.LogLsnFifo.Pop(msg->WriteId, msg->RecLsn, msg->SlotIsUsed);
State.ItemsAfterCommit += msg->SlotIsUsed;
// manage allocated slots
const TDiskPart &hugeBlob = msg->HugeBlob;
NHuge::THugeSlot hugeSlot(State.Pers->Heap->ConvertDiskPartToHugeSlot(hugeBlob));
Expand All @@ -891,6 +907,11 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
TryToCutLog(ctx);
}

// Handles TEvHugePreCompact: pushes the requested LSN infimum into CompactLsnFifo
// and replies to the sender with the id returned by Push, echoing the request cookie.
void Handle(TEvHugePreCompact::TPtr ev, const TActorContext& ctx) {
    const auto *request = ev->Get();
    // the id returned by Push is what the sender gets back in the result event
    const ui64 compactWriteId = State.CompactLsnFifo.Push(request->LsnInfimum);
    ctx.Send(ev->Sender, new TEvHugePreCompactResult(compactWriteId), 0, ev->Cookie);
}

void Handle(TEvHugeLockChunks::TPtr &ev, const TActorContext &ctx) {
const TEvHugeLockChunks *msg = ev->Get();
LOG_DEBUG(ctx, BS_HULLHUGE,
Expand Down Expand Up @@ -971,6 +992,7 @@ LWTRACE_USING(BLOBSTORAGE_PROVIDER);
HFunc(TEvHullHugeCommitted, Handle)
HFunc(TEvHullHugeWritten, Handle)
HFunc(TEvHullHugeBlobLogged, Handle)
HFunc(TEvHugePreCompact, Handle)
HFunc(TEvHugeLockChunks, Handle)
HFunc(TEvHugeUnlockChunks, Handle)
HFunc(TEvHugeStat, Handle)
Expand Down
20 changes: 15 additions & 5 deletions ydb/core/blobstorage/vdisk/huge/blobstorage_hullhuge.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,19 +133,19 @@ namespace NKikimr {
const TDiskPartVec HugeBlobs;
const ui64 DeletionLsn;
const TLogSignature Signature; // identifies database we send update for
const ui64 WId;

TEvHullFreeHugeSlots(TDiskPartVec &&hugeBlobs, ui64 deletionLsn,
TLogSignature signature)
TEvHullFreeHugeSlots(TDiskPartVec &&hugeBlobs, ui64 deletionLsn, TLogSignature signature, ui64 wId)
: HugeBlobs(std::move(hugeBlobs))
, DeletionLsn(deletionLsn)
, Signature(signature)
, WId(wId)
{}

TString ToString() const {
TStringStream str;
str << "{" << Signature.ToString()
<< " DelLsn# " << DeletionLsn << " Slots# "
<< HugeBlobs.ToString() << "}";
str << "{" << Signature.ToString() << " DelLsn# " << DeletionLsn << " Slots# " << HugeBlobs.ToString()
<< " WId# " << WId << "}";
return str.Str();
}
};
Expand Down Expand Up @@ -209,6 +209,16 @@ namespace NKikimr {
NHuge::THeapStat Stat;
};

// Local event carrying the LSN lower bound for an upcoming operation.
struct TEvHugePreCompact : TEventLocal<TEvHugePreCompact, TEvBlobStorage::EvHugePreCompact> {
    // the resulting LSN of the operation MUST NOT be less than this value
    ui64 LsnInfimum;

    TEvHugePreCompact(ui64 lsnInfimum)
        : LsnInfimum(lsnInfimum)
    {}
};

// Local event carrying the id to be used with the subsequent free slots operation.
struct TEvHugePreCompactResult : TEventLocal<TEvHugePreCompactResult, TEvBlobStorage::EvHugePreCompactResult> {
    // this id is going to be provided in the free slots operation
    ui64 WId;

    TEvHugePreCompactResult(ui64 wId)
        : WId(wId)
    {}
};

////////////////////////////////////////////////////////////////////////////
// THugeKeeperCtx
////////////////////////////////////////////////////////////////////////////
Expand Down
Loading
Loading