Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prepare to disable blob header by default in VDisk #9683

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions ydb/core/blobstorage/vdisk/common/disk_part.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,7 @@ namespace NKikimr {
}

ui64 Hash() const {
ui64 x = 0;
x |= (ui64)ChunkIdx;
x <<= 32u;
x |= (ui64)Offset;
return x;
return MultiHash(ChunkIdx, Offset);
}

inline bool operator <(const TDiskPart &x) const {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,7 @@ namespace NKikimr {
if (record.GetIndexOnly())
str << " IndexOnly";
if (record.HasMsgQoS()) {
str << ' ';
TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str);
}
str << " Notify# " << record.GetNotifyIfNotReady()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ namespace NKikimr {
template <class TRecordMerger>
void PutToMerger(const TMemRec &memRec, ui64 lsn, TRecordMerger *merger) {
TKey key = It.GetValue().Key;
if (merger->HaveToMergeData() && memRec.HasData() && memRec.GetType() == TBlobType::MemBlob) {
if (merger->HaveToMergeData() && memRec.GetType() == TBlobType::MemBlob) {
const TMemPart p = memRec.GetMemData();
const TRope& rope = Seg->GetLogoBlobData(p);
merger->AddFromFresh(memRec, &rope, key, lsn);
Expand Down
247 changes: 82 additions & 165 deletions ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,12 @@ namespace NKikimr {
};


////////////////////////////////////////////////////////////////////////////
// TCompactRecordMergerBase
////////////////////////////////////////////////////////////////////////////
// Valid call sequence:
// Clear(); Add(); ... Add(); Finish()
// GetMemRec(); GetData();
template <class TKey, class TMemRec>
class TCompactRecordMergerBase : public TRecordMergerBase<TKey, TMemRec> {
class TCompactRecordMerger : public TRecordMergerBase<TKey, TMemRec> {
protected:
using TBase = TRecordMergerBase<TKey, TMemRec>;
using TBase::MemRec;
Expand All @@ -132,18 +130,16 @@ namespace NKikimr {
};

public:
TCompactRecordMergerBase(const TBlobStorageGroupType &gtype, bool addHeader)
TCompactRecordMerger(const TBlobStorageGroupType &gtype, bool addHeader)
: TBase(gtype, true)
, MemRecs()
, ProducingSmallBlob(false)
, NeedToLoadData(ELoadData::NotSet)
, AddHeader(addHeader)
{}

void Clear() {
TBase::Clear();
MemRecs.clear();
ProducingSmallBlob = false;
ProducingHugeBlob = false;
NeedToLoadData = ELoadData::NotSet;
DataMerger.Clear();
}
Expand All @@ -156,51 +152,51 @@ namespace NKikimr {
}

void AddFromSegment(const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) {
Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet);
AddBasic(memRec, key);
switch (memRec.GetType()) {
case TBlobType::DiskBlob: {
if (memRec.HasData() && NeedToLoadData == ELoadData::LoadData) {
MemRecs.push_back(memRec);
ProducingSmallBlob = true;
}
break;
}
case TBlobType::HugeBlob:
case TBlobType::ManyHugeBlobs: {
TDiskDataExtractor extr;
memRec.GetDiskData(&extr, outbound);
const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
DataMerger.AddHugeBlob(extr.Begin, extr.End, v, circaLsn);
break;
}
default:
Y_ABORT("Impossible case");
}
VerifyConsistency(memRec, outbound);
Add(memRec, nullptr, outbound, key, circaLsn);
}

void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
Add(memRec, data, nullptr, key, lsn);
}

void Add(const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) {
Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet);
AddBasic(memRec, key);
if (memRec.HasData()) {
if (data) {
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::MemBlob || memRec.GetType() == TBlobType::DiskBlob);
if (NeedToLoadData == ELoadData::LoadData) {
DataMerger.AddBlob(TDiskBlob(data, memRec.GetLocalParts(GType), GType, key.LogoBlobID()));
ProducingSmallBlob = true;
} else {
// intentionally do nothing: don't add any data to DataMerger, because we don't need it
}
} else {
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::HugeBlob);
TDiskDataExtractor extr;
memRec.GetDiskData(&extr, nullptr);
const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
DataMerger.AddHugeBlob(extr.Begin, extr.End, v, lsn);
if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) {
TDiskDataExtractor extr;
switch (memRec.GetType()) {
case TBlobType::MemBlob:
case TBlobType::DiskBlob:
if (NeedToLoadData == ELoadData::LoadData) {
if (data) {
// we have some data in-memory
DataMerger.AddBlob(TDiskBlob(data, local, GType, key.LogoBlobID()));
}
if (memRec.GetType() == TBlobType::DiskBlob) {
if (memRec.HasData()) { // there is something to read from the disk
MemRecs.push_back(memRec);
} else { // headerless metadata stored
static TRope emptyRope;
DataMerger.AddBlob(TDiskBlob(&emptyRope, local, GType, key.LogoBlobID()));
}
}
Y_DEBUG_ABORT_UNLESS(!ProducingHugeBlob);
ProducingSmallBlob = true;
}
break;

case TBlobType::ManyHugeBlobs:
Y_ABORT_UNLESS(outbound);
[[fallthrough]];
case TBlobType::HugeBlob:
memRec.GetDiskData(&extr, outbound);
DataMerger.AddHugeBlob(extr.Begin, extr.End, local, lsn);
Y_DEBUG_ABORT_UNLESS(!ProducingSmallBlob);
ProducingHugeBlob = true;
break;
}
}
VerifyConsistency(memRec, nullptr);
VerifyConsistency(memRec, outbound);
}

void VerifyConsistency(const TMemRec& memRec, const TDiskPart *outbound) {
Expand Down Expand Up @@ -239,6 +235,13 @@ namespace NKikimr {
}

void Finish() {
if (NeedToLoadData == ELoadData::DontLoadData) {
Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger
// if we have huge blobs for the record, than we set TBlobType::HugeBlob or
// TBlobType::ManyHugeBlobs a few lines below
MemRec.SetNoBlob();
}

Y_DEBUG_ABORT_UNLESS(!Empty());
VerifyConsistency();

Expand All @@ -263,118 +266,22 @@ namespace NKikimr {
return &DataMerger;
}

protected:
TStackVec<TMemRec, 16> MemRecs;
bool ProducingSmallBlob;
ELoadData NeedToLoadData;
TDataMerger DataMerger;
const bool AddHeader;
};

////////////////////////////////////////////////////////////////////////////
// TCompactRecordMergerIndexPass
////////////////////////////////////////////////////////////////////////////
template<typename TKey, typename TMemRec>
class TCompactRecordMergerIndexPass : public TCompactRecordMergerBase<TKey, TMemRec> {
using TBase = TCompactRecordMergerBase<TKey, TMemRec>;

using ELoadData = typename TBase::ELoadData;

using TBase::MemRecs;
using TBase::ProducingSmallBlob;
using TBase::NeedToLoadData;
using TBase::DataMerger;
using TBase::MemRec;

public:
TCompactRecordMergerIndexPass(const TBlobStorageGroupType &gtype, bool addHeader)
: TBase(gtype, addHeader)
{}

void Finish() {
if (NeedToLoadData == ELoadData::DontLoadData) {
Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger
// if we have huge blobs for the record, than we set TBlobType::HugeBlob or
// TBlobType::ManyHugeBlobs a few lines below
MemRec.SetNoBlob();
}

TBase::Finish();
}

template<typename TCallback>
void ForEachSmallDiskBlob(TCallback&& callback) {
for (const auto& memRec : MemRecs) {
callback(memRec);
}
}
};

////////////////////////////////////////////////////////////////////////////
// TCompactRecordMergerDataPass
////////////////////////////////////////////////////////////////////////////
template<typename TKey, typename TMemRec>
class TCompactRecordMergerDataPass : public TCompactRecordMergerBase<TKey, TMemRec> {
using TBase = TCompactRecordMergerBase<TKey, TMemRec>;

using TBase::ProducingSmallBlob;
using TBase::MemRecs;
using TBase::MemRec;
using TBase::DataMerger;
using TBase::GType;
using TBase::SetLoadDataMode;

public:
TCompactRecordMergerDataPass(const TBlobStorageGroupType &gtype, bool addHeader)
: TBase(gtype, addHeader)
{
SetLoadDataMode(true);
}

void Clear() {
TBase::Clear();
ReadSmallBlobs.clear();
SetLoadDataMode(true);
}

// add read small blob content; they should come in order as returned from GetSmallBlobDiskParts by index merger
void AddReadSmallBlob(TString data) {
Y_ABORT_UNLESS(ProducingSmallBlob);
ReadSmallBlobs.push_back(std::move(data));
}

void Finish() {
// ensure we are producing small blobs; otherwise this merger should never be created
Y_ABORT_UNLESS(ProducingSmallBlob);

// add all read small blobs into blob merger
const size_t count = ReadSmallBlobs.size();
Y_ABORT_UNLESS(count == +MemRecs, "count# %zu +MemRecs# %zu", count, +MemRecs);
for (size_t i = 0; i < count; ++i) {
const TMemRec& memRec = MemRecs[i]->GetMemRec();
const TString& buffer = ReadSmallBlobs[i];
Y_ABORT_UNLESS(buffer.size() == memRec.DataSize());
DataMerger.AddBlob(TDiskBlob(buffer.data(), buffer.size(), memRec.GetLocalParts(GType)));
}


// ensure that data merger has small blob
Y_ABORT_UNLESS(DataMerger.HasSmallBlobs());

// finalize base class logic; it also generates blob record
TBase::Finish();

// ensure that we have generated correct DiskBlob with full set of declared parts
const TDiskBlob& blob = DataMerger.GetDiskBlobMerger().GetDiskBlob();
Y_ABORT_UNLESS(blob.GetParts() == MemRec.GetLocalParts(GType));
Y_ABORT_UNLESS(MemRec.GetType() == TBlobType::DiskBlob);
}

private:
TVector<TString> ReadSmallBlobs;
protected:
TStackVec<TMemRec, 16> MemRecs;
bool ProducingSmallBlob = false;
bool ProducingHugeBlob = false;
ELoadData NeedToLoadData = ELoadData::NotSet;
TDataMerger DataMerger;
const bool AddHeader;
};


////////////////////////////////////////////////////////////////////////////
// TRecordMergerCallback
////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -412,9 +319,8 @@ namespace NKikimr {
case 1: {
if (memRec.GetType() == TBlobType::DiskBlob) {
// don't deduplicate inplaced data
const TDiskPart &data = extr.SwearOne();
if (data.ChunkIdx && data.Size) {
(*Callback)(data, v);
if (!v.Empty()) {
(*Callback)(extr.SwearOne(), v);
}
} else if (memRec.GetType() == TBlobType::HugeBlob) {
Y_ABORT_UNLESS(v.CountBits() == 1u);
Expand Down Expand Up @@ -446,20 +352,31 @@ namespace NKikimr {

void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
AddBasic(memRec, key);
if (memRec.HasData()) {
const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
if (data) {
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::MemBlob);
// we have in-memory data in a rope, it always wins among other data,
// so we call Callback immediately and remove any data for this local part
// from LastWriteWinsMerger
(*Callback)(TDiskBlob(data, v, GType, key.LogoBlobID()));
} else {
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::HugeBlob && v.CountBits() == 1u);
TDiskDataExtractor extr;
memRec.GetDiskData(&extr, nullptr);
// deduplicate huge blob
LastWriteWinsMerger.Add(extr.SwearOne(), v, lsn);
if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) {
TDiskDataExtractor extr;
static TRope rope;
switch (memRec.GetType()) {
case TBlobType::MemBlob:
// we have in-memory data in a rope, it always wins among other data,
// so we call Callback immediately and remove any data for this local part
// from LastWriteWinsMerger
Y_ABORT_UNLESS(data); // HaveToMergeData is true, so data must be present
(*Callback)(TDiskBlob(data, local, GType, key.LogoBlobID()));
break;

case TBlobType::DiskBlob:
Y_ABORT_UNLESS(!memRec.HasData());
(*Callback)(TDiskBlob(&rope, local, GType, key.LogoBlobID())); // pure metadata parts only
break;

case TBlobType::HugeBlob:
Y_ABORT_UNLESS(local.CountBits() == 1);
memRec.GetDiskData(&extr, nullptr);
LastWriteWinsMerger.Add(extr.SwearOne(), local, lsn);
break;

case TBlobType::ManyHugeBlobs:
Y_ABORT("unexpected case");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////////////////
typedef TLevelSegment<TKeyLogoBlob, TMemRecLogoBlob> TSstLogoBlob;
typedef TSstLogoBlob::TWriter TWriterLogoBlob;
typedef TCompactRecordMergerIndexPass<TKeyLogoBlob, TMemRecLogoBlob> TTLogoBlobCompactRecordMerger;
typedef TCompactRecordMerger<TKeyLogoBlob, TMemRecLogoBlob> TTLogoBlobCompactRecordMerger;

typedef TLevelSegment<TKeyBlock, TMemRecBlock> TSstBlock;
typedef TSstBlock::TWriter TWriterBlock;
typedef TCompactRecordMergerIndexPass<TKeyBlock, TMemRecBlock> TBlockCompactRecordMerger;
typedef TCompactRecordMerger<TKeyBlock, TMemRecBlock> TBlockCompactRecordMerger;
TTestContexts TestCtx;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ namespace NKikimr {
"Db# LogoBlobs action# add_data mode# %s id# %s lsn# %" PRIu64 " bufSize# %" PRIu32,
OpMode2Str(mode), id.ToString().data(), lsn, ui32(buffer.GetSize())));

HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), partId, ingress, std::move(buffer));
if (buffer) {
HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), partId, ingress, std::move(buffer));
} else {
const TBlobStorageGroupType gtype = HullDs->HullCtx->VCtx->Top->GType;
Y_DEBUG_ABORT_UNLESS(!gtype.PartSize(TLogoBlobID(id, partId)));
HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), TMemRecLogoBlob(ingress));
}
}

void THullDbRecovery::ReplayAddLogoBlobCmd(
Expand Down
Loading
Loading