Skip to content

Commit

Permalink
fix huge blobs volume for start mode withno barrier moving
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed Jul 6, 2024
1 parent 2c3cdc6 commit fd60ff0
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 36 deletions.
63 changes: 37 additions & 26 deletions ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,6 @@ class TBlobManager::TGCContext {
// TODO: we need only actual channel history here
for (ui32 channelIdx = 2; channelIdx < tabletInfo->Channels.size(); ++channelIdx) {
const auto& channelHistory = tabletInfo->ChannelInfo(channelIdx)->History;

for (auto it = channelHistory.begin(); it != channelHistory.end(); ++it) {
PerGroupGCListsInFlight[TBlobAddress(it->GroupID, channelIdx)];
}
Expand Down Expand Up @@ -330,11 +329,6 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas

PreviousGCTime = AppData()->TimeProvider->Now();
TGCContext gcContext(sharedBlobsInfo);
if (FirstGC) {
gcContext.InitializeFirst(TabletInfo);
FirstGC = false;
}

NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("action_id", TGUID::CreateTimebased().AsGuidString());
const std::deque<TGenStep> newCollectGenSteps = FindNewGCBarriers();
AFL_VERIFY(newCollectGenSteps.size());
Expand All @@ -345,7 +339,9 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
if (!DrainKeepTo(newCollectGenStep, gcContext)) {
break;
}
CollectGenStepInFlight = std::max(CollectGenStepInFlight.value_or(newCollectGenStep), newCollectGenStep);
if (newCollectGenStep.Generation() == CurrentGen) {
CollectGenStepInFlight = std::max(CollectGenStepInFlight.value_or(newCollectGenStep), newCollectGenStep);
}
}
AFL_VERIFY(LastCollectedGenStep <= CollectGenStepInFlight)("last", LastCollectedGenStep)("collect", CollectGenStepInFlight);
} else {
Expand All @@ -361,22 +357,27 @@ std::shared_ptr<NBlobOperations::NBlobStorage::TGCTask> TBlobManager::BuildGCTas
if (!DrainKeepTo(newCollectGenStep, gcContext)) {
break;
}
CollectGenStepInFlight = std::max(CollectGenStepInFlight.value_or(newCollectGenStep), newCollectGenStep);
if (newCollectGenStep.Generation() == CurrentGen) {
CollectGenStepInFlight = std::max(CollectGenStepInFlight.value_or(newCollectGenStep), newCollectGenStep);
}
}

if (!CollectGenStepInFlight) {
CollectGenStepInFlight = LastCollectedGenStep;
if (CollectGenStepInFlight) {
PopGCBarriers(*CollectGenStepInFlight);
if (FirstGC) {
gcContext.InitializeFirst(TabletInfo);
FirstGC = false;
}
}
PopGCBarriers(*CollectGenStepInFlight);
AFL_VERIFY(LastCollectedGenStep <= *CollectGenStepInFlight);
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_BLOBS_BS)("notice", "collect_gen_step")("value", *CollectGenStepInFlight)("current_gen", CurrentGen);
AFL_INFO(NKikimrServices::TX_COLUMNSHARD_BLOBS_BS)("notice", "collect_gen_step")("value", CollectGenStepInFlight)("current_gen", CurrentGen);

const bool isFull = gcContext.IsFull();

auto removeCategories = sharedBlobsInfo->BuildRemoveCategories(std::move(gcContext.MutableExtractedToRemoveFromDB()));

auto result = std::make_shared<NBlobOperations::NBlobStorage::TGCTask>(storageId, std::move(gcContext.MutablePerGroupGCListsInFlight()), *CollectGenStepInFlight,
std::move(gcContext.MutableKeepsToErase()), manager, std::move(removeCategories), counters, TabletInfo->TabletID, CurrentGen);
auto result = std::make_shared<NBlobOperations::NBlobStorage::TGCTask>(storageId, std::move(gcContext.MutablePerGroupGCListsInFlight()),
CollectGenStepInFlight, std::move(gcContext.MutableKeepsToErase()), manager, std::move(removeCategories), counters, TabletInfo->TabletID, CurrentGen);
if (result->IsEmpty()) {
CollectGenStepInFlight = {};
return nullptr;
Expand Down Expand Up @@ -467,24 +468,34 @@ void TBlobManager::DeleteBlobOnComplete(const TTabletId tabletId, const TUnified
}
}

void TBlobManager::OnGCFinishedOnExecute(const TGenStep& genStep, IBlobManagerDb& db) {
db.SaveLastGcBarrier(genStep);
void TBlobManager::OnGCFinishedOnExecute(const std::optional<TGenStep>& genStep, IBlobManagerDb& db) {
if (genStep) {
db.SaveLastGcBarrier(*genStep);
}
}

void TBlobManager::OnGCFinishedOnComplete(const TGenStep& genStep) {
LastCollectedGenStep = genStep;
AFL_VERIFY(GCBarrierPreparation == LastCollectedGenStep)("prepare", GCBarrierPreparation)("last", LastCollectedGenStep);
CollectGenStepInFlight.reset();
void TBlobManager::OnGCFinishedOnComplete(const std::optional<TGenStep>& genStep) {
if (genStep) {
LastCollectedGenStep = *genStep;
AFL_VERIFY(GCBarrierPreparation == LastCollectedGenStep)("prepare", GCBarrierPreparation)("last", LastCollectedGenStep);
CollectGenStepInFlight.reset();
} else {
AFL_VERIFY(!CollectGenStepInFlight);
}
}

void TBlobManager::OnGCStartOnExecute(const TGenStep& genStep, IBlobManagerDb& db) {
AFL_VERIFY(LastCollectedGenStep <= genStep)("last", LastCollectedGenStep)("prepared", genStep);
db.SaveGCBarrierPreparation(genStep);
void TBlobManager::OnGCStartOnExecute(const std::optional<TGenStep>& genStep, IBlobManagerDb& db) {
if (genStep) {
AFL_VERIFY(LastCollectedGenStep <= *genStep)("last", LastCollectedGenStep)("prepared", genStep);
db.SaveGCBarrierPreparation(*genStep);
}
}

void TBlobManager::OnGCStartOnComplete(const TGenStep& genStep) {
AFL_VERIFY(GCBarrierPreparation <= genStep)("last", GCBarrierPreparation)("prepared", genStep);
GCBarrierPreparation = genStep;
void TBlobManager::OnGCStartOnComplete(const std::optional<TGenStep>& genStep) {
if (genStep) {
AFL_VERIFY(GCBarrierPreparation <= *genStep)("last", GCBarrierPreparation)("prepared", genStep);
GCBarrierPreparation = *genStep;
}
}

void TBlobManager::OnBlobFree(const TUnifiedBlobId& blobId) {
Expand Down
8 changes: 4 additions & 4 deletions ydb/core/tx/columnshard/blobs_action/bs/blob_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -215,11 +215,11 @@ class TBlobManager : public IBlobManager, public TCommonBlobsTracker {
const std::shared_ptr<TBlobManager>& manager, const std::shared_ptr<NDataSharing::TStorageSharedBlobsManager>& sharedBlobsInfo,
const std::shared_ptr<NBlobOperations::TRemoveGCCounters>& counters) noexcept;

void OnGCFinishedOnExecute(const TGenStep& genStep, IBlobManagerDb& db);
void OnGCFinishedOnComplete(const TGenStep& genStep);
void OnGCFinishedOnExecute(const std::optional<TGenStep>& genStep, IBlobManagerDb& db);
void OnGCFinishedOnComplete(const std::optional<TGenStep>& genStep);

void OnGCStartOnExecute(const TGenStep& genStep, IBlobManagerDb& db);
void OnGCStartOnComplete(const TGenStep& genStep);
void OnGCStartOnExecute(const std::optional<TGenStep>& genStep, IBlobManagerDb& db);
void OnGCStartOnComplete(const std::optional<TGenStep>& genStep);

TBlobManagerCounters GetCountersUpdate() {
TBlobManagerCounters res = CountersUpdate;
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/bs/gc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ bool TGCTask::DoOnCompleteTxBeforeCleaning(NColumnShard::TColumnShard& /*self*/,
return true;
}

TGCTask::TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const TGenStep& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
TGCTask::TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const std::optional<TGenStep>& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
const std::shared_ptr<TBlobManager>& manager, TBlobsCategories&& blobsToRemove, const std::shared_ptr<TRemoveGCCounters>& counters,
const ui64 tabletId, const ui64 currentGen)
: TBase(storageId, std::move(blobsToRemove), counters)
Expand Down Expand Up @@ -65,8 +65,8 @@ std::unique_ptr<TEvBlobStorage::TEvCollectGarbage> TGCTask::BuildRequest(const T
("count", it->second.RequestsCount);
auto result = std::make_unique<TEvBlobStorage::TEvCollectGarbage>(
TabletId, CurrentGen, PerGenerationCounter.Val(),
address.GetChannelId(), true,
CollectGenStepInFlight.Generation(), CollectGenStepInFlight.Step(),
address.GetChannelId(), !!CollectGenStepInFlight,
CollectGenStepInFlight ? CollectGenStepInFlight->Generation() : 0, CollectGenStepInFlight ? CollectGenStepInFlight->Step() : 0,
new TVector<TLogoBlobID>(it->second.KeepList.begin(), it->second.KeepList.end()),
new TVector<TLogoBlobID>(it->second.DontKeepList.begin(), it->second.DontKeepList.end()),
TInstant::Max(), true);
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/blobs_action/bs/gc.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class TGCTask: public IBlobsGCAction {
using TGCListsByGroup = THashMap<TBlobAddress, TGCLists>;
private:
TGCListsByGroup ListsByGroupId;
const TGenStep CollectGenStepInFlight;
const std::optional<TGenStep> CollectGenStepInFlight;
const ui64 TabletId;
const ui64 CurrentGen;
std::deque<TUnifiedBlobId> KeepsToErase;
Expand All @@ -35,11 +35,11 @@ class TGCTask: public IBlobsGCAction {
virtual bool DoOnCompleteTxBeforeCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr<IBlobsGCAction>& taskAction) override;

virtual bool DoIsEmpty() const override {
return false;
return !CollectGenStepInFlight && KeepsToErase.empty();
}

public:
TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const TGenStep& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const std::optional<TGenStep>& collectGenStepInFlight, std::deque<TUnifiedBlobId>&& keepsToErase,
const std::shared_ptr<TBlobManager>& manager, TBlobsCategories&& blobsToRemove, const std::shared_ptr<TRemoveGCCounters>& counters, const ui64 tabletId, const ui64 currentGen);

const TGCListsByGroup& GetListsByGroupId() const {
Expand Down

0 comments on commit fd60ff0

Please sign in to comment.