Skip to content

Commit

Permalink
Prepare to disable blob header by default in VDisk
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru committed Sep 24, 2024
1 parent 9f4dbb7 commit d8ddee3
Show file tree
Hide file tree
Showing 9 changed files with 103 additions and 186 deletions.
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/ut_blobstorage/defrag.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ static TIntrusivePtr<TBlobStorageGroupInfo> PrepareEnv(TEnvironmentSetup& env, T
const TIntrusivePtr<TBlobStorageGroupInfo> info = env.GetGroupInfo(groups.front());
env.Sim(TDuration::Minutes(5));

const ui32 dataLen = 512 * 1024;
const ui32 dataLen = 512 * 1024 + 1;
const TString data(dataLen, 'x');
ui32 index = 0;

Expand Down Expand Up @@ -257,7 +257,7 @@ Y_UNIT_TEST_SUITE(Defragmentation) {

const TEvDefragRewritten* msg = ev->Get<TEvDefragRewritten>();
UNIT_ASSERT_VALUES_EQUAL(msg->RewrittenRecs, 18);
UNIT_ASSERT_VALUES_EQUAL(msg->RewrittenBytes, 9961567);
UNIT_ASSERT_VALUES_EQUAL(msg->RewrittenBytes, 9961491);
}
return true;
case TEvBlobStorage::EvRestoreCorruptedBlob:
Expand Down
6 changes: 1 addition & 5 deletions ydb/core/blobstorage/vdisk/common/disk_part.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,11 +131,7 @@ namespace NKikimr {
}

ui64 Hash() const {
ui64 x = 0;
x |= (ui64)ChunkIdx;
x <<= 32u;
x |= (ui64)Offset;
return x;
return MultiHash(ChunkIdx, Offset);
}

inline bool operator <(const TDiskPart &x) const {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -1252,6 +1252,7 @@ namespace NKikimr {
if (record.GetIndexOnly())
str << " IndexOnly";
if (record.HasMsgQoS()) {
str << ' ';
TEvBlobStorage::TEvVPut::OutMsgQos(record.GetMsgQoS(), str);
}
str << " Notify# " << record.GetNotifyIfNotReady()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ namespace NKikimr {
template <class TRecordMerger>
void PutToMerger(const TMemRec &memRec, ui64 lsn, TRecordMerger *merger) {
TKey key = It.GetValue().Key;
if (merger->HaveToMergeData() && memRec.HasData() && memRec.GetType() == TBlobType::MemBlob) {
if (merger->HaveToMergeData() && memRec.GetType() == TBlobType::MemBlob) {
const TMemPart p = memRec.GetMemData();
const TRope& rope = Seg->GetLogoBlobData(p);
merger->AddFromFresh(memRec, &rope, key, lsn);
Expand Down
243 changes: 78 additions & 165 deletions ydb/core/blobstorage/vdisk/hulldb/generic/blobstorage_hullrecmerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,12 @@ namespace NKikimr {
};


////////////////////////////////////////////////////////////////////////////
// TCompactRecordMergerBase
////////////////////////////////////////////////////////////////////////////
// Valid call sequence:
// Clear(); Add(); ... Add(); Finish()
// GetMemRec(); GetData();
template <class TKey, class TMemRec>
class TCompactRecordMergerBase : public TRecordMergerBase<TKey, TMemRec> {
class TCompactRecordMerger : public TRecordMergerBase<TKey, TMemRec> {
protected:
using TBase = TRecordMergerBase<TKey, TMemRec>;
using TBase::MemRec;
Expand All @@ -132,18 +130,16 @@ namespace NKikimr {
};

public:
TCompactRecordMergerBase(const TBlobStorageGroupType &gtype, bool addHeader)
TCompactRecordMerger(const TBlobStorageGroupType &gtype, bool addHeader)
: TBase(gtype, true)
, MemRecs()
, ProducingSmallBlob(false)
, NeedToLoadData(ELoadData::NotSet)
, AddHeader(addHeader)
{}

// Resets the merger to its initial state; this is the first step of the
// "Clear(); Add(); ... Add(); Finish()" call sequence. It clears the base-class
// state, drops the collected MemRecs, resets both Producing* flags, returns the
// load-data mode to ELoadData::NotSet and clears DataMerger.
void Clear() {
TBase::Clear();
MemRecs.clear();
ProducingSmallBlob = false;
ProducingHugeBlob = false;
NeedToLoadData = ELoadData::NotSet;
DataMerger.Clear();
}
Expand All @@ -156,51 +152,47 @@ namespace NKikimr {
}

void AddFromSegment(const TMemRec &memRec, const TDiskPart *outbound, const TKey &key, ui64 circaLsn) {
Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet);
AddBasic(memRec, key);
switch (memRec.GetType()) {
case TBlobType::DiskBlob: {
if (memRec.HasData() && NeedToLoadData == ELoadData::LoadData) {
MemRecs.push_back(memRec);
ProducingSmallBlob = true;
}
break;
}
case TBlobType::HugeBlob:
case TBlobType::ManyHugeBlobs: {
TDiskDataExtractor extr;
memRec.GetDiskData(&extr, outbound);
const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
DataMerger.AddHugeBlob(extr.Begin, extr.End, v, circaLsn);
break;
}
default:
Y_ABORT("Impossible case");
}
VerifyConsistency(memRec, outbound);
Add(memRec, nullptr, outbound, key, circaLsn);
}

// Adds a record supplied together with an optional in-memory rope (`data`).
// Simply forwards to Add(), passing nullptr for the `outbound` argument.
// NOTE(review): presumably called for records coming from the fresh segment,
// as the name suggests -- confirm against callers.
void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
Add(memRec, data, nullptr, key, lsn);
}

void Add(const TMemRec& memRec, const TRope *data, const TDiskPart *outbound, const TKey& key, ui64 lsn) {
Y_DEBUG_ABORT_UNLESS(NeedToLoadData != ELoadData::NotSet);
AddBasic(memRec, key);
if (memRec.HasData()) {
if (data) {
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::MemBlob || memRec.GetType() == TBlobType::DiskBlob);
if (NeedToLoadData == ELoadData::LoadData) {
DataMerger.AddBlob(TDiskBlob(data, memRec.GetLocalParts(GType), GType, key.LogoBlobID()));
ProducingSmallBlob = true;
} else {
// intentionally do nothing: don't add any data to DataMerger, because we don't need it
}
} else {
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::HugeBlob);
TDiskDataExtractor extr;
memRec.GetDiskData(&extr, nullptr);
const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
DataMerger.AddHugeBlob(extr.Begin, extr.End, v, lsn);
if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) {
TDiskDataExtractor extr;
switch (memRec.GetType()) {
case TBlobType::MemBlob:
case TBlobType::DiskBlob:
if (NeedToLoadData == ELoadData::LoadData) {
if (data) {
// we have some data in-memory
DataMerger.AddBlob(TDiskBlob(data, local, GType, key.LogoBlobID()));
}
if (memRec.HasData() && memRec.GetType() == TBlobType::DiskBlob) {
// there is something to read from the disk
MemRecs.push_back(memRec);
}
Y_DEBUG_ABORT_UNLESS(!ProducingHugeBlob);
ProducingSmallBlob = true;
}
break;

case TBlobType::ManyHugeBlobs:
Y_ABORT_UNLESS(outbound);
[[fallthrough]];
case TBlobType::HugeBlob:
memRec.GetDiskData(&extr, outbound);
DataMerger.AddHugeBlob(extr.Begin, extr.End, local, lsn);
Y_DEBUG_ABORT_UNLESS(!ProducingSmallBlob);
ProducingHugeBlob = true;
break;
}
}
VerifyConsistency(memRec, nullptr);
VerifyConsistency(memRec, outbound);
}

void VerifyConsistency(const TMemRec& memRec, const TDiskPart *outbound) {
Expand Down Expand Up @@ -239,6 +231,13 @@ namespace NKikimr {
}

void Finish() {
if (NeedToLoadData == ELoadData::DontLoadData) {
Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger
// if we have huge blobs for the record, then we set TBlobType::HugeBlob or
// TBlobType::ManyHugeBlobs a few lines below
MemRec.SetNoBlob();
}

Y_DEBUG_ABORT_UNLESS(!Empty());
VerifyConsistency();

Expand All @@ -263,118 +262,22 @@ namespace NKikimr {
return &DataMerger;
}

protected:
TStackVec<TMemRec, 16> MemRecs;
bool ProducingSmallBlob;
ELoadData NeedToLoadData;
TDataMerger DataMerger;
const bool AddHeader;
};

////////////////////////////////////////////////////////////////////////////
// TCompactRecordMergerIndexPass
////////////////////////////////////////////////////////////////////////////
template<typename TKey, typename TMemRec>
class TCompactRecordMergerIndexPass : public TCompactRecordMergerBase<TKey, TMemRec> {
using TBase = TCompactRecordMergerBase<TKey, TMemRec>;

using ELoadData = typename TBase::ELoadData;

using TBase::MemRecs;
using TBase::ProducingSmallBlob;
using TBase::NeedToLoadData;
using TBase::DataMerger;
using TBase::MemRec;

public:
TCompactRecordMergerIndexPass(const TBlobStorageGroupType &gtype, bool addHeader)
: TBase(gtype, addHeader)
{}

void Finish() {
if (NeedToLoadData == ELoadData::DontLoadData) {
Y_ABORT_UNLESS(!DataMerger.HasSmallBlobs()); // we didn't put any small blob to the data merger
// if we have huge blobs for the record, then we set TBlobType::HugeBlob or
// TBlobType::ManyHugeBlobs a few lines below
MemRec.SetNoBlob();
}

TBase::Finish();
}

template<typename TCallback>
void ForEachSmallDiskBlob(TCallback&& callback) {
for (const auto& memRec : MemRecs) {
callback(memRec);
}
}
};

////////////////////////////////////////////////////////////////////////////
// TCompactRecordMergerDataPass
////////////////////////////////////////////////////////////////////////////
template<typename TKey, typename TMemRec>
class TCompactRecordMergerDataPass : public TCompactRecordMergerBase<TKey, TMemRec> {
using TBase = TCompactRecordMergerBase<TKey, TMemRec>;

using TBase::ProducingSmallBlob;
using TBase::MemRecs;
using TBase::MemRec;
using TBase::DataMerger;
using TBase::GType;
using TBase::SetLoadDataMode;

public:
TCompactRecordMergerDataPass(const TBlobStorageGroupType &gtype, bool addHeader)
: TBase(gtype, addHeader)
{
SetLoadDataMode(true);
}

void Clear() {
TBase::Clear();
ReadSmallBlobs.clear();
SetLoadDataMode(true);
}

// add read small blob content; they should come in order as returned from GetSmallBlobDiskParts by index merger
void AddReadSmallBlob(TString data) {
Y_ABORT_UNLESS(ProducingSmallBlob);
ReadSmallBlobs.push_back(std::move(data));
}

void Finish() {
// ensure we are producing small blobs; otherwise this merger should never be created
Y_ABORT_UNLESS(ProducingSmallBlob);

// add all read small blobs into blob merger
const size_t count = ReadSmallBlobs.size();
Y_ABORT_UNLESS(count == +MemRecs, "count# %zu +MemRecs# %zu", count, +MemRecs);
for (size_t i = 0; i < count; ++i) {
const TMemRec& memRec = MemRecs[i]->GetMemRec();
const TString& buffer = ReadSmallBlobs[i];
Y_ABORT_UNLESS(buffer.size() == memRec.DataSize());
DataMerger.AddBlob(TDiskBlob(buffer.data(), buffer.size(), memRec.GetLocalParts(GType)));
}


// ensure that data merger has small blob
Y_ABORT_UNLESS(DataMerger.HasSmallBlobs());

// finalize base class logic; it also generates blob record
TBase::Finish();

// ensure that we have generated correct DiskBlob with full set of declared parts
const TDiskBlob& blob = DataMerger.GetDiskBlobMerger().GetDiskBlob();
Y_ABORT_UNLESS(blob.GetParts() == MemRec.GetLocalParts(GType));
Y_ABORT_UNLESS(MemRec.GetType() == TBlobType::DiskBlob);
}

private:
TVector<TString> ReadSmallBlobs;
protected:
TStackVec<TMemRec, 16> MemRecs;
bool ProducingSmallBlob = false;
bool ProducingHugeBlob = false;
ELoadData NeedToLoadData = ELoadData::NotSet;
TDataMerger DataMerger;
const bool AddHeader;
};


////////////////////////////////////////////////////////////////////////////
// TRecordMergerCallback
////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -412,9 +315,8 @@ namespace NKikimr {
case 1: {
if (memRec.GetType() == TBlobType::DiskBlob) {
// don't deduplicate inplaced data
const TDiskPart &data = extr.SwearOne();
if (data.ChunkIdx && data.Size) {
(*Callback)(data, v);
if (!v.Empty()) {
(*Callback)(extr.SwearOne(), v);
}
} else if (memRec.GetType() == TBlobType::HugeBlob) {
Y_ABORT_UNLESS(v.CountBits() == 1u);
Expand Down Expand Up @@ -446,20 +348,31 @@ namespace NKikimr {

void AddFromFresh(const TMemRec &memRec, const TRope *data, const TKey &key, ui64 lsn) {
AddBasic(memRec, key);
if (memRec.HasData()) {
const NMatrix::TVectorType v = memRec.GetLocalParts(GType);
if (data) {
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::MemBlob);
// we have in-memory data in a rope, it always wins among other data,
// so we call Callback immediately and remove any data for this local part
// from LastWriteWinsMerger
(*Callback)(TDiskBlob(data, v, GType, key.LogoBlobID()));
} else {
Y_ABORT_UNLESS(memRec.GetType() == TBlobType::HugeBlob && v.CountBits() == 1u);
TDiskDataExtractor extr;
memRec.GetDiskData(&extr, nullptr);
// deduplicate huge blob
LastWriteWinsMerger.Add(extr.SwearOne(), v, lsn);
if (const NMatrix::TVectorType local = memRec.GetLocalParts(GType); !local.Empty()) {
TDiskDataExtractor extr;
static TRope rope;
switch (memRec.GetType()) {
case TBlobType::MemBlob:
// we have in-memory data in a rope, it always wins among other data,
// so we call Callback immediately and remove any data for this local part
// from LastWriteWinsMerger
Y_ABORT_UNLESS(data); // HaveToMergeData is true, so data must be present
(*Callback)(TDiskBlob(data, local, GType, key.LogoBlobID()));
break;

case TBlobType::DiskBlob:
Y_ABORT_UNLESS(!memRec.HasData());
(*Callback)(TDiskBlob(&rope, local, GType, key.LogoBlobID())); // pure metadata parts only
break;

case TBlobType::HugeBlob:
Y_ABORT_UNLESS(local.CountBits() == 1);
memRec.GetDiskData(&extr, nullptr);
LastWriteWinsMerger.Add(extr.SwearOne(), local, lsn);
break;

case TBlobType::ManyHugeBlobs:
Y_ABORT("unexpected case");
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ namespace NKikimr {
////////////////////////////////////////////////////////////////////////////////////////
typedef TLevelSegment<TKeyLogoBlob, TMemRecLogoBlob> TSstLogoBlob;
typedef TSstLogoBlob::TWriter TWriterLogoBlob;
typedef TCompactRecordMergerIndexPass<TKeyLogoBlob, TMemRecLogoBlob> TTLogoBlobCompactRecordMerger;
typedef TCompactRecordMerger<TKeyLogoBlob, TMemRecLogoBlob> TTLogoBlobCompactRecordMerger;

typedef TLevelSegment<TKeyBlock, TMemRecBlock> TSstBlock;
typedef TSstBlock::TWriter TWriterBlock;
typedef TCompactRecordMergerIndexPass<TKeyBlock, TMemRecBlock> TBlockCompactRecordMerger;
typedef TCompactRecordMerger<TKeyBlock, TMemRecBlock> TBlockCompactRecordMerger;
TTestContexts TestCtx;


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,13 @@ namespace NKikimr {
"Db# LogoBlobs action# add_data mode# %s id# %s lsn# %" PRIu64 " bufSize# %" PRIu32,
OpMode2Str(mode), id.ToString().data(), lsn, ui32(buffer.GetSize())));

HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), partId, ingress, std::move(buffer));
if (buffer) {
HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), partId, ingress, std::move(buffer));
} else {
const TBlobStorageGroupType gtype = HullDs->HullCtx->VCtx->Top->GType;
Y_DEBUG_ABORT_UNLESS(!gtype.PartSize(TLogoBlobID(id, partId)));
HullDs->LogoBlobs->PutToFresh(lsn, TKeyLogoBlob(id), TMemRecLogoBlob(ingress));
}
}

void THullDbRecovery::ReplayAddLogoBlobCmd(
Expand Down
Loading

0 comments on commit d8ddee3

Please sign in to comment.