diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 5db4d3087755..8cb9cf5442a4 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1516,6 +1516,7 @@ message TColumnShardConfig { optional TIndexMetadataMemoryLimit IndexMetadataMemoryLimit = 12; optional bool CleanupEnabled = 13 [default = true]; + optional uint32 MaxInFlightIntervalsOnRequest = 16; } message TSchemeShardConfig { diff --git a/ydb/core/tx/columnshard/background_controller.h b/ydb/core/tx/columnshard/background_controller.h index b88e1be3bcea..bb38f2744061 100644 --- a/ydb/core/tx/columnshard/background_controller.h +++ b/ydb/core/tx/columnshard/background_controller.h @@ -17,6 +17,7 @@ class TBackgroundController { bool ActiveCleanupPortions = false; bool ActiveCleanupTables = false; + bool ActiveCleanupInsertTable = false; YDB_READONLY(TMonotonic, LastIndexationInstant, TMonotonic::Zero()); public: THashSet GetConflictTTLPortions() const; @@ -66,6 +67,18 @@ class TBackgroundController { bool IsCleanupTablesActive() const { return ActiveCleanupTables; } + + void StartCleanupInsertTable() { + Y_ABORT_UNLESS(!ActiveCleanupInsertTable); + ActiveCleanupInsertTable = true; + } + void FinishCleanupInsertTable() { + Y_ABORT_UNLESS(ActiveCleanupInsertTable); + ActiveCleanupInsertTable = false; + } + bool IsCleanupInsertTableActive() const { + return ActiveCleanupInsertTable; + } }; } diff --git a/ydb/core/tx/columnshard/blobs_action/abstract/blob_set.h b/ydb/core/tx/columnshard/blobs_action/abstract/blob_set.h index 7692dad22eb2..756f0edb0a6f 100644 --- a/ydb/core/tx/columnshard/blobs_action/abstract/blob_set.h +++ b/ydb/core/tx/columnshard/blobs_action/abstract/blob_set.h @@ -89,7 +89,18 @@ class TTabletsByBlob { bool IsEmpty() const { return Data.empty(); } - + using TGenStep = std::tuple; + std::deque GroupByGenStep() const { + std::deque result; + for (const auto& i : Data) { + result.emplace_back(i.first); + } + const auto pred = [](const 
TUnifiedBlobId& l, const TUnifiedBlobId& r) { + return TGenStep(l.GetLogoBlobId().Generation(), l.GetLogoBlobId().Step()) < TGenStep(r.GetLogoBlobId().Generation(), r.GetLogoBlobId().Step()); + }; + std::sort(result.begin(), result.end(), pred); + return result; + } template TTabletsByBlob ExtractBlobs(const TFilter& filter, const std::optional countLimit = {}) { TTabletsByBlob result; diff --git a/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp b/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp index ed3810ae1edf..84fe2768d5fd 100644 --- a/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp +++ b/ydb/core/tx/columnshard/blobs_action/bs/blob_manager.cpp @@ -211,7 +211,7 @@ std::vector TBlobManager::FindNewGCBarriers() { return result; } -std::shared_ptr TBlobManager::BuildGCTask(const TString& storageId, +std::shared_ptr TBlobManager::BuildGCTask(const TString& storageId, const std::shared_ptr& manager, const std::shared_ptr& sharedBlobsInfo, const std::shared_ptr& counters) noexcept { AFL_VERIFY(!CollectGenStepInFlight); @@ -244,7 +244,7 @@ std::shared_ptr TBlobManager::BuildGCTas } } - static const ui32 blobsGCCountLimit = 50000; + static const ui32 blobsGCCountLimit = 500000; const auto predShared = [&](const TUnifiedBlobId& id, const THashSet& /*tabletIds*/) { return id.GetLogoBlobId().TabletID() != (ui64)SelfTabletId; @@ -262,9 +262,8 @@ std::shared_ptr TBlobManager::BuildGCTas TTabletsByBlob extractedOld = BlobsToDelete.ExtractBlobs(predRemoveOld, blobsGCCountLimit - extractedToRemoveFromDB.GetSize()); extractedToRemoveFromDB.Add(extractedOld); - TTabletId tabletId; - TUnifiedBlobId unifiedBlobId; - while (extractedOld.ExtractFront(tabletId, unifiedBlobId)) { + for (TTabletsByBlob::TIterator itExtractedOld(extractedOld); itExtractedOld.IsValid(); ++itExtractedOld) { + const TUnifiedBlobId unifiedBlobId = itExtractedOld.GetBlobId(); auto logoBlobId = unifiedBlobId.GetLogoBlobId(); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", 
logoBlobId); NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[unifiedBlobId.GetDsGroup()]; @@ -278,6 +277,10 @@ std::shared_ptr TBlobManager::BuildGCTas std::deque keepsToErase; + std::deque deleteIndex; + if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() < blobsGCCountLimit) { + deleteIndex = BlobsToDelete.GroupByGenStep(); + } for (auto&& newCollectGenStep : newCollectGenSteps) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "PreparePerGroupGCRequests")("gen", std::get<0>(newCollectGenStep))("step", std::get<1>(newCollectGenStep)); BlobsManagerCounters.OnNewCollectStep(std::get<0>(newCollectGenStep), std::get<1>(newCollectGenStep)); @@ -297,20 +300,38 @@ std::shared_ptr TBlobManager::BuildGCTas perGroupGCListsInFlight[blobGroup].KeepList.insert(*keepBlobIt); keepsToErase.emplace_back(TUnifiedBlobId(blobGroup, *keepBlobIt)); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_keep_gc", *keepBlobIt); + if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() > blobsGCCountLimit) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit); + break; + } + } + if (extractedToRemoveFromDB.GetSize() + keepsToErase.size() > blobsGCCountLimit) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit); + break; } BlobsToKeep.erase(BlobsToKeep.begin(), keepBlobIt); BlobsManagerCounters.OnBlobsKeep(BlobsToKeep); - const auto predSelf = [&](const TUnifiedBlobId& id, const THashSet& /*tabletIds*/) { - auto logoBlobId = id.GetLogoBlobId(); - TGenStep genStep{logoBlobId.Generation(), logoBlobId.Step()}; - return genStep <= newCollectGenStep && id.GetLogoBlobId().TabletID() == (ui64)SelfTabletId; - }; - TTabletsByBlob extractedSelf = BlobsToDelete.ExtractBlobs(predSelf); 
+ TTabletsByBlob extractedSelf; + { + while (deleteIndex.size()) { + const auto& blobId = deleteIndex.front().GetLogoBlobId(); + if (newCollectGenStep < TGenStep(blobId.Generation(), blobId.Step())) { + break; + } + BlobsToDelete.ExtractBlobTo(deleteIndex.front(), extractedSelf); + if (extractedToRemoveFromDB.GetSize() + extractedSelf.GetSize() + keepsToErase.size() > blobsGCCountLimit) { + AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "a lot of blobs to gc")("to_remove", extractedToRemoveFromDB.GetSize())("keeps_to_erase", keepsToErase.size())("limit", blobsGCCountLimit); + break; + } + deleteIndex.pop_front(); + } + + } + extractedToRemoveFromDB.Add(extractedSelf); - TTabletId tabletId; - TUnifiedBlobId unifiedBlobId; - while (extractedSelf.ExtractFront(tabletId, unifiedBlobId)) { + for (TTabletsByBlob::TIterator itExtractedSelf(extractedSelf); itExtractedSelf.IsValid(); ++itExtractedSelf) { + const TUnifiedBlobId unifiedBlobId = itExtractedSelf.GetBlobId(); auto logoBlobId = unifiedBlobId.GetLogoBlobId(); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("to_delete_gc", logoBlobId); NBlobOperations::NBlobStorage::TGCTask::TGCLists& gl = perGroupGCListsInFlight[unifiedBlobId.GetDsGroup()]; @@ -359,6 +380,8 @@ std::shared_ptr TBlobManager::BuildGCTas return result; } + + TBlobBatch TBlobManager::StartBlobBatch(ui32 channel) { ++CountersUpdate.BatchesStarted; Y_ABORT_UNLESS(channel == BLOB_CHANNEL, "Support for mutiple blob channels is not implemented yet"); diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp index ece65128719a..15a05e7108a7 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_gc_insert_table.cpp @@ -30,7 +30,8 @@ void TTxInsertTableCleanup::Complete(const TActorContext& /*ctx*/) { Y_ABORT_UNLESS(BlobsAction); BlobsAction->OnCompleteTxAfterRemoving(true); - 
Self->EnqueueBackgroundActivities(); + Self->BackgroundController.FinishCleanupInsertTable(); + Self->SetupCleanupInsertTable(); } } diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index dd7aaf540533..64b33bb519f1 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -845,6 +845,10 @@ void TColumnShard::Handle(TEvPrivate::TEvGarbageCollectionFinished::TPtr& ev, co } void TColumnShard::SetupCleanupInsertTable() { + if (BackgroundController.IsCleanupInsertTableActive()) { + ACFL_DEBUG("background", "cleanup_insert_table")("skip_reason", "in_progress"); + return; + } auto writeIdsToCleanup = InsertTable->OldWritesToAbort(AppData()->TimeProvider->Now()); if (!InsertTable->GetAborted().size() && !writeIdsToCleanup.size()) { @@ -852,6 +856,7 @@ void TColumnShard::SetupCleanupInsertTable() { } AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "cleanup_started")("aborted", InsertTable->GetAborted().size())("to_cleanup", writeIdsToCleanup.size()); + BackgroundController.StartCleanupInsertTable(); Execute(new TTxInsertTableCleanup(this, std::move(writeIdsToCleanup)), TActorContext::AsActorContext()); } diff --git a/ydb/core/tx/columnshard/engines/column_engine.cpp b/ydb/core/tx/columnshard/engines/column_engine.cpp index d6f46742093c..288be294731e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine.cpp @@ -9,15 +9,16 @@ const std::shared_ptr& IColumnEngine::GetReplaceKey() const { } ui64 IColumnEngine::GetMetadataLimit() { + static const auto MemoryTotal = NSystemInfo::TotalMemorySize(); if (!HasAppData()) { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("total", NSystemInfo::TotalMemorySize()); - return NSystemInfo::TotalMemorySize() * 0.3; + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("total", MemoryTotal); + return MemoryTotal * 0.3; } else if 
(AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().HasAbsoluteValue()) { AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("value", AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetAbsoluteValue()); return AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetAbsoluteValue(); } else { - AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("total", NSystemInfo::TotalMemorySize())("kff", AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetTotalRatio()); - return NSystemInfo::TotalMemorySize() * AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetTotalRatio(); + AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("total", MemoryTotal)("kff", AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetTotalRatio()); + return MemoryTotal * AppDataVerified().ColumnShardConfig.GetIndexMetadataMemoryLimit().GetTotalRatio(); } } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp index f0c789f3d205..8442648344b7 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.cpp @@ -9,7 +9,7 @@ void TScanHead::OnIntervalResult(const std::optional&& merger, const ui32 intervalIdx, TPlainReadData& reader) { if (Context->GetReadMetadata()->Limit && (!newBatch || newBatch->GetRecordsCount() == 0) && InFlightLimit < 1000) { if (++ZeroCount == std::max(16, InFlightLimit)) { - InFlightLimit *= 2; + InFlightLimit = std::min(MaxInFlight, InFlightLimit * 2); /* double the in-flight window after a run of empty results, clamped so it never exceeds the MaxInFlight cap */ ZeroCount = 0; } } else { @@ -96,7 +96,17 @@ TConclusionStatus TScanHead::Start() { TScanHead::TScanHead(std::deque>&& sources, const std::shared_ptr& context) : Context(context) { - InFlightLimit = Context->GetReadMetadata()->Limit ? 
1 : Max(); + if (!HasAppData() || !AppDataVerified().ColumnShardConfig.HasMaxInFlightIntervalsOnRequest()) { + MaxInFlight = 256; + } else { + MaxInFlight = AppDataVerified().ColumnShardConfig.GetMaxInFlightIntervalsOnRequest(); + } + + if (Context->GetReadMetadata()->Limit) { + InFlightLimit = 1; + } else { + InFlightLimit = MaxInFlight; + } while (sources.size()) { auto source = sources.front(); BorderPoints[source->GetStart()].AddStart(source); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h index 75de439aec63..7092dac19acd 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/scanner.h @@ -78,6 +78,7 @@ class TScanHead { ui32 SegmentIdxCounter = 0; std::vector IntervalStats; ui64 InFlightLimit = 1; + ui64 MaxInFlight = 256; ui64 ZeroCount = 0; bool AbortFlag = false; void DrainSources();