Skip to content

Commit

Permalink
Support skipping blob header in TDiskBlob (ydb-platform#3145)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexvru authored Mar 26, 2024
1 parent 9922ce5 commit b2f7f35
Show file tree
Hide file tree
Showing 32 changed files with 336 additions and 256 deletions.
1 change: 1 addition & 0 deletions ydb/core/blobstorage/ut_vdisk/lib/test_repl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,7 @@ class TVDiskReplProxyReaderActor : public TActorBootstrapped<TVDiskReplProxyRead

ReplCtx = std::make_shared<TReplCtx>(
VCtx,
nullptr,
nullptr, // PDiskCtx
nullptr, // HugeBlobCtx
nullptr,
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/blobstorage/vdisk/common/blobstorage_cost_tracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ class TBsCostModelBase {
const NKikimrBlobStorage::EPutHandleClass handleClass = record.GetHandleClass();
const ui64 size = record.HasBuffer() ? record.GetBuffer().size() : ev.GetPayload(0).GetSize();

NPriPut::EHandleType handleType = NPriPut::HandleType(HugeBlobSize, handleClass, size);
NPriPut::EHandleType handleType = NPriPut::HandleType(HugeBlobSize, handleClass, size, true);
if (handleType == NPriPut::Log) {
return WriteCost(size);
} else {
Expand All @@ -247,7 +247,7 @@ class TBsCostModelBase {

for (ui64 idx = 0; idx < record.ItemsSize(); ++idx) {
const ui64 size = ev.GetBufferBytes(idx);
NPriPut::EHandleType handleType = NPriPut::HandleType(HugeBlobSize, handleClass, size);
NPriPut::EHandleType handleType = NPriPut::HandleType(HugeBlobSize, handleClass, size, true);
if (handleType == NPriPut::Log) {
cost += WriteCost(size);
} else {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace NKikimr {
HullCompMaxInFlightReads = 20;
HullCompReadBatchEfficiencyThreshold = 0.5; // don't issue reads if there are more gaps than the useful data
AnubisOsirisMaxInFly = 1000;
AddHeader = true;

RecoveryLogCutterFirstDuration = TDuration::Seconds(10);
RecoveryLogCutterRegularDuration = TDuration::Seconds(30);
Expand Down
1 change: 1 addition & 0 deletions ydb/core/blobstorage/vdisk/common/vdisk_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ namespace NKikimr {
ui32 HullCompMaxInFlightReads;
double HullCompReadBatchEfficiencyThreshold;
ui64 AnubisOsirisMaxInFly;
bool AddHeader;

//////////////// LOG CUTTER SETTINGS ////////////////
TDuration RecoveryLogCutterFirstDuration;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/blobstorage/vdisk/common/vdisk_costmodel.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ namespace NKikimr {
const NKikimrBlobStorage::EPutHandleClass handleClass = record.GetHandleClass();
const ui64 bufSize = record.HasBuffer() ? record.GetBuffer().size() : ev.GetPayload(0).GetSize();

NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, handleClass, bufSize);
NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, handleClass, bufSize, true);
if (handleType == NPriPut::Log) {
*logPutInternalQueue = true;
return SmallWriteCost(bufSize);
Expand All @@ -197,7 +197,7 @@ namespace NKikimr {
ui64 cost = 0;
for (ui64 idx = 0; idx < record.ItemsSize(); ++idx) {
const ui64 size = ev.GetBufferBytes(idx);
NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, handleClass, size);
NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, handleClass, size, true);
if (handleType == NPriPut::Log) {
cost += SmallWriteCost(size);
} else {
Expand Down Expand Up @@ -264,7 +264,7 @@ namespace NKikimr {
cost += MovedPatchCostBySize(essence.MovedPatchBlobSize);
}
for (ui64 size : essence.PutBufferSizes) {
NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, essence.HandleClass, size);
NPriPut::EHandleType handleType = NPriPut::HandleType(MinREALHugeBlobInBytes, essence.HandleClass, size, true);
if (handleType == NPriPut::Log) {
cost += SmallWriteCost(size);
} else {
Expand Down
12 changes: 2 additions & 10 deletions ydb/core/blobstorage/vdisk/common/vdisk_handle_class.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace NKikimr {
namespace NPriPut {

EHandleType HandleType(const ui32 minREALHugeBlobSize, NKikimrBlobStorage::EPutHandleClass handleClass,
ui32 originalBufSizeWithoutOverhead) {
ui32 originalBufSizeWithoutOverhead, bool addHeader) {
// what size of huge blob it would be, if it huge
const ui64 hugeBlobSize = TDiskBlob::HugeBlobOverhead + originalBufSizeWithoutOverhead;
const ui64 hugeBlobSize = (addHeader ? TDiskBlob::HeaderSize : 0) + originalBufSizeWithoutOverhead;

switch (handleClass) {
case NKikimrBlobStorage::TabletLog:
Expand All @@ -25,13 +25,5 @@ namespace NKikimr {
}
}

bool IsHandleTypeLog(const ui32 minREALHugeBlobSize, NKikimrBlobStorage::EPutHandleClass handleClass,
ui32 originalBufSizeWithoutOverhead) {
const NPriPut::EHandleType handleType = NPriPut::HandleType(minREALHugeBlobSize, handleClass,
originalBufSizeWithoutOverhead);
const bool isHandleTypeLog = handleType == NPriPut::Log;
return isHandleTypeLog;
}

} // NPriPut
} // NKikimr
4 changes: 1 addition & 3 deletions ydb/core/blobstorage/vdisk/common/vdisk_handle_class.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@ namespace NKikimr {
};

EHandleType HandleType(const ui32 minREALHugeBlobSize, NKikimrBlobStorage::EPutHandleClass handleClass,
ui32 originalBufSizeWithoutOverhead);
ui32 originalBufSizeWithoutOverhead, bool addHeader);

bool IsHandleTypeLog(const ui32 minREALHugeBlobSize, NKikimrBlobStorage::EPutHandleClass handleClass,
ui32 originalBufSizeWithoutOverhead);
} // NPriPut

} // NKikimr
2 changes: 1 addition & 1 deletion ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ namespace NKikimr {

// check whether this blob is huge one; userPartSize doesn't include any metadata stored along with blob
bool THugeBlobCtx::IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId) const {
return gtype.MaxPartSize(fullId) + TDiskBlob::HugeBlobOverhead >= MinREALHugeBlobInBytes;
return gtype.MaxPartSize(fullId) + (AddHeader ? TDiskBlob::HeaderSize : 0) >= MinREALHugeBlobInBytes;
}

} // NKikimr
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/blobstorage/vdisk/common/vdisk_hugeblobctx.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ namespace NKikimr {
// this value is multiply of AppendBlockSize and is calculated from Config->MinHugeBlobSize
const ui32 MinREALHugeBlobInBytes;
const std::shared_ptr<const THugeSlotsMap> HugeSlotsMap;
const bool AddHeader;

// check whether this blob is huge one; userPartSize doesn't include any metadata stored along with blob
// check whether this NEW blob is huge one; userPartSize doesn't include any metadata stored along with blob
bool IsHugeBlob(TBlobStorageGroupType gtype, const TLogoBlobID& fullId) const;

THugeBlobCtx(ui32 minREALHugeBlobInBytes, const std::shared_ptr<const THugeSlotsMap> &hugeSlotsMap)
THugeBlobCtx(ui32 minREALHugeBlobInBytes, const std::shared_ptr<const THugeSlotsMap> &hugeSlotsMap, bool addHeader)
: MinREALHugeBlobInBytes(minREALHugeBlobInBytes)
, HugeSlotsMap(hugeSlotsMap)
, AddHeader(addHeader)
{}
};

Expand Down
27 changes: 17 additions & 10 deletions ydb/core/blobstorage/vdisk/defrag/defrag_rewriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -131,18 +131,25 @@ namespace NKikimr {
Y_ABORT_UNLESS(partId);

TRcBuf data = msg->Data.ToString();
Y_ABORT_UNLESS(data.size() == TDiskBlob::HeaderSize + gtype.PartSize(rec.LogoBlobId));
const char *header = data.data();

ui32 fullDataSize;
memcpy(&fullDataSize, header, sizeof(fullDataSize));
header += sizeof(fullDataSize);
Y_ABORT_UNLESS(fullDataSize == rec.LogoBlobId.BlobSize());

Y_ABORT_UNLESS(NMatrix::TVectorType::MakeOneHot(partId - 1, gtype.TotalPartCount()).Raw() == static_cast<ui8>(*header));
Y_ABORT_UNLESS(data.size() == TDiskBlob::HeaderSize + gtype.PartSize(rec.LogoBlobId) ||
data.size() == gtype.PartSize(rec.LogoBlobId));

ui32 trim = 0;
if (data.size() == TDiskBlob::HeaderSize + gtype.PartSize(rec.LogoBlobId)) {
const char *header = data.data();
ui32 fullDataSize;
memcpy(&fullDataSize, header, sizeof(fullDataSize));
header += sizeof(fullDataSize);
Y_ABORT_UNLESS(fullDataSize == rec.LogoBlobId.BlobSize());
Y_ABORT_UNLESS(NMatrix::TVectorType::MakeOneHot(partId - 1, gtype.TotalPartCount()).Raw() == static_cast<ui8>(*header));
trim += TDiskBlob::HeaderSize;
}

TRope rope(std::move(data));
rope.EraseFront(TDiskBlob::HeaderSize);
if (trim) {
rope.EraseFront(trim);
}
Y_ABORT_UNLESS(rope.size() == gtype.PartSize(rec.LogoBlobId));

auto writeEvent = std::make_unique<TEvBlobStorage::TEvVPut>(rec.LogoBlobId, std::move(rope),
SelfVDiskId, true, nullptr, TInstant::Max(), NKikimrBlobStorage::EPutHandleClass::AsyncBlob);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ namespace NKikimr {

return std::make_shared<THugeBlobCtx>(
repairedHuge->GetMinREALHugeBlobInBytes(),
repairedHuge->Heap->BuildHugeSlotsMap());
repairedHuge->Heap->BuildHugeSlotsMap(),
true);
}


Expand Down
87 changes: 54 additions & 33 deletions ydb/core/blobstorage/vdisk/hulldb/base/blobstorage_blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ namespace NKikimr {

public:
static const size_t HeaderSize = sizeof(ui32) + sizeof(ui8);
static const size_t HugeBlobOverhead = HeaderSize;

TDiskBlob() = default;

Expand All @@ -44,29 +43,41 @@ namespace NKikimr {
, Parts(parts)
{
// ensure the blob format is correct
Y_ABORT_UNLESS(Rope->GetSize() >= HeaderSize);
Y_ABORT_UNLESS(parts.GetSize() <= MaxTotalPartCount);
//Y_ABORT_UNLESS(parts.GetSize() == gtype.TotalPartCount()); // TODO(alexvru): fit UTs

ui32 blobSize = 0;
for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
blobSize += gtype.PartSize(TLogoBlobID(fullId, i + 1));
}

Y_ABORT_UNLESS(rope->GetSize() == blobSize || rope->GetSize() == blobSize + HeaderSize);

auto iter = Rope->Begin();
ui32 offset = 0;

// obtain full data size from the header
iter.ExtractPlainDataAndAdvance(&FullDataSize, sizeof(FullDataSize));
if (rope->GetSize() == blobSize + HeaderSize) {
// obtain full data size from the header
iter.ExtractPlainDataAndAdvance(&FullDataSize, sizeof(FullDataSize));

// then check the parts; we have `parts' argument to validate actual blob content
ui8 partsMask;
iter.ExtractPlainDataAndAdvance(&partsMask, sizeof(partsMask));
Y_ABORT_UNLESS(parts.Raw() == partsMask);
// then check the parts; we have `parts' argument to validate actual blob content
ui8 partsMask;
iter.ExtractPlainDataAndAdvance(&partsMask, sizeof(partsMask));
Y_ABORT_UNLESS(parts.Raw() == partsMask);

// advance offset
offset += HeaderSize;
} else {
FullDataSize = fullId.BlobSize();
}

// calculate part layout in the binary
ui32 offset = HeaderSize;
for (ui8 i = 0; i <= parts.GetSize(); ++i) {
PartOffs[i] = offset;
if (i != parts.GetSize()) {
offset += parts.Get(i) ? gtype.PartSize(TLogoBlobID(fullId, i + 1)) : 0;
}
}
Y_ABORT_UNLESS(GetSize() == Rope->GetSize(), "%" PRIu32 " != %zu", GetSize(), Rope->GetSize());
}

bool Empty() const {
Expand Down Expand Up @@ -112,8 +123,8 @@ namespace NKikimr {
return Parts;
}

ui32 GetSize() const {
return PartOffs[Parts.GetSize()];
ui32 GetBlobSize(bool addHeader) const {
return PartOffs[Parts.GetSize()] - PartOffs[0] + (addHeader ? HeaderSize : 0);
}

////////////////// Iterator via all parts ///////////////////////////////////////
Expand Down Expand Up @@ -203,19 +214,22 @@ namespace NKikimr {

public:
template<typename TPartIt>
static TRope CreateFromDistinctParts(TPartIt first, TPartIt last, NMatrix::TVectorType parts, ui64 fullDataSize, TRopeArena& arena) {
static TRope CreateFromDistinctParts(TPartIt first, TPartIt last, NMatrix::TVectorType parts, ui64 fullDataSize,
TRopeArena& arena, bool addHeader) {
// ensure that we have correct number of set parts
Y_ABORT_UNLESS(parts.CountBits() == std::distance(first, last));
Y_ABORT_UNLESS(first != last);

TRope rope;

// fill in header
char header[HeaderSize];
Y_ABORT_UNLESS(fullDataSize <= Max<ui32>());
*reinterpret_cast<ui32*>(header) = fullDataSize;
*reinterpret_cast<ui8*>(header + sizeof(ui32)) = parts.Raw();
rope.Insert(rope.End(), arena.CreateRope(header, HeaderSize));
if (addHeader) {
// fill in header
char header[HeaderSize];
Y_ABORT_UNLESS(fullDataSize <= Max<ui32>());
*reinterpret_cast<ui32*>(header) = fullDataSize;
*reinterpret_cast<ui8*>(header + sizeof(ui32)) = parts.Raw();
rope.Insert(rope.End(), arena.CreateRope(header, HeaderSize));
}

// then copy parts' contents to the rope
while (first != last) {
Expand All @@ -225,19 +239,22 @@ namespace NKikimr {
return rope;
}

static inline TRope Create(ui64 fullDataSize, ui8 partId, ui8 total, TRope&& data, TRopeArena& arena) {
static inline TRope Create(ui64 fullDataSize, ui8 partId, ui8 total, TRope&& data, TRopeArena& arena,
bool addHeader) {
Y_ABORT_UNLESS(partId > 0 && partId <= 8);
return CreateFromDistinctParts(&data, &data + 1, NMatrix::TVectorType::MakeOneHot(partId - 1, total),
fullDataSize, arena);
fullDataSize, arena, addHeader);
}

static inline TRope Create(ui64 fullDataSize, NMatrix::TVectorType parts, TRope&& data, TRopeArena& arena) {
return CreateFromDistinctParts(&data, &data + 1, parts, fullDataSize, arena);
static inline TRope Create(ui64 fullDataSize, NMatrix::TVectorType parts, TRope&& data, TRopeArena& arena,
bool addHeader) {
return CreateFromDistinctParts(&data, &data + 1, parts, fullDataSize, arena, addHeader);
}

// static function for calculating size of a blob being created ('Create' function creates blob of this size)
static inline ui32 CalculateBlobSize(TBlobStorageGroupType gtype, const TLogoBlobID& fullId, NMatrix::TVectorType parts) {
ui32 res = HeaderSize;
static inline ui32 CalculateBlobSize(TBlobStorageGroupType gtype, const TLogoBlobID& fullId, NMatrix::TVectorType parts,
bool addHeader) {
ui32 res = addHeader ? HeaderSize : 0;
for (ui8 i = parts.FirstPosition(); i != parts.GetSize(); i = parts.NextPosition(i)) {
res += gtype.PartSize(TLogoBlobID(fullId, i + 1));
}
Expand All @@ -256,7 +273,7 @@ namespace NKikimr {

if (Parts.Empty()) {
Parts = NMatrix::TVectorType(0, source.Parts.GetSize());
PartOffs.fill(HeaderSize);
PartOffs.fill(0); // we don't care about absolute offsets here
} else {
Y_ABORT_UNLESS(Parts.GetSize() == source.Parts.GetSize());
}
Expand All @@ -273,14 +290,18 @@ namespace NKikimr {
}
}

TRope CreateDiskBlob(TRopeArena& arena) const {
TRope CreateDiskBlob(TRopeArena& arena, bool addHeader) const {
Y_ABORT_UNLESS(!Empty());

char header[HeaderSize];
*reinterpret_cast<ui32*>(header) = FullDataSize;
*reinterpret_cast<ui8*>(header + sizeof(ui32)) = Parts.Raw();
TRope rope;

if (addHeader) {
char header[HeaderSize];
*reinterpret_cast<ui32*>(header) = FullDataSize;
*reinterpret_cast<ui8*>(header + sizeof(ui32)) = Parts.Raw();
rope.Insert(rope.End(), arena.CreateRope(header, sizeof(header)));
}

TRope rope(arena.CreateRope(header, sizeof(header)));
for (auto it = begin(); it != end(); ++it) {
rope.Insert(rope.End(), it.GetPart());
}
Expand Down Expand Up @@ -315,8 +336,8 @@ namespace NKikimr {
return Blob.Empty();
}

TRope CreateDiskBlob(TRopeArena& arena) const {
return Blob.CreateDiskBlob(arena);
TRope CreateDiskBlob(TRopeArena& arena, bool addHeader) const {
return Blob.CreateDiskBlob(arena, addHeader);
}

const TDiskBlob& GetDiskBlob() const {
Expand Down
Loading

0 comments on commit b2f7f35

Please sign in to comment.