Skip to content

Commit

Permalink
Added benchmarks and implemented HeapIterator in code (#6611)
Browse files Browse the repository at this point in the history
  • Loading branch information
mregrock authored Jul 17, 2024
1 parent dc1084d commit ec43cdd
Show file tree
Hide file tree
Showing 13 changed files with 395 additions and 242 deletions.
30 changes: 14 additions & 16 deletions ydb/core/blobstorage/vdisk/defrag/defrag_search.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
#include <ydb/core/blobstorage/vdisk/common/vdisk_defrag.h>
#include <ydb/core/blobstorage/vdisk/huge/blobstorage_hullhugeheap.h>
#include <ydb/core/blobstorage/vdisk/hulldb/hull_ds_all_snap.h>
#include <ydb/core/blobstorage/vdisk/hulldb/base/hullds_heap_it.h>
#include <ydb/core/blobstorage/vdisk/query/query_statalgo.h>

namespace NKikimr {
Expand Down Expand Up @@ -337,25 +338,18 @@ namespace NKikimr {
bool Scan(TDuration quota, THullDsSnap fullSnap) {
// create iterator and set it up to point to next blob of interest
TLogoBlobsSnapshot::TForwardIterator iter(fullSnap.HullCtx, &fullSnap.LogoBlobsSnap);
if (NextId) {
iter.Seek(*NextId);
} else {
iter.SeekToFirst();
}

THeapIterator<TKeyLogoBlob, TMemRecLogoBlob, true> heapIt(&iter);
// calculate timestamp to finish scanning
const ui64 endTime = GetCycleCountFast() + DurationToCycles(quota);
for (ui32 count = 0; iter.Valid(); iter.Next()) {
if (++count % 1024 == 0 && GetCycleCountFast() >= endTime) {
break;
}
iter.PutToMerger(this);
}

if (iter.Valid()) {
NextId.emplace(iter.GetCurKey().LogoBlobID());
ui32 count = 0;
auto callback = [&](TKeyLogoBlob /*key*/, auto* /*merger*/) -> bool {
return (++count % 1024 != 0 || GetCycleCountFast() < endTime);
};
heapIt.Walk(NextId.value_or(TLogoBlobID()), this, callback);
if (heapIt.Valid()) {
NextId.emplace(heapIt.GetCurKey().LogoBlobID());
}
return iter.Valid();
return heapIt.Valid();
}

void AddFromFresh(const TMemRecLogoBlob& memRec, const TRope* /*data*/, const TKeyLogoBlob& key, ui64 /*lsn*/) {
Expand All @@ -366,6 +360,10 @@ namespace NKikimr {
Update(key, memRec, outbound);
}

void Finish() {}

void Clear() {}

static constexpr bool HaveToMergeData() { return false; }

std::vector<TDefragRecord> GetRecordsToRewrite() {
Expand Down
18 changes: 6 additions & 12 deletions ydb/core/blobstorage/vdisk/hulldb/barriers/barriers_public.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,20 +84,14 @@ namespace NKikimr {
void TBarriersDs::BuildMemView() {
TBase::TLevelIndexSnapshot snap = TBase::GetIndexSnapshot();
TBase::TLevelIndexSnapshot::TForwardIterator it(Settings.HullCtx, &snap);
it.SeekToFirst();

THeapIterator<TKeyBarrier, TMemRecBarrier, true> heapIt(&it);
TIndexRecordMerger<TKeyBarrier, TMemRecBarrier> merger(Settings.HullCtx->VCtx->Top->GType);
while (it.Valid()) {
it.PutToMerger(&merger);
merger.Finish();

const TKeyBarrier& key = it.GetCurKey();
const TMemRecBarrier& memRec = merger.GetMemRec();
auto callback = [&] (TKeyBarrier key, auto* merger) -> bool {
const TMemRecBarrier& memRec = merger->GetMemRec();
MemView->Update(key, memRec);

it.Next();
merger.Clear();
}
return true;
};
heapIt.Walk(TKeyBarrier::First(), &merger, callback);
}

} // NBarriers
Expand Down
19 changes: 7 additions & 12 deletions ydb/core/blobstorage/vdisk/hulldb/barriers/hullds_cache_barrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include "defs.h"
#include <ydb/core/blobstorage/vdisk/hulldb/hull_ds_all.h>
#include <ydb/core/blobstorage/vdisk/hulldb/base/hullds_heap_it.h>

namespace NKikimr {

Expand Down Expand Up @@ -55,23 +56,17 @@ namespace NKikimr {

// create iterator and start traversing the whole barrier database
TBarriersSnapshot::TForwardIterator it(hullDs->HullCtx, &snapshot);
it.SeekToFirst();

THeapIterator<TKeyBarrier, TMemRecBarrier, true> heapIt(&it);
TIndexRecordMerger<TKeyBarrier, TMemRecBarrier> merger(hullDs->HullCtx->VCtx->Top->GType);
while (it.Valid()) {
it.PutToMerger(&merger);
merger.Finish();

const TKeyBarrier& key = it.GetCurKey();
const TMemRecBarrier& memRec = merger.GetMemRec();
auto callback = [&] (TKeyBarrier key, auto* merger) -> bool {
const TMemRecBarrier& memRec = merger->GetMemRec();
if (!hullDs->HullCtx->GCOnlySynced || memRec.Ingress.IsQuorum(hullDs->HullCtx->IngressCache.Get()) ||
key.Hard) {
Update(key.TabletId, key.Channel, key.Hard, memRec.CollectGen, memRec.CollectStep);
}

it.Next();
merger.Clear();
}
return true;
};
heapIt.Walk(TKeyBarrier::First(), &merger, callback);
}

void Update(ui64 tabletId, ui32 channel, bool hard, ui32 collectGen, ui32 collectStep) {
Expand Down
13 changes: 11 additions & 2 deletions ydb/core/blobstorage/vdisk/hulldb/base/hullds_heap_it.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ namespace NKikimr {
}

public:
template <typename TIter>
THeapIterator(TIter* iter) {
iter->PutToHeap(*this);
}

THeapIterator() = default;

bool Valid() const {
return HeapItems;
}
Expand Down Expand Up @@ -143,8 +150,10 @@ namespace NKikimr {
}

template<typename TMerger, typename TCallback>
void Walk(TKey key, TMerger merger, TCallback&& callback) {
Seek(key);
void Walk(std::optional<TKey> key, TMerger merger, TCallback&& callback) {
if (key.has_value()) {
Seek(key.value());
}
while (Valid()) {
const TKey key = GetCurKey();
PutToMergerAndAdvance(merger);
Expand Down
Loading

0 comments on commit ec43cdd

Please sign in to comment.