Skip to content

Commit

Permalink
Merge 67e208d into 1fa2be9
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 6, 2024
2 parents 1fa2be9 + 67e208d commit f22d77d
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 256 deletions.
53 changes: 53 additions & 0 deletions ydb/core/tx/columnshard/blobs_action/abstract/blob_set.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/tx/columnshard/common/tablet_id.h>
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/util/gen_step.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/services/services.pb.h>
Expand Down Expand Up @@ -44,6 +45,58 @@ class TTabletByBlob {

};

class TBlobsByGenStep {
private:
struct Comparator {
bool operator()(const TLogoBlobID& l, const TLogoBlobID& r) const {
TGenStep gsl(l);
TGenStep gsr(l);
if (gsl == gsr) {
return l < r;
} else {
return gsl < gsr;
}
}
};
std::set<TLogoBlobID, Comparator> Blobs;
public:
[[nodiscard]] bool Add(const TLogoBlobID& blobId) {
return Blobs.emplace(blobId).second;
}
[[nodiscard]] bool Remove(const TLogoBlobID& blobId) {
return Blobs.erase(blobId);
}
bool IsEmpty() const {
return Blobs.empty();
}
ui32 GetSize() const {
return Blobs.size();
}

TGenStep GetMinGenStepVerified() const {
AFL_VERIFY(Blobs.size());
return TGenStep(*Blobs.begin());
}

template <class TActor>
bool ExtractTo(const TGenStep includeBorder, const ui32 countLimit, const TActor& actor) {
ui32 idx = 0;
for (auto it = Blobs.begin(); it != Blobs.end(); ++it) {
TGenStep gs(*it);
if (includeBorder < gs) {
return true;
}
if (++idx > countLimit) {
Blobs.erase(Blobs.begin(), it);
return false;
}
actor(gs, *it);
}
Blobs.clear();
return true;
}
};

class TTabletsByBlob {
private:
THashMap<TUnifiedBlobId, THashSet<TTabletId>> Data;
Expand Down
221 changes: 107 additions & 114 deletions ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions ydb/core/tx/columnshard/blobs_action/bs/blob_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ class TBlobManager : public IBlobManager, public TCommonBlobsTracker {
ui32 CurrentStep;
std::optional<TGenStep> CollectGenStepInFlight;
// Lists of blobs that need Keep flag to be set
std::map<TGenStep, std::set<TLogoBlobID>> BlobsToKeep;
TBlobsByGenStep BlobsToKeep;
// Lists of blobs that need DoNotKeep flag to be set
TTabletsByBlob BlobsToDelete;

Expand Down Expand Up @@ -173,7 +173,7 @@ class TBlobManager : public IBlobManager, public TCommonBlobsTracker {
virtual void DoSaveBlobBatchOnExecute(const TBlobBatch& blobBatch, IBlobManagerDb& db) override;
virtual void DoSaveBlobBatchOnComplete(TBlobBatch&& blobBatch) override;
void DrainDeleteTo(const TGenStep& dest, TGCContext& gcContext);
[[nodiscard]] bool DrainKeepTo(const TGenStep& dest, TGCContext& gcContext, const bool controlCapacity = true);
[[nodiscard]] bool DrainKeepTo(const TGenStep& dest, TGCContext& gcContext);
public:
TBlobManager(TIntrusivePtr<TTabletStorageInfo> tabletInfo, const ui32 gen, const TTabletId selfTabletId);

Expand Down Expand Up @@ -215,11 +215,11 @@ class TBlobManager : public IBlobManager, public TCommonBlobsTracker {
const std::shared_ptr<TBlobManager>& manager, const std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>& sharedBlobsInfo,
const std::shared_ptr<NBlobOperations::TRemoveGCCounters>& counters) noexcept;

void OnGCFinishedOnExecute(const TGenStep& genStep, IBlobManagerDb& db);
void OnGCFinishedOnComplete(const TGenStep& genStep);
void OnGCFinishedOnExecute(const std::optional<TGenStep>& genStep, IBlobManagerDb& db);
void OnGCFinishedOnComplete(const std::optional<TGenStep>& genStep);

void OnGCStartOnExecute(const TGenStep& genStep, IBlobManagerDb& db);
void OnGCStartOnComplete(const TGenStep& genStep);
void OnGCStartOnExecute(const std::optional<TGenStep>& genStep, IBlobManagerDb& db);
void OnGCStartOnComplete(const std::optional<TGenStep>& genStep);

TBlobManagerCounters GetCountersUpdate() {
TBlobManagerCounters res = CountersUpdate;
Expand All @@ -239,7 +239,7 @@ class TBlobManager : public IBlobManager, public TCommonBlobsTracker {
bool ExtractEvicted(TEvictedBlob& evict, TEvictMetadata& meta, bool fromDropped = false);

TGenStep EdgeGenStep() const {
return CollectGenStepInFlight ? *CollectGenStepInFlight : LastCollectedGenStep;
return CollectGenStepInFlight ? *CollectGenStepInFlight : std::max(GCBarrierPreparation, LastCollectedGenStep);
}
};

Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/bs/gc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ bool TGCTask::DoOnCompleteTxBeforeCleaning(NColumnShard::TColumnShard& /*self*/,
return true;
}

TGCTask::TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const TGenStep& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
TGCTask::TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const std::optional<TGenStep>& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
const std::shared_ptr<TBlobManager>& manager, TBlobsCategories&& blobsToRemove, const std::shared_ptr<TRemoveGCCounters>& counters,
const ui64 tabletId, const ui64 currentGen)
: TBase(storageId, std::move(blobsToRemove), counters)
Expand Down Expand Up @@ -65,8 +65,8 @@ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> TGCTask::BuildRequest(const T
("count", it->second.RequestsCount);
auto result = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(
TabletId, CurrentGen, PerGenerationCounter.Val(),
address.GetChannelId(), true,
CollectGenStepInFlight.Generation(), CollectGenStepInFlight.Step(),
address.GetChannelId(), !!CollectGenStepInFlight,
CollectGenStepInFlight ? CollectGenStepInFlight->Generation() : 0, CollectGenStepInFlight ? CollectGenStepInFlight->Step() : 0,
new TVector<TLogoBlobID>(it->second.KeepList.begin(), it->second.KeepList.end()),
new TVector<TLogoBlobID>(it->second.DontKeepList.begin(), it->second.DontKeepList.end()),
TInstant::Max(), true);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/bs/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class TGCTask: public IBlobsGCAction {
using TGCListsByGroup = THashMap<TBlobAddress, TGCLists>;
private:
TGCListsByGroup ListsByGroupId;
const TGenStep CollectGenStepInFlight;
const std::optional<TGenStep> CollectGenStepInFlight;
const ui64 TabletId;
const ui64 CurrentGen;
std::deque<TUnifiedBlobId> KeepsToErase;
Expand All @@ -35,11 +35,11 @@ class TGCTask: public IBlobsGCAction {
virtual bool DoOnCompleteTxBeforeCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr<IBlobsGCAction>& taskAction) override;

virtual bool DoIsEmpty() const override {
return false;
return !CollectGenStepInFlight && KeepsToErase.empty();
}

public:
TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const TGenStep& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const std::optional<TGenStep>& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
const std::shared_ptr<TBlobManager>& manager, TBlobsCategories&& blobsToRemove, const std::shared_ptr<TRemoveGCCounters>& counters, const ui64 tabletId, const ui64 currentGen);

const TGCListsByGroup& GetListsByGroupId() const {
Expand Down
85 changes: 39 additions & 46 deletions ydb/core/tx/columnshard/counters/blobs_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,58 +7,51 @@ namespace NKikimr::NColumnShard {

TBlobsManagerCounters::TBlobsManagerCounters(const TString& module)
: TCommonCountersOwner(module)
{
SkipCollection = TBase::GetDeriviative("GC/Skip/Count");
StartCollection = TBase::GetDeriviative("GC/Start/Count");
CollectDropExplicitBytes = TBase::GetDeriviative("GC/Drop/Explicit/Bytes");
CollectDropExplicitCount = TBase::GetDeriviative("GC/Drop/Explicit/Count");
CollectDropImplicitBytes = TBase::GetDeriviative("GC/Drop/Implicit/Bytes");
CollectDropImplicitCount = TBase::GetDeriviative("GC/Drop/Implicit/Count");
CollectKeepBytes = TBase::GetDeriviative("GC/Keep/Bytes");
CollectKeepCount = TBase::GetDeriviative("GC/Keep/Count");
PutBlobBytes = TBase::GetDeriviative("GC/PutBlob/Bytes");
PutBlobCount = TBase::GetDeriviative("GC/PutBlob/Count");
CollectGen = TBase::GetValue("GC/Gen");
CollectStep = TBase::GetValue("GC/Step");

DeleteBlobMarkerBytes = TBase::GetDeriviative("GC/MarkerDeleteBlob/Bytes");
DeleteBlobMarkerCount = TBase::GetDeriviative("GC/MarkerDeleteBlob/Count");
DeleteBlobDelayedMarkerBytes = TBase::GetDeriviative("GC/MarkerDelayedDeleteBlob/Bytes");
DeleteBlobDelayedMarkerCount = TBase::GetDeriviative("GC/MarkerDelayedDeleteBlob/Count");
AddSmallBlobBytes = TBase::GetDeriviative("GC/AddSmallBlob/Bytes");
AddSmallBlobCount = TBase::GetDeriviative("GC/AddSmallBlob/Count");
DeleteSmallBlobBytes = TBase::GetDeriviative("GC/DeleteSmallBlob/Bytes");
DeleteSmallBlobCount = TBase::GetDeriviative("GC/DeleteSmallBlob/Count");

BlobsKeepCount = TBase::GetValue("GC/BlobsKeep/Count");
BlobsKeepBytes = TBase::GetValue("GC/BlobsKeep/Bytes");
BlobsDeleteCount = TBase::GetValue("GC/BlobsDelete/Count");
BlobsDeleteBytes = TBase::GetValue("GC/BlobsDelete/Bytes");
, BlobsToDeleteCount(TBase::GetValue("BlobsToDelete/Count"))
, BlobsToDeleteDelayedCount(TBase::GetValue("BlobsToDeleteDelayed/Count"))
, BlobsToKeepCount(TBase::GetValue("BlobsToKeep/Count"))
, CurrentGen(TBase::GetValue("CurrentGen"))
, CurrentStep(TBase::GetValue("CurrentStep"))
, GCCounters(*this, "GC")

BrokenKeepCount = TBase::GetDeriviative("GC/BrokenKeep/Count");
BrokenKeepBytes = TBase::GetDeriviative("GC/BrokenKeep/Bytes");
{

KeepMarkerCount = TBase::GetDeriviative("GC/KeepMarker/Count");
KeepMarkerBytes = TBase::GetDeriviative("GC/KeepMarker/Bytes");
}

void TBlobsManagerCounters::OnBlobsKeep(const std::map<::NKikimr::TGenStep, std::set<TLogoBlobID>>& blobs) const {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "OnBlobsKeep")("count", blobs.size());
// BlobsKeepCount->Set(blobs.size());
// ui64 size = 0;
// for (auto&& i : blobs) {
// size += i.BlobSize();
// }
// BlobsKeepBytes->Set(size);
TBlobsManagerGCCounters::TBlobsManagerGCCounters(const TCommonCountersOwner& sameAs, const TString& componentName)
: TBase(sameAs, componentName)
, SkipCollectionEmpty(TBase::GetDeriviative("Skip/Empty/Count"))
, SkipCollectionThrottling(TBase::GetDeriviative("Skip/Throttling/Count"))
{
KeepsCountTasks = TBase::GetHistogram("Tasks/Keeps/Count", NMonitoring::ExponentialHistogram(16, 2, 100));
KeepsCountBlobs = TBase::GetHistogram("Tasks/Keeps/Blobs", NMonitoring::ExponentialHistogram(16, 2, 100));
KeepsCountBytes = TBase::GetHistogram("Tasks/Keeps/Bytes", NMonitoring::ExponentialHistogram(16, 2, 1024));
DeletesCountBlobs = TBase::GetHistogram("Tasks/Deletes/Count", NMonitoring::ExponentialHistogram(16, 2, 100));
DeletesCountTasks = TBase::GetHistogram("Tasks/Deletes/Blobs", NMonitoring::ExponentialHistogram(16, 2, 100));
DeletesCountBytes = TBase::GetHistogram("Tasks/Deletes/Bytes", NMonitoring::ExponentialHistogram(16, 2, 1024));
FullGCTasks = TBase::GetDeriviative("Tasks/Full/Count");
MoveBarriers = TBase::GetDeriviative("Tasks/Barrier/Move");
DontMoveBarriers = TBase::GetDeriviative("Tasks/Barrier/DontMove");
GCTasks = TBase::GetDeriviative("Tasks/All/Count");
EmptyGCTasks = TBase::GetDeriviative("Tasks/Empty/Count");
}

void TBlobsManagerCounters::OnBlobsDelete(const NOlap::TTabletsByBlob& /*blobs*/) const {
// BlobsDeleteCount->Set(blobs.size());
// ui64 size = 0;
// for (auto&& i : blobs) {
// size += i.BlobSize();
// }
// BlobsDeleteBytes->Set(size);
void TBlobsManagerGCCounters::OnGCTask(const ui32 keepsCount, const ui32 keepBytes, const ui32 deleteCount, const ui32 deleteBytes, const bool isFull, const bool moveBarrier) const {
GCTasks->Add(1);
if (isFull) {
FullGCTasks->Add(1);
}
KeepsCountTasks->Collect(keepsCount);
KeepsCountBlobs->Collect((i64)keepsCount, keepsCount);
KeepsCountBytes->Collect((i64)keepsCount, keepBytes);
DeletesCountTasks->Collect(deleteCount);
DeletesCountBlobs->Collect((i64)deleteCount, deleteCount);
DeletesCountBytes->Collect((i64)deleteCount, deleteBytes);
if (moveBarrier) {
MoveBarriers->Add(1);
} else {
DontMoveBarriers->Add(1);
}
}

}
121 changes: 38 additions & 83 deletions ydb/core/tx/columnshard/counters/blobs_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "common/owner.h"

#include <ydb/core/base/logoblob.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/blob_set.h>
#include <ydb/core/tx/columnshard/blobs_action/abstract/common.h>

#include <library/cpp/monlib/dynamic_counters/counters.h>
Expand All @@ -13,99 +14,53 @@ class TTabletsByBlob;

namespace NKikimr::NColumnShard {

class TBlobsManagerCounters: public TCommonCountersOwner {
class TBlobsManagerGCCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr CollectDropExplicitBytes;
NMonitoring::TDynamicCounters::TCounterPtr CollectDropExplicitCount;
NMonitoring::TDynamicCounters::TCounterPtr CollectDropImplicitBytes;
NMonitoring::TDynamicCounters::TCounterPtr CollectDropImplicitCount;
NMonitoring::TDynamicCounters::TCounterPtr CollectKeepBytes;
NMonitoring::TDynamicCounters::TCounterPtr CollectKeepCount;
NMonitoring::TDynamicCounters::TCounterPtr PutBlobBytes;
NMonitoring::TDynamicCounters::TCounterPtr PutBlobCount;
NMonitoring::TDynamicCounters::TCounterPtr CollectGen;
NMonitoring::TDynamicCounters::TCounterPtr CollectStep;
NMonitoring::TDynamicCounters::TCounterPtr DeleteBlobMarkerBytes;
NMonitoring::TDynamicCounters::TCounterPtr DeleteBlobMarkerCount;
NMonitoring::TDynamicCounters::TCounterPtr DeleteBlobDelayedMarkerBytes;
NMonitoring::TDynamicCounters::TCounterPtr DeleteBlobDelayedMarkerCount;
NMonitoring::TDynamicCounters::TCounterPtr AddSmallBlobBytes;
NMonitoring::TDynamicCounters::TCounterPtr AddSmallBlobCount;
NMonitoring::TDynamicCounters::TCounterPtr DeleteSmallBlobBytes;
NMonitoring::TDynamicCounters::TCounterPtr DeleteSmallBlobCount;
NMonitoring::TDynamicCounters::TCounterPtr BrokenKeepCount;
NMonitoring::TDynamicCounters::TCounterPtr BrokenKeepBytes;
NMonitoring::TDynamicCounters::TCounterPtr BlobsKeepCount;
NMonitoring::TDynamicCounters::TCounterPtr BlobsKeepBytes;
NMonitoring::TDynamicCounters::TCounterPtr BlobsDeleteCount;
NMonitoring::TDynamicCounters::TCounterPtr BlobsDeleteBytes;
NMonitoring::TDynamicCounters::TCounterPtr KeepMarkerCount;
NMonitoring::TDynamicCounters::TCounterPtr KeepMarkerBytes;

NMonitoring::THistogramPtr KeepsCountBytes;
NMonitoring::THistogramPtr KeepsCountBlobs;
NMonitoring::THistogramPtr KeepsCountTasks;
NMonitoring::THistogramPtr DeletesCountBytes;
NMonitoring::THistogramPtr DeletesCountBlobs;
NMonitoring::THistogramPtr DeletesCountTasks;
NMonitoring::TDynamicCounters::TCounterPtr FullGCTasks;
NMonitoring::TDynamicCounters::TCounterPtr MoveBarriers;
NMonitoring::TDynamicCounters::TCounterPtr DontMoveBarriers;
NMonitoring::TDynamicCounters::TCounterPtr GCTasks;
NMonitoring::TDynamicCounters::TCounterPtr EmptyGCTasks;
public:
NMonitoring::TDynamicCounters::TCounterPtr SkipCollection;
NMonitoring::TDynamicCounters::TCounterPtr StartCollection;

TBlobsManagerCounters(const TString& module);

void OnKeepMarker(const ui64 size) const {
KeepMarkerCount->Add(1);
KeepMarkerBytes->Add(size);
}

void OnBlobsKeep(const std::map<::NKikimr::TGenStep, std::set<TLogoBlobID>>& blobs) const;

void OnBlobsDelete(const NOlap::TTabletsByBlob& blobs) const;

void OnAddSmallBlob(const ui32 bSize) const {
AddSmallBlobBytes->Add(bSize);
AddSmallBlobCount->Add(1);
}

void OnDeleteBlobDelayedMarker(const ui32 bSize) const {
DeleteBlobDelayedMarkerBytes->Add(bSize);
DeleteBlobDelayedMarkerCount->Add(1);
}

void OnDeleteBlobMarker(const ui32 bSize) const {
DeleteBlobMarkerBytes->Add(bSize);
DeleteBlobMarkerCount->Add(1);
}
const NMonitoring::TDynamicCounters::TCounterPtr SkipCollectionEmpty;
const NMonitoring::TDynamicCounters::TCounterPtr SkipCollectionThrottling;

void OnNewCollectStep(const ui32 gen, const ui32 step) const {
CollectGen->Set(gen);
CollectStep->Set(step);
}
TBlobsManagerGCCounters(const TCommonCountersOwner& sameAs, const TString& componentName);

void OnDeleteSmallBlob(const ui32 bSize) const {
DeleteSmallBlobBytes->Add(bSize);
DeleteSmallBlobCount->Add(1);
}
void OnGCTask(const ui32 keepsCount, const ui32 keepBytes, const ui32 deleteCount, const ui32 deleteBytes,
const bool isFull, const bool moveBarrier) const;

void OnPutResult(const ui32 bSize) const {
PutBlobBytes->Add(bSize);
PutBlobCount->Add(1);
}

void OnCollectKeep(const ui32 bSize) const {
CollectKeepBytes->Add(bSize);
CollectKeepCount->Add(1);
void OnEmptyGCTask() const {
EmptyGCTasks->Add(1);
}
};

void OnBrokenKeep(const ui32 bSize) const {
BrokenKeepBytes->Add(bSize);
BrokenKeepCount->Add(1);
class TBlobsManagerCounters: public TCommonCountersOwner {
private:
using TBase = TCommonCountersOwner;
const NMonitoring::TDynamicCounters::TCounterPtr BlobsToDeleteCount;
const NMonitoring::TDynamicCounters::TCounterPtr BlobsToDeleteDelayedCount;
const NMonitoring::TDynamicCounters::TCounterPtr BlobsToKeepCount;
public:
const NMonitoring::TDynamicCounters::TCounterPtr CurrentGen;
const NMonitoring::TDynamicCounters::TCounterPtr CurrentStep;
const TBlobsManagerGCCounters GCCounters;
TBlobsManagerCounters(const TString& module);
void OnBlobsToDelete(const NOlap::TTabletsByBlob& blobs) const {
BlobsToDeleteCount->Set(blobs.GetSize());
}

void OnCollectDropExplicit(const ui32 bSize) const {
CollectDropExplicitBytes->Add(bSize);
CollectDropExplicitCount->Add(1);
void OnBlobsToKeep(const NOlap::TBlobsByGenStep& blobs) const {
BlobsToKeepCount->Set(blobs.GetSize());
}

void OnCollectDropImplicit(const ui32 bSize) const {
CollectDropImplicitBytes->Add(bSize);
CollectDropImplicitCount->Add(1);
void OnBlobsToDeleteDelayed(const NOlap::TTabletsByBlob& blobs) const {
BlobsToDeleteDelayedCount->Set(blobs.GetSize());
}
};

Expand Down

0 comments on commit f22d77d

Please sign in to comment.