From fea1683b38b278f6076deb14b89bd23af55c538b Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 5 Jun 2024 07:14:16 +0300 Subject: [PATCH] gc simplification and avoid race with reduce barrier for cleanup (#5175) --- ydb/core/protos/counters_columnshard.proto | 1 + .../columnshard/blobs_action/abstract/gc.cpp | 14 + .../tx/columnshard/blobs_action/abstract/gc.h | 7 + .../blobs_action/abstract/storage.h | 32 ++- .../blobs_action/blob_manager_db.cpp | 24 +- .../blobs_action/blob_manager_db.h | 5 + .../blobs_action/bs/blob_manager.cpp | 272 ++++++++++-------- .../blobs_action/bs/blob_manager.h | 9 +- .../tx/columnshard/blobs_action/bs/gc.cpp | 14 +- ydb/core/tx/columnshard/blobs_action/bs/gc.h | 8 + .../tx/columnshard/blobs_action/bs/gc_actor.h | 3 +- .../columnshard/blobs_action/bs/storage.cpp | 11 +- .../tx/columnshard/blobs_action/bs/storage.h | 3 +- .../tx/columnshard/blobs_action/tier/gc.h | 6 + .../columnshard/blobs_action/tier/storage.cpp | 9 +- .../columnshard/blobs_action/tier/storage.h | 3 +- .../transaction/tx_gc_indexed.cpp | 14 + .../blobs_action/transaction/tx_gc_indexed.h | 16 ++ ydb/core/tx/columnshard/columnshard_impl.cpp | 6 +- ydb/core/tx/columnshard/columnshard_schema.h | 2 + .../data_sharing/manager/shared_blobs.h | 2 +- 21 files changed, 309 insertions(+), 152 deletions(-) diff --git a/ydb/core/protos/counters_columnshard.proto b/ydb/core/protos/counters_columnshard.proto index 9d2a212bb47a..5316533e1c9e 100644 --- a/ydb/core/protos/counters_columnshard.proto +++ b/ydb/core/protos/counters_columnshard.proto @@ -194,4 +194,5 @@ enum ETxTypes { TXTYPE_ADD_BACKGROUND_SESSION = 31 [(TxTypeOpts) = {Name: "TxAddBackgroundSession"}]; TXTYPE_SAVE_BACKGROUND_SESSION_PROGRESS = 32 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionProgress"}]; TXTYPE_SAVE_BACKGROUND_SESSION_STATE = 33 [(TxTypeOpts) = {Name: "TxSaveBackgroundSessionState"}]; + TXTYPE_GC_START = 34 [(TxTypeOpts) = {Name: "TxGarbageCollectionStart"}]; } diff --git 
a/ydb/core/tx/columnshard/blobs_action/abstract/gc.cpp b/ydb/core/tx/columnshard/blobs_action/abstract/gc.cpp index 893d6cc57bbe..8c29fa8c17e4 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/gc.cpp +++ b/ydb/core/tx/columnshard/blobs_action/abstract/gc.cpp @@ -34,6 +34,20 @@ void IBlobsGCAction::OnExecuteTxAfterCleaning(NColumnShard::TColumnShard& self, } } +void IBlobsGCAction::OnCompleteTxBeforeCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr& taskAction) { /* pre-cleaning completion hook; does nothing once the action is aborted */ + if (!AbortedFlag) { + if (!DoOnCompleteTxBeforeCleaning(self, taskAction)) { /* dispatch to the Before-cleaning implementation, not the After-cleaning one */ + return; + } + } +} + +void IBlobsGCAction::OnExecuteTxBeforeCleaning(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs) { /* pre-cleaning execute hook; does nothing once the action is aborted */ + if (!AbortedFlag) { + return DoOnExecuteTxBeforeCleaning(self, dbBlobs); + } +} + void IBlobsGCAction::Abort() { Y_ABORT_UNLESS(IsInProgress()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "gc_aborted")("action_guid", GetActionGuid()); diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/gc.h b/ydb/core/tx/columnshard/blobs_action/abstract/gc.h index ad381063e79e..2d0a5e44732d 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/gc.h +++ b/ydb/core/tx/columnshard/blobs_action/abstract/gc.h @@ -25,12 +25,19 @@ class IBlobsGCAction: public ICommonBlobsAction { virtual void DoOnExecuteTxAfterCleaning(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs) = 0; virtual bool DoOnCompleteTxAfterCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr& taskAction) = 0; + + virtual void DoOnExecuteTxBeforeCleaning(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs) = 0; + virtual bool DoOnCompleteTxBeforeCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr& taskAction) = 0; + virtual void RemoveBlobIdFromDB(const TTabletId tabletId, const TUnifiedBlobId& blobId, TBlobManagerDb& dbBlobs) = 0; virtual bool DoIsEmpty() const = 0; public: void OnExecuteTxAfterCleaning(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs); void
OnCompleteTxAfterCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr& taskAction); + void OnExecuteTxBeforeCleaning(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs); + void OnCompleteTxBeforeCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr& taskAction); + const TBlobsCategories& GetBlobsToRemove() const { return BlobsToRemove; } diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/storage.h b/ydb/core/tx/columnshard/blobs_action/abstract/storage.h index d99e949e308b..19a3655c17b1 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/storage.h +++ b/ydb/core/tx/columnshard/blobs_action/abstract/storage.h @@ -51,9 +51,15 @@ class IBlobsStorageOperator { return ""; } - virtual std::shared_ptr DoStartGCAction(const std::shared_ptr& counters) const = 0; - std::shared_ptr StartGCAction(const std::shared_ptr& counters) const { - return DoStartGCAction(counters); + virtual void DoStartGCAction(const std::shared_ptr& action) const = 0; /* receives the GC action forwarded by StartGCAction */ + + void StartGCAction(const std::shared_ptr& action) const { + return DoStartGCAction(action); + } + + virtual std::shared_ptr DoCreateGCAction(const std::shared_ptr& counters) const = 0; + std::shared_ptr CreateGCAction(const std::shared_ptr& counters) const { + return DoCreateGCAction(counters); } public: @@ -101,22 +107,26 @@ class IBlobsStorageOperator { result->SetCounters(Counters->GetConsumerCounter(consumerId)->GetReadCounters()); return result; } - bool StartGC() { + + void StartGC(const std::shared_ptr& action) { + AFL_VERIFY(CurrentGCAction == action); + AFL_VERIFY(!!action && action->IsInProgress()); + StartGCAction(action); + } + + [[nodiscard]] std::shared_ptr CreateGC() { NActors::TLogContextGuard gLogging = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("storage_id", GetStorageId())("tablet_id", GetSelfTabletId()); if (CurrentGCAction && CurrentGCAction->IsInProgress()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "gc_in_progress"); - return
false; + return nullptr; } if (Stopped) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "stopped_on_gc"); - return false; - } - auto task = StartGCAction(Counters->GetConsumerCounter(NBlobOperations::EConsumer::GC)->GetRemoveGCCounters()); - if (!task) { - return false; + return nullptr; } + auto task = CreateGCAction(Counters->GetConsumerCounter(NBlobOperations::EConsumer::GC)->GetRemoveGCCounters()); CurrentGCAction = task; - return true; + return CurrentGCAction; } }; diff --git a/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp index 3bc5f2b8c3d6..7b480054f471 100644 --- a/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp +++ b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.cpp @@ -6,16 +6,34 @@ namespace NKikimr::NOlap { using namespace NKikimr::NColumnShard; +bool TBlobManagerDb::LoadGCBarrierPreparation(TGenStep& genStep) { + ui64 gen = 0; + ui64 step = 0; + NIceDb::TNiceDb db(Database); + if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::GCBarrierPreparationGen, gen) || + !Schema::GetSpecialValueOpt(db, Schema::EValueIds::GCBarrierPreparationStep, step)) + { + return false; + } + genStep = TGenStep(gen, step); + return true; +} + +void TBlobManagerDb::SaveGCBarrierPreparation(const TGenStep& genStep) { + NIceDb::TNiceDb db(Database); + Schema::SaveSpecialValue(db, Schema::EValueIds::GCBarrierPreparationGen, std::get<0>(genStep)); + Schema::SaveSpecialValue(db, Schema::EValueIds::GCBarrierPreparationStep, std::get<1>(genStep)); +} + bool TBlobManagerDb::LoadLastGcBarrier(TGenStep& lastCollectedGenStep) { NIceDb::TNiceDb db(Database); ui64 gen = 0; ui64 step = 0; if (!Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastGcBarrierGen, gen) || - !Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastGcBarrierStep, step)) - { + !Schema::GetSpecialValueOpt(db, Schema::EValueIds::LastGcBarrierStep, step)) { return false; } - lastCollectedGenStep = {gen, step}; + 
lastCollectedGenStep = { gen, step }; return true; } diff --git a/ydb/core/tx/columnshard/blobs_action/blob_manager_db.h b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.h index 6c52e6c9ff2f..c79363543756 100644 --- a/ydb/core/tx/columnshard/blobs_action/blob_manager_db.h +++ b/ydb/core/tx/columnshard/blobs_action/blob_manager_db.h @@ -19,6 +19,8 @@ class IBlobManagerDb { public: virtual ~IBlobManagerDb() = default; + [[nodiscard]] virtual bool LoadGCBarrierPreparation(TGenStep& genStep) = 0; + virtual void SaveGCBarrierPreparation(const TGenStep& genStep) = 0; [[nodiscard]] virtual bool LoadLastGcBarrier(TGenStep& lastCollectedGenStep) = 0; virtual void SaveLastGcBarrier(const TGenStep& lastCollectedGenStep) = 0; @@ -54,6 +56,9 @@ class TBlobManagerDb : public IBlobManagerDb { [[nodiscard]] bool LoadLastGcBarrier(TGenStep& lastCollectedGenStep) override; void SaveLastGcBarrier(const TGenStep& lastCollectedGenStep) override; + [[nodiscard]] bool LoadGCBarrierPreparation(TGenStep& genStep) override; + void SaveGCBarrierPreparation(const TGenStep& genStep) override; + [[nodiscard]] bool LoadLists(std::vector& blobsToKeep, TTabletsByBlob& blobsToDelete, const IBlobGroupSelector* dsGroupSelector, const TTabletId selfTabletId) override; diff --git a/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp b/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp index 5cf0a7a0589d..c9ee371bdbd5 100644 --- a/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp @@ -15,7 +15,7 @@ TLogoBlobID ParseLogoBlobId(TString blobId) { return logoBlobId; } -struct TBlobBatch::TBatchInfo : TNonCopyable { +struct TBlobBatch::TBatchInfo: TNonCopyable { private: std::vector BlobIds; public: @@ -58,8 +58,8 @@ struct TBlobBatch::TBatchInfo : TNonCopyable { }; TBlobBatch::TBlobBatch(std::unique_ptr batchInfo) - : BatchInfo(std::move(batchInfo)) -{} + : BatchInfo(std::move(batchInfo)) { +} TBlobBatch::TBlobBatch() = 
default; TBlobBatch::TBlobBatch(TBlobBatch&& other) = default; @@ -128,8 +128,8 @@ TBlobManager::TBlobManager(TIntrusivePtr tabletInfo, ui32 ge , CurrentGen(gen) , CurrentStep(0) , BlobCountToTriggerGC(BLOB_COUNT_TO_TRIGGER_GC_DEFAULT, 0, Max()) - , GCIntervalSeconds(GC_INTERVAL_SECONDS_DEFAULT, 0, Max()) -{} + , GCIntervalSeconds(GC_INTERVAL_SECONDS_DEFAULT, 0, Max()) { +} void TBlobManager::RegisterControls(NKikimr::TControlBoard& icb) { icb.RegisterSharedControl(BlobCountToTriggerGC, "ColumnShardControls.BlobCountToTriggerGC"); @@ -141,6 +141,9 @@ bool TBlobManager::LoadState(IBlobManagerDb& db, const TTabletId selfTabletId) { if (!db.LoadLastGcBarrier(LastCollectedGenStep)) { return false; } + if (!db.LoadGCBarrierPreparation(GCBarrierPreparation)) { + return false; + } // Load the keep and delete queues std::vector blobsToKeep; @@ -158,7 +161,7 @@ bool TBlobManager::LoadState(IBlobManagerDb& db, const TTabletId selfTabletId) { THashSet genStepsWithBlobsToKeep; for (const auto& unifiedBlobId : blobsToKeep) { TLogoBlobID blobId = unifiedBlobId.GetLogoBlobId(); - TGenStep genStep{blobId.Generation(), blobId.Step()}; + TGenStep genStep{ blobId.Generation(), blobId.Step() }; Y_ABORT_UNLESS(genStep > LastCollectedGenStep); BlobsToKeep.insert(blobId); @@ -178,7 +181,7 @@ bool TBlobManager::LoadState(IBlobManagerDb& db, const TTabletId selfTabletId) { for (const auto& gs : genStepsWithBlobsToKeep) { AllocatedGenSteps.push_back(new TAllocatedGenStep(gs)); } - AllocatedGenSteps.push_back(new TAllocatedGenStep({CurrentGen, 0})); + AllocatedGenSteps.push_back(new TAllocatedGenStep({ CurrentGen, 0 })); Sort(AllocatedGenSteps.begin(), AllocatedGenSteps.end(), [](const TAllocatedGenStepConstPtr& a, const TAllocatedGenStepConstPtr& b) { return a->GenStep < b->GenStep; @@ -193,12 +196,11 @@ void TBlobManager::PopGCBarriers(const TGenStep gs) { } } -std::vector TBlobManager::FindNewGCBarriers() { - AFL_VERIFY(!CollectGenStepInFlight); +std::deque 
TBlobManager::FindNewGCBarriers() { TGenStep newCollectGenStep = LastCollectedGenStep; - std::vector result; + std::deque result; if (AllocatedGenSteps.empty()) { - return {TGenStep(CurrentGen, CurrentStep)}; + result.emplace_back(TGenStep(CurrentGen, CurrentStep)); } for (auto& allocated : AllocatedGenSteps) { AFL_VERIFY(allocated->GenStep > newCollectGenStep); @@ -208,152 +210,162 @@ std::vector TBlobManager::FindNewGCBarriers() { result.emplace_back(allocated->GenStep); newCollectGenStep = allocated->GenStep; } + if (result.empty() || LastCollectedGenStep < result.front()) { + result.emplace_front(LastCollectedGenStep); + } return result; } -std::shared_ptr TBlobManager::BuildGCTask(const TString& storageId, - const std::shared_ptr& manager, const std::shared_ptr& sharedBlobsInfo, - const std::shared_ptr& counters) noexcept { - AFL_VERIFY(!CollectGenStepInFlight); - if (BlobsToKeep.empty() && BlobsToDelete.IsEmpty() && LastCollectedGenStep == TGenStep{CurrentGen, CurrentStep}) { - ACFL_DEBUG("event", "TBlobManager::BuildGCTask skip")("current_gen", CurrentGen)("current_step", CurrentStep); - return nullptr; - } - std::vector newCollectGenSteps = FindNewGCBarriers(); +class TBlobManager::TGCContext { +private: + static inline const ui32 channelIdx = BLOB_CHANNEL; + static inline const ui32 BlobsGCCountLimit = 50000; + YDB_ACCESSOR_DEF(NBlobOperations::NBlobStorage::TGCTask::TGCListsByGroup, PerGroupGCListsInFlight); + YDB_ACCESSOR_DEF(TTabletsByBlob, ExtractedToRemoveFromDB); + YDB_ACCESSOR_DEF(std::deque, KeepsToErase); + YDB_READONLY_DEF(std::shared_ptr, SharedBlobsManager); +public: + TGCContext(const std::shared_ptr& sharedBlobsManager) + : SharedBlobsManager(sharedBlobsManager) + { - if (newCollectGenSteps.size()) { - if (AllocatedGenSteps.size()) { - AFL_VERIFY(newCollectGenSteps.front() > LastCollectedGenStep); - } else { - AFL_VERIFY(newCollectGenSteps.front() == LastCollectedGenStep); - } } - PreviousGCTime = AppData()->TimeProvider->Now(); - const ui32 
channelIdx = BLOB_CHANNEL; - NBlobOperations::NBlobStorage::TGCTask::TGCListsByGroup perGroupGCListsInFlight; - // Clear all possibly not kept trash in channel's groups: create an event for each group - if (FirstGC) { - FirstGC = false; - + void InitializeFirst(const TIntrusivePtr& tabletInfo) { // TODO: we need only actual channel history here - const auto& channelHistory = TabletInfo->ChannelInfo(channelIdx)->History; + const auto& channelHistory = tabletInfo->ChannelInfo(channelIdx)->History; for (auto it = channelHistory.begin(); it != channelHistory.end(); ++it) { - perGroupGCListsInFlight[it->GroupID]; + PerGroupGCListsInFlight[it->GroupID]; } } - static const ui32 blobsGCCountLimit = 50000; + bool IsFull() const { + return KeepsToErase.size() + ExtractedToRemoveFromDB.GetSize() >= BlobsGCCountLimit; + } - const auto predShared = [&](const TUnifiedBlobId& id, const THashSet& /*tabletIds*/) { - return id.GetLogoBlobId().TabletID() != (ui64)SelfTabletId; - }; + ui64 GetFreeSpace() const { + return IsFull() ? 
0 : (BlobsGCCountLimit - ExtractedToRemoveFromDB.GetSize() - KeepsToErase.size()); + } +}; - TTabletsByBlob extractedToRemoveFromDB = BlobsToDelete.ExtractBlobs(predShared, blobsGCCountLimit); - if (extractedToRemoveFromDB.GetSize() >= blobsGCCountLimit) { - newCollectGenSteps.clear(); - } else { - const auto predRemoveOld = [&](const TUnifiedBlobId& id, const THashSet& /*tabletIds*/) { +void TBlobManager::DrainDeleteTo(const TGenStep& dest, TGCContext& gcContext) { + const auto predShared = [&](const TUnifiedBlobId& id, const THashSet& /*tabletIds*/) { + if (id.GetLogoBlobId().TabletID() == (ui64)SelfTabletId) { auto logoBlobId = id.GetLogoBlobId(); - TGenStep genStep{logoBlobId.Generation(), logoBlobId.Step()}; - return genStep < LastCollectedGenStep && id.GetLogoBlobId().TabletID() == (ui64)SelfTabletId; - }; - - TTabletsByBlob extractedOld = BlobsToDelete.ExtractBlobs(predRemoveOld, blobsGCCountLimit - extractedToRemoveFromDB.GetSize()); - extractedToRemoveFromDB.Add(extractedOld); - TTabletId tabletId; - TUnifiedBlobId unifiedBlobId; - while (extractedOld.ExtractFront(tabletId, unifiedBlobId)) { - auto logoBlobId = unifiedBlobId.GetLogoBlobId(); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId); - NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[unifiedBlobId.GetDsGroup()]; - if (!sharedBlobsInfo->BuildStoreCategories({ unifiedBlobId }).GetDirect().IsEmpty()) { - BlobsManagerCounters.OnCollectDropExplicit(logoBlobId.BlobSize()); - gl.DontKeepList.insert(logoBlobId); + TGenStep genStep{ logoBlobId.Generation(), logoBlobId.Step() }; + if (dest < genStep) { + return false; } } - if (extractedToRemoveFromDB.GetSize() >= blobsGCCountLimit) { - newCollectGenSteps.clear(); + return true; + }; + TTabletsByBlob extractedOld = BlobsToDelete.ExtractBlobs(predShared, gcContext.GetFreeSpace()); + gcContext.MutableExtractedToRemoveFromDB().Add(extractedOld); + + TTabletId tabletId; + TUnifiedBlobId unifiedBlobId; + while 
(extractedOld.ExtractFront(tabletId, unifiedBlobId)) { + auto logoBlobId = unifiedBlobId.GetLogoBlobId(); + if (!gcContext.GetSharedBlobsManager()->BuildStoreCategories({ unifiedBlobId }).GetDirect().IsEmpty()) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId); + NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = gcContext.MutablePerGroupGCListsInFlight()[unifiedBlobId.GetDsGroup()]; + gl.DontKeepList.insert(logoBlobId); + } else { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId)("skip_reason", "not_direct"); } } +} +void TBlobManager::DrainKeepTo(const TGenStep& dest, TGCContext& gcContext) { + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "PreparePerGroupGCRequests")("gen", std::get<0>(dest))("step", std::get<1>(dest)); + auto keepBlobIt = BlobsToKeep.begin(); + for (; keepBlobIt != BlobsToKeep.end(); ++keepBlobIt) { + TGenStep genStep{ keepBlobIt->Generation(), keepBlobIt->Step() }; + AFL_VERIFY(genStep > LastCollectedGenStep); + if (genStep > dest) { + break; + } + const ui32 blobGroup = TabletInfo->GroupFor(keepBlobIt->Channel(), keepBlobIt->Generation()); + const TUnifiedBlobId keepUnified(blobGroup, *keepBlobIt); + gcContext.MutableKeepsToErase().emplace_back(keepUnified); + if (BlobsToDelete.ExtractBlobTo(keepUnified, gcContext.MutableExtractedToRemoveFromDB())) { + if (keepBlobIt->Generation() == CurrentGen) { + continue; + } + if (gcContext.GetSharedBlobsManager()->BuildStoreCategories({ keepUnified }).GetDirect().IsEmpty()) { + continue; + } + gcContext.MutablePerGroupGCListsInFlight()[blobGroup].DontKeepList.insert(*keepBlobIt); + } else { + gcContext.MutablePerGroupGCListsInFlight()[blobGroup].KeepList.insert(*keepBlobIt); + } + } + BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt); +} - std::deque keepsToErase; - for (auto&& newCollectGenStep : newCollectGenSteps) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "PreparePerGroupGCRequests")("gen", 
std::get<0>(newCollectGenStep))("step", std::get<1>(newCollectGenStep)); - BlobsManagerCounters.OnNewCollectStep(std::get<0>(newCollectGenStep), std::get<1>(newCollectGenStep)); +std::shared_ptr TBlobManager::BuildGCTask(const TString& storageId, + const std::shared_ptr& manager, const std::shared_ptr& sharedBlobsInfo, + const std::shared_ptr& counters) noexcept { + AFL_VERIFY(!CollectGenStepInFlight); + if (BlobsToKeep.empty() && BlobsToDelete.IsEmpty() && LastCollectedGenStep == TGenStep{ CurrentGen, CurrentStep }) { + ACFL_DEBUG("event", "TBlobManager::BuildGCTask skip")("current_gen", CurrentGen)("current_step", CurrentStep); + return nullptr; + } - // Make per-group Keep/DontKeep lists + PreviousGCTime = AppData()->TimeProvider->Now(); + TGCContext gcContext(sharedBlobsInfo); + // Clear all possibly not kept trash in channel's groups: create an event for each group + if (FirstGC) { + gcContext.InitializeFirst(TabletInfo); + FirstGC = false; + } - { - // Add all blobs to keep - auto keepBlobIt = BlobsToKeep.begin(); - for (; keepBlobIt != BlobsToKeep.end(); ++keepBlobIt) { - TGenStep genStep{keepBlobIt->Generation(), keepBlobIt->Step()}; - AFL_VERIFY(genStep > LastCollectedGenStep); - if (genStep > newCollectGenStep) { + NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("action_id", TGUID::CreateTimebased().AsGuidString()); + const std::deque newCollectGenSteps = FindNewGCBarriers(); + AFL_VERIFY(newCollectGenSteps.size()); + AFL_VERIFY(newCollectGenSteps.front() == LastCollectedGenStep); + + if (GCBarrierPreparation != LastCollectedGenStep) { + if (!std::get<0>(GCBarrierPreparation)) { + for (auto&& newCollectGenStep : newCollectGenSteps) { + DrainKeepTo(newCollectGenStep, gcContext); + CollectGenStepInFlight = std::max(CollectGenStepInFlight.value_or(newCollectGenStep), newCollectGenStep); + if (gcContext.IsFull()) { break; } - ui32 blobGroup = TabletInfo->GroupFor(keepBlobIt->Channel(), keepBlobIt->Generation()); - 
perGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt); - keepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *keepBlobIt)); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc", *keepBlobIt); } - BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt); - BlobsManagerCounters.OnBlobsKeep(BlobsToKeep); - - const auto predSelf = [&](const TUnifiedBlobId& id, const THashSet& /*tabletIds*/) { - auto logoBlobId = id.GetLogoBlobId(); - TGenStep genStep{logoBlobId.Generation(), logoBlobId.Step()}; - return genStep <= newCollectGenStep && id.GetLogoBlobId().TabletID() == (ui64)SelfTabletId; - }; - TTabletsByBlob extractedSelf = BlobsToDelete.ExtractBlobs(predSelf); - extractedToRemoveFromDB.Add(extractedSelf); - TTabletId tabletId; - TUnifiedBlobId unifiedBlobId; - while (extractedSelf.ExtractFront(tabletId, unifiedBlobId)) { - auto logoBlobId = unifiedBlobId.GetLogoBlobId(); - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId); - NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[unifiedBlobId.GetDsGroup()]; - bool skipDontKeep = sharedBlobsInfo->BuildStoreCategories({ unifiedBlobId }).GetDirect().IsEmpty(); - if (!skipDontKeep && gl.KeepList.erase(logoBlobId)) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc_remove", logoBlobId); - // Skipped blobs still need to be deleted from BlobsToKeep table - if (CurrentGen == logoBlobId.Generation()) { - // If this blob was created and deleted in the current generation then - // we can skip sending both Keep and DontKeep flags. - // NOTE: its not safe to do this for older generations because there is - // a scenario when Keep flag was sent in the old generation and then tablet restarted - // before getting the result and removing the blob from the Keep list. 
- skipDontKeep = true; - ++CountersUpdate.BlobSkippedEntries; - } - } - if (!skipDontKeep) { - BlobsManagerCounters.OnCollectDropExplicit(logoBlobId.BlobSize()); - gl.DontKeepList.insert(logoBlobId); - } else { - BlobsManagerCounters.OnCollectDropImplicit(logoBlobId.BlobSize()); - } - } - BlobsManagerCounters.OnBlobsDelete(BlobsToDelete); + DrainDeleteTo(*CollectGenStepInFlight, gcContext); + } else { + AFL_VERIFY(std::get<0>(GCBarrierPreparation) != CurrentGen); + AFL_VERIFY(LastCollectedGenStep <= GCBarrierPreparation); + CollectGenStepInFlight = std::max(GCBarrierPreparation, LastCollectedGenStep); + DrainKeepTo(std::max(GCBarrierPreparation, LastCollectedGenStep), gcContext); + DrainDeleteTo(*CollectGenStepInFlight, gcContext); } - CollectGenStepInFlight = newCollectGenStep; - if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() > blobsGCCountLimit) { - AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit); - break; + } + if (!gcContext.IsFull()) { + for (auto&& newCollectGenStep : newCollectGenSteps) { + DrainKeepTo(newCollectGenStep, gcContext); + DrainDeleteTo(newCollectGenStep, gcContext); + CollectGenStepInFlight = std::max(CollectGenStepInFlight.value_or(newCollectGenStep), newCollectGenStep); + if (gcContext.IsFull()) { + break; + } } } + if (CollectGenStepInFlight) { PopGCBarriers(*CollectGenStepInFlight); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("notice", "collect_gen_step")("value", *CollectGenStepInFlight)("current_gen", CurrentGen); } else { CollectGenStepInFlight = LastCollectedGenStep; + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("warning", "restore_collect_gen_step")("value", LastCollectedGenStep)("current_gen", CurrentGen); } - auto removeCategories = sharedBlobsInfo->BuildRemoveCategories(std::move(extractedToRemoveFromDB)); + auto removeCategories = 
sharedBlobsInfo->BuildRemoveCategories(std::move(gcContext.MutableExtractedToRemoveFromDB())); - auto result = std::make_shared(storageId, std::move(perGroupGCListsInFlight), *CollectGenStepInFlight, - std::move(keepsToErase), manager, std::move(removeCategories), counters, TabletInfo->TabletID, CurrentGen); + auto result = std::make_shared(storageId, std::move(gcContext.MutablePerGroupGCListsInFlight()), *CollectGenStepInFlight, + std::move(gcContext.MutableKeepsToErase()), manager, std::move(removeCategories), counters, TabletInfo->TabletID, CurrentGen); if (result->IsEmpty()) { CollectGenStepInFlight = {}; return nullptr; @@ -363,9 +375,9 @@ std::shared_ptr TBlobManager::BuildGCTas TBlobBatch TBlobManager::StartBlobBatch(ui32 channel) { ++CountersUpdate.BatchesStarted; - Y_ABORT_UNLESS(channel == BLOB_CHANNEL, "Support for mutiple blob channels is not implemented yet"); + Y_ABORT_UNLESS(channel == BLOB_CHANNEL, "Support for multiple blob channels is not implemented yet"); ++CurrentStep; - TAllocatedGenStepConstPtr genStepRef = new TAllocatedGenStep({CurrentGen, CurrentStep}); + TAllocatedGenStepConstPtr genStepRef = new TAllocatedGenStep({ CurrentGen, CurrentStep }); AllocatedGenSteps.push_back(genStepRef); auto batchInfo = std::make_unique(TabletInfo, genStepRef, channel, BlobsManagerCounters); return TBlobBatch(std::move(batchInfo)); @@ -382,9 +394,9 @@ void TBlobManager::DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) { // Add this batch to KeepQueue TGenStep edgeGenStep = EdgeGenStep(); - for (auto&& blobId: blobBatch.BatchInfo->GetBlobIds()) { + for (auto&& blobId : blobBatch.BatchInfo->GetBlobIds()) { auto logoBlobId = blobId.GetLogoBlobId(); - TGenStep genStep{logoBlobId.Generation(), logoBlobId.Step()}; + TGenStep genStep{ logoBlobId.Generation(), logoBlobId.Step() }; AFL_VERIFY(genStep > edgeGenStep)("gen_step", genStep)("edge_gen_step", edgeGenStep)("blob_id", blobId.ToStringNew()); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep", 
logoBlobId.ToString()); @@ -431,6 +443,14 @@ void TBlobManager::OnGCFinishedOnComplete(const TGenStep& genStep) { CollectGenStepInFlight.reset(); } +void TBlobManager::OnGCStartOnExecute(const TGenStep& genStep, IBlobManagerDb& db) { + db.SaveGCBarrierPreparation(genStep); +} + +void TBlobManager::OnGCStartOnComplete(const TGenStep& genStep) { + GCBarrierPreparation = genStep; +} + void TBlobManager::OnBlobFree(const TUnifiedBlobId& blobId) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "blob_free")("blob_id", blobId); // Check if the blob is marked for delayed deletion diff --git a/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.h b/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.h index c1228ea97c76..4a3e3044c64c 100644 --- a/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.h +++ b/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.h @@ -125,6 +125,7 @@ class TBlobManager : public IBlobManager, public TCommonBlobsTracker { static constexpr ui64 GC_INTERVAL_SECONDS_DEFAULT = 60; private: + class TGCContext; const TTabletId SelfTabletId; TIntrusivePtr TabletInfo; const ui32 CurrentGen; @@ -145,6 +146,7 @@ class TBlobManager : public IBlobManager, public TCommonBlobsTracker { // The Gen:Step that has been acknowledged by the Distributed Storage TGenStep LastCollectedGenStep = {0, 0}; + TGenStep GCBarrierPreparation = { 0, 0 }; // The barrier in the current in-flight GC request(s) bool FirstGC = true; @@ -158,6 +160,8 @@ class TBlobManager : public IBlobManager, public TCommonBlobsTracker { TInstant PreviousGCTime; // Used for delaying next GC if there are too few blobs to collect virtual void DoSaveBlobBatch(TBlobBatch&& blobBatch, IBlobManagerDb& db) override; + void DrainDeleteTo(const TGenStep& dest, TGCContext& gcContext); + void DrainKeepTo(const TGenStep& dest, TGCContext& gcContext); public: TBlobManager(TIntrusivePtr tabletInfo, const ui32 gen, const TTabletId selfTabletId); @@ -198,6 +202,9 @@ class TBlobManager : public IBlobManager, 
public TCommonBlobsTracker { void OnGCFinishedOnExecute(const TGenStep& genStep, IBlobManagerDb& db); void OnGCFinishedOnComplete(const TGenStep& genStep); + void OnGCStartOnExecute(const TGenStep& genStep, IBlobManagerDb& db); + void OnGCStartOnComplete(const TGenStep& genStep); + TBlobManagerCounters GetCountersUpdate() { TBlobManagerCounters res = CountersUpdate; CountersUpdate = TBlobManagerCounters(); @@ -209,7 +216,7 @@ class TBlobManager : public IBlobManager, public TCommonBlobsTracker { virtual void DeleteBlobOnExecute(const TTabletId tabletId, const TUnifiedBlobId& blobId, IBlobManagerDb& db) override; virtual void DeleteBlobOnComplete(const TTabletId tabletId, const TUnifiedBlobId& blobId) override; private: - std::vector FindNewGCBarriers(); + std::deque FindNewGCBarriers(); void PopGCBarriers(const TGenStep gs); void PopGCBarriers(const ui32 count); diff --git a/ydb/core/tx/columnshard/blobs_action/bs/gc.cpp b/ydb/core/tx/columnshard/blobs_action/bs/gc.cpp index 4ee6598a206d..6e67a21223ac 100644 --- a/ydb/core/tx/columnshard/blobs_action/bs/gc.cpp +++ b/ydb/core/tx/columnshard/blobs_action/bs/gc.cpp @@ -22,6 +22,15 @@ bool TGCTask::DoOnCompleteTxAfterCleaning(NColumnShard::TColumnShard& /*self*/, return true; } +void TGCTask::DoOnExecuteTxBeforeCleaning(NColumnShard::TColumnShard& /*self*/, TBlobManagerDb& dbBlobs) { + Manager->OnGCStartOnExecute(CollectGenStepInFlight, dbBlobs); +} + +bool TGCTask::DoOnCompleteTxBeforeCleaning(NColumnShard::TColumnShard& /*self*/, const std::shared_ptr& /*taskAction*/) { + Manager->OnGCStartOnComplete(CollectGenStepInFlight); + return true; +} + TGCTask::TGCTask(const TString& storageId, TGCListsByGroup&& listsByGroupId, const TGenStep& collectGenStepInFlight, std::deque&& keepsToErase, const std::shared_ptr& manager, TBlobsCategories&& blobsToRemove, const std::shared_ptr& counters, const ui64 tabletId, const ui64 currentGen) @@ -50,7 +59,10 @@ std::unique_ptr TGCTask::BuildRequest(const u const ui32 channelIdx = 
IBlobManager::BLOB_CHANNEL; auto it = ListsByGroupId.find(groupId); AFL_VERIFY(it != ListsByGroupId.end()); - AFL_VERIFY(++it->second.RequestsCount < 10); + AFL_VERIFY(++it->second.RequestsCount < 10)("event", "build_gc_request")("group_id", groupId)("current_gen", CurrentGen)("gen", CollectGenStepInFlight) + ("count", it->second.RequestsCount); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "build_gc_request")("group_id", groupId)("current_gen", CurrentGen)("gen", CollectGenStepInFlight) + ("count", it->second.RequestsCount); auto result = std::make_unique( TabletId, CurrentGen, PerGenerationCounter.Val(), channelIdx, true, diff --git a/ydb/core/tx/columnshard/blobs_action/bs/gc.h b/ydb/core/tx/columnshard/blobs_action/bs/gc.h index 86f348fe9fcd..1511f03a10b9 100644 --- a/ydb/core/tx/columnshard/blobs_action/bs/gc.h +++ b/ydb/core/tx/columnshard/blobs_action/bs/gc.h @@ -29,6 +29,10 @@ class TGCTask: public IBlobsGCAction { virtual void RemoveBlobIdFromDB(const TTabletId tabletId, const TUnifiedBlobId& blobId, TBlobManagerDb& dbBlobs) override; virtual void DoOnExecuteTxAfterCleaning(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs) override; virtual bool DoOnCompleteTxAfterCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr& taskAction) override; + + virtual void DoOnExecuteTxBeforeCleaning(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs) override; + virtual bool DoOnCompleteTxBeforeCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr& taskAction) override; + virtual bool DoIsEmpty() const override { return false; } @@ -41,6 +45,10 @@ class TGCTask: public IBlobsGCAction { return ListsByGroupId; } + ui64 GetTabletId() const { + return TabletId; + } + bool IsFinished() const { return ListsByGroupId.empty(); } diff --git a/ydb/core/tx/columnshard/blobs_action/bs/gc_actor.h b/ydb/core/tx/columnshard/blobs_action/bs/gc_actor.h index b2ebcfd35368..8324ff758170 100644 --- 
a/ydb/core/tx/columnshard/blobs_action/bs/gc_actor.h +++ b/ydb/core/tx/columnshard/blobs_action/bs/gc_actor.h @@ -28,7 +28,8 @@ class TGarbageCollectionActor: public TSharedBlobsCollectionActorGetActionGuid()); + NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD) + ("action_id", GCTask->GetActionGuid())("tablet_id", GCTask->GetTabletId()); switch (ev->GetTypeRewrite()) { hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle); default: diff --git a/ydb/core/tx/columnshard/blobs_action/bs/storage.cpp b/ydb/core/tx/columnshard/blobs_action/bs/storage.cpp index 80a9c356ec40..7eb21e83db40 100644 --- a/ydb/core/tx/columnshard/blobs_action/bs/storage.cpp +++ b/ydb/core/tx/columnshard/blobs_action/bs/storage.cpp @@ -20,7 +20,14 @@ std::shared_ptr TOperator::DoStartReadingAc return std::make_shared(GetStorageId(), BlobCacheActorId); } -std::shared_ptr TOperator::DoStartGCAction(const std::shared_ptr& counters) const { +void TOperator::DoStartGCAction(const std::shared_ptr& action) const { + auto gcTask = dynamic_pointer_cast(action); + AFL_VERIFY(!!gcTask); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartGC")("requests_count", gcTask->GetListsByGroupId().size()); + TActorContext::AsActorContext().Register(new TGarbageCollectionActor(gcTask, TabletActorId, GetSelfTabletId())); +} + +std::shared_ptr TOperator::DoCreateGCAction(const std::shared_ptr& counters) const { auto gcTask = Manager->BuildGCTask(GetStorageId(), Manager, GetSharedBlobs(), counters); if (!gcTask) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartGCSkipped"); @@ -28,8 +35,6 @@ std::shared_ptr TOperator::DoStartGCAction(const std::shared_ptr } else { AFL_VERIFY(!gcTask->IsEmpty()); } - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "StartGC")("requests_count", gcTask->GetListsByGroupId().size()); - TActorContext::AsActorContext().Register(new TGarbageCollectionActor(gcTask, TabletActorId, GetSelfTabletId())); return gcTask; } 
diff --git a/ydb/core/tx/columnshard/blobs_action/bs/storage.h b/ydb/core/tx/columnshard/blobs_action/bs/storage.h index 230b703020b8..de22b03d8183 100644 --- a/ydb/core/tx/columnshard/blobs_action/bs/storage.h +++ b/ydb/core/tx/columnshard/blobs_action/bs/storage.h @@ -18,7 +18,8 @@ class TOperator: public IBlobsStorageOperator { virtual std::shared_ptr DoStartDeclareRemovingAction(const std::shared_ptr& counters) override; virtual std::shared_ptr DoStartWritingAction() override; virtual std::shared_ptr DoStartReadingAction() override; - virtual std::shared_ptr DoStartGCAction(const std::shared_ptr& counters) const override; + virtual void DoStartGCAction(const std::shared_ptr& action) const override; + virtual std::shared_ptr DoCreateGCAction(const std::shared_ptr& counters) const override; virtual bool DoLoad(IBlobManagerDb& dbBlobs) override { return Manager->LoadState(dbBlobs, GetSelfTabletId()); } diff --git a/ydb/core/tx/columnshard/blobs_action/tier/gc.h b/ydb/core/tx/columnshard/blobs_action/tier/gc.h index bb97d0350200..e43e69c6aa94 100644 --- a/ydb/core/tx/columnshard/blobs_action/tier/gc.h +++ b/ydb/core/tx/columnshard/blobs_action/tier/gc.h @@ -16,6 +16,12 @@ class TGCTask: public IBlobsGCAction { protected: virtual void DoOnExecuteTxAfterCleaning(NColumnShard::TColumnShard& self, TBlobManagerDb& dbBlobs) override; virtual bool DoOnCompleteTxAfterCleaning(NColumnShard::TColumnShard& self, const std::shared_ptr& taskAction) override; + virtual void DoOnExecuteTxBeforeCleaning(NColumnShard::TColumnShard& /*self*/, TBlobManagerDb& /*dbBlobs*/) override { + + } + virtual bool DoOnCompleteTxBeforeCleaning(NColumnShard::TColumnShard& /*self*/, const std::shared_ptr& /*taskAction*/) override { + return true; + } virtual void RemoveBlobIdFromDB(const TTabletId tabletId, const TUnifiedBlobId& blobId, TBlobManagerDb& dbBlobs) override; virtual bool DoIsEmpty() const override { return DraftBlobIds.empty(); diff --git 
a/ydb/core/tx/columnshard/blobs_action/tier/storage.cpp b/ydb/core/tx/columnshard/blobs_action/tier/storage.cpp index 36d273111eff..7990eb66c389 100644 --- a/ydb/core/tx/columnshard/blobs_action/tier/storage.cpp +++ b/ydb/core/tx/columnshard/blobs_action/tier/storage.cpp @@ -27,7 +27,7 @@ std::shared_ptr TOperator::DoStartReadingAction() { return std::make_shared(GetStorageId(), GetCurrentOperator()); } -std::shared_ptr TOperator::DoStartGCAction(const std::shared_ptr& counters) const { +std::shared_ptr TOperator::DoCreateGCAction(const std::shared_ptr& counters) const { std::deque draftBlobIds; AFL_VERIFY(!!TabletActorId); TBlobsCategories categories(TTabletId(0)); @@ -44,10 +44,15 @@ std::shared_ptr TOperator::DoStartGCAction(const std::shared_ptr AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "start_gc_skipped")("reason", "task_empty"); return nullptr; } - TActorContext::AsActorContext().Register(new TGarbageCollectionActor(gcTask, TabletActorId, GetSelfTabletId())); return gcTask; } +void TOperator::DoStartGCAction(const std::shared_ptr& action) const { + auto gcTask = dynamic_pointer_cast(action); + AFL_VERIFY(!!gcTask); + TActorContext::AsActorContext().Register(new TGarbageCollectionActor(gcTask, TabletActorId, GetSelfTabletId())); +} + void TOperator::InitNewExternalOperator(const NColumnShard::NTiers::TManager* tierManager) { NKikimrSchemeOp::TS3Settings settings; if (tierManager) { diff --git a/ydb/core/tx/columnshard/blobs_action/tier/storage.h b/ydb/core/tx/columnshard/blobs_action/tier/storage.h index f149faa4a1ad..0dc8c3005296 100644 --- a/ydb/core/tx/columnshard/blobs_action/tier/storage.h +++ b/ydb/core/tx/columnshard/blobs_action/tier/storage.h @@ -30,7 +30,8 @@ class TOperator: public IBlobsStorageOperator { virtual std::shared_ptr DoStartDeclareRemovingAction(const std::shared_ptr& counters) override; virtual std::shared_ptr DoStartWritingAction() override; virtual std::shared_ptr DoStartReadingAction() override; - virtual std::shared_ptr 
DoStartGCAction(const std::shared_ptr& counters) const override; + virtual std::shared_ptr DoCreateGCAction(const std::shared_ptr& counters) const override; + virtual void DoStartGCAction(const std::shared_ptr& action) const override; virtual bool DoLoad(IBlobManagerDb& dbBlobs) override; virtual void DoOnTieringModified(const std::shared_ptr& tiers) override; diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_indexed.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_indexed.cpp index 73baa553cab9..d29aa82d7db9 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_indexed.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_indexed.cpp @@ -14,4 +14,18 @@ void TTxGarbageCollectionFinished::Complete(const TActorContext& /*ctx*/) { Action->OnCompleteTxAfterCleaning(*Self, Action); } +bool TTxGarbageCollectionStart::Execute(TTransactionContext& txc, const TActorContext& /*ctx*/) { + TMemoryProfileGuard mpg("TTxGarbageCollectionStart::Execute"); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("tx", "TTxGarbageCollectionStart")("event", "execute"); + NOlap::TBlobManagerDb blobManagerDb(txc.DB); + Action->OnExecuteTxBeforeCleaning(*Self, blobManagerDb); + return true; +} +void TTxGarbageCollectionStart::Complete(const TActorContext& /*ctx*/) { + TMemoryProfileGuard mpg("TTxGarbageCollectionStart::Complete"); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("tx", "TTxGarbageCollectionStart")("event", "complete"); + Action->OnCompleteTxBeforeCleaning(*Self, Action); + Operator->StartGC(Action); +} + } diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_indexed.h b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_indexed.h index 84846ff318d2..15756a6d4c32 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_indexed.h +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_indexed.h @@ -17,5 +17,21 @@ class TTxGarbageCollectionFinished: public TTransactionBase { TTxType GetTxType() const 
override { return TXTYPE_GC_FINISHED; } }; +class TTxGarbageCollectionStart: public TTransactionBase { +private: + std::shared_ptr Action; + std::shared_ptr Operator; +public: + TTxGarbageCollectionStart(TColumnShard* self, const std::shared_ptr& action, const std::shared_ptr& op) + : TBase(self) + , Action(action) + , Operator(op) + { + } + + virtual bool Execute(TTransactionContext& txc, const TActorContext& ctx) override; + virtual void Complete(const TActorContext& ctx) override; + TTxType GetTxType() const override { return TXTYPE_GC_START; } +}; } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 5bd3586b8041..c357dcb68684 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -836,7 +836,11 @@ void TColumnShard::SetupGC() { return; } for (auto&& i : StoragesManager->GetStorages()) { - i.second->StartGC(); + auto gcTask = i.second->CreateGC(); + if (!gcTask) { + continue; + } + Execute(new TTxGarbageCollectionStart(this, gcTask, i.second), NActors::TActivationContext::AsActorContext()); } } diff --git a/ydb/core/tx/columnshard/columnshard_schema.h b/ydb/core/tx/columnshard/columnshard_schema.h index 8bba15f16276..7777edd5216c 100644 --- a/ydb/core/tx/columnshard/columnshard_schema.h +++ b/ydb/core/tx/columnshard/columnshard_schema.h @@ -81,6 +81,8 @@ struct Schema : NIceDb::Schema { LastCompletedStep = 13, LastCompletedTxId = 14, LastNormalizerSequentialId = 15, + GCBarrierPreparationGen = 16, + GCBarrierPreparationStep = 17 }; enum class EInsertTableIds : ui8 { diff --git a/ydb/core/tx/columnshard/data_sharing/manager/shared_blobs.h b/ydb/core/tx/columnshard/data_sharing/manager/shared_blobs.h index c9cacb603aeb..f04f094165d2 100644 --- a/ydb/core/tx/columnshard/data_sharing/manager/shared_blobs.h +++ b/ydb/core/tx/columnshard/data_sharing/manager/shared_blobs.h @@ -95,7 +95,7 @@ class TStorageSharedBlobsManager { shared = true; } } - 
AFL_VERIFY((borrowed ? 1 : 0) + (direct ? 1 : 0) + (shared ? 1 : 0) == 1)("b", borrowed)("d", direct)("s", shared); + AFL_VERIFY((borrowed ? 1 : 0) + (direct ? 1 : 0) + (shared ? 1 : 0) == 1)("b", borrowed)("d", direct)("s", shared)("blob_id", i.ToStringNew()); } return result; }