From 3914ca3eb4f791a62d25c872bc5a7a3bea8e5dc9 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 24 Oct 2024 21:33:21 +0300 Subject: [PATCH 1/5] correct and speed up compaction --- .../engines/changes/compaction.cpp | 2 +- .../engines/changes/general_compaction.cpp | 39 ++++++++++++------- .../engines/changes/general_compaction.h | 12 +++++- .../optimizer/lcbuckets/planner/abstract.h | 4 +- .../optimizer/lcbuckets/planner/optimizer.cpp | 19 +++++---- .../lcbuckets/planner/zero_level.cpp | 10 ++--- .../optimizer/lcbuckets/planner/zero_level.h | 7 +++- 7 files changed, 55 insertions(+), 38 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index 7172cb3bb660..457d55058ab4 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -30,7 +30,7 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) { void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { TBase::DoStart(self); -// Y_ABORT_UNLESS(SwitchedPortions.size()); + Y_ABORT_UNLESS(PortionsToRemove.size() + PortionsToMove.size()); THashMap> blobRanges; auto& index = self.GetIndexAs().GetVersionedIndex(); for (const auto& p : SwitchedPortions) { diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp index 5fa023bc6b0c..5b0d9bd7ec26 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.cpp @@ -237,26 +237,35 @@ std::shared_ptr TGeneralCo } ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo& portionInfo) { - SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)); + SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)) + portionInfo.GetTotalBlobBytes(); ++PortionsCount; - THashMap maxChunkSizeByColumn; + auto it = MaxMemoryByColumnChunk.begin(); + SumMemoryDelta = 0; + const auto advanceIterator = [&](const ui32 columnId, const ui64 maxColumnChunkRawBytes) { + while (it != MaxMemoryByColumnChunk.end() && it->ColumnId < columnId) { + ++it; + } + if (it == MaxMemoryByColumnChunk.end() || columnId < it->ColumnId) { + it = MaxMemoryByColumnChunk.insert(it, TColumnInfo(columnId)); + } + it->MemoryUsage += maxColumnChunkRawBytes; + SumMemoryDelta = std::max(SumMemoryDelta, it->MemoryUsage); + }; + ui32 columnId = 0; + ui64 maxChunkSize = 0; for (auto&& i : portionInfo.GetRecords()) { - SumMemoryFix += i.BlobRange.Size; - auto it = maxChunkSizeByColumn.find(i.GetColumnId()); - if (it == maxChunkSizeByColumn.end()) { - maxChunkSizeByColumn.emplace(i.GetColumnId(), i.GetMeta().GetRawBytes()); - } else { - if (it->second < i.GetMeta().GetRawBytes()) { - it->second = i.GetMeta().GetRawBytes(); + if (columnId != i.GetColumnId()) { + if (columnId) { + advanceIterator(columnId, maxChunkSize); } + columnId = i.GetColumnId(); + maxChunkSize = 0; + } + if (maxChunkSize < i.GetMeta().GetRawBytes()) { + maxChunkSize = i.GetMeta().GetRawBytes(); } } - - SumMemoryDelta = 0; - for (auto&& i : maxChunkSizeByColumn) { - MaxMemoryByColumnChunk[i.first] += i.second; - SumMemoryDelta = std::max(SumMemoryDelta, MaxMemoryByColumnChunk[i.first]); - } + advanceIterator(columnId, maxChunkSize); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_prediction_after", SumMemoryFix + SumMemoryDelta)( "portion_info", portionInfo.DebugString()); diff --git a/ydb/core/tx/columnshard/engines/changes/general_compaction.h b/ydb/core/tx/columnshard/engines/changes/general_compaction.h index a1ca732899c2..4e24cbf2967a 100644 --- a/ydb/core/tx/columnshard/engines/changes/general_compaction.h +++ b/ydb/core/tx/columnshard/engines/changes/general_compaction.h @@ -66,7 +66,17 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges { ui64 SumMemoryDelta = 0; ui64 SumMemoryFix = 0; ui32 PortionsCount = 0; - THashMap MaxMemoryByColumnChunk; + class TColumnInfo { + public: + const ui32 ColumnId; + ui64 MemoryUsage = 0; + TColumnInfo(const ui32 columnId) + : ColumnId(columnId) + { + + } + }; + std::list MaxMemoryByColumnChunk; public: virtual ui64 AddPortion(const TPortionInfo& portionInfo) override; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h index 971917eb2125..9781256eb9ec 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/abstract.h @@ -140,7 +140,7 @@ class TCompactionTaskData { StopSeparation = point; } - std::vector> GetRepackPortions(const ui32 levelIdx) const { + std::vector> GetRepackPortions(const ui32 /*levelIdx*/) const { std::vector> result; if (MemoryUsage > ((ui64)1 << 30)) { auto predictor = NCompaction::TGeneralCompactColumnEngineChanges::BuildMemoryPredictor(); @@ -154,7 +154,7 @@ class TCompactionTaskData { } } return result; - } else if (levelIdx == 0) { + } else { return Portions; } auto moveIds = GetMovePortionIds(); diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp index 871263e43e2d..c463b2423069 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/optimizer.cpp @@ -18,11 +18,10 @@ TOptimizerPlanner::TOptimizerPlanner( const ui64 maxPortionBlobBytes = (ui64)1 << 20; Levels.emplace_back( std::make_shared(2, 0.9, maxPortionBlobBytes, nullptr, PortionsInfo, Counters->GetLevelCounters(2))); - Levels.emplace_back( - std::make_shared(1, 0.1, maxPortionBlobBytes, Levels.back(), PortionsInfo, Counters->GetLevelCounters(1))); */ - Levels.emplace_back(std::make_shared(1, nullptr, Counters->GetLevelCounters(2))); - Levels.emplace_back(std::make_shared(0, Levels.back(), Counters->GetLevelCounters(0))); + Levels.emplace_back(std::make_shared(2, nullptr, Counters->GetLevelCounters(2), TDuration::Max())); + Levels.emplace_back(std::make_shared(1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max())); + Levels.emplace_back(std::make_shared(0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180))); std::reverse(Levels.begin(), Levels.end()); RefreshWeights(); } @@ -34,14 +33,14 @@ std::shared_ptr TOptimizerPlanner::DoGetOp auto data = level->GetOptimizationTask(); TSaverContext saverContext(StoragesManager); std::shared_ptr result; - if (level->GetLevelId() == 0) { - result = std::make_shared( - granule, data.GetRepackPortions(level->GetLevelId()), saverContext); - } else { +// if (level->GetLevelId() == 0) { result = std::make_shared( granule, data.GetRepackPortions(level->GetLevelId()), saverContext); - result->AddMovePortions(data.GetMovePortions()); - } +// } else { +// result = std::make_shared( +// granule, data.GetRepackPortions(level->GetLevelId()), saverContext); +// result->AddMovePortions(data.GetMovePortions()); +// } result->SetTargetCompactionLevel(data.GetTargetCompactionLevel()); auto levelPortions = std::dynamic_pointer_cast(Levels[data.GetTargetCompactionLevel()]); if (levelPortions) { diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp index 98c4d3f9bd58..e8854f74261d 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.cpp @@ -25,20 +25,16 @@ ui64 TZeroLevelPortions::DoGetWeight() const { if (!NextLevel || Portions.size() < 10) { return 0; } - if (TInstant::Now() - *PredOptimization < TDuration::Seconds(180)) { - if (PortionsInfo.GetCount() <= 100 || PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) { - return 0; - } - } else { - if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (512 << 10)) { + if (PredOptimization && TInstant::Now() - *PredOptimization < DurationToDrop) { + if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) { return 0; } } - THashSet portionIds; const ui64 affectedRawBytes = NextLevel->GetAffectedPortionBytes(Portions.begin()->GetPortion()->IndexKeyStart(), Portions.rbegin()->GetPortion()->IndexKeyEnd()); /* + THashSet portionIds; auto chain = targetLevel->GetAffectedPortions(Portions.begin()->GetPortion()->IndexKeyStart(), Portions.rbegin()->GetPortion()->IndexKeyEnd()); ui64 affectedRawBytes = 0; diff --git a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h index ba78cebbd436..4a3b3837788f 100644 --- a/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h +++ b/ydb/core/tx/columnshard/engines/storage/optimizer/lcbuckets/planner/zero_level.h @@ -8,6 +8,7 @@ class TZeroLevelPortions: public IPortionsLevel { private: using TBase = IPortionsLevel; const TLevelCounters LevelCounters; + const TDuration DurationToDrop; class TOrderedPortion { private: YDB_READONLY_DEF(std::shared_ptr, Portion); @@ -87,9 +88,11 @@ class TZeroLevelPortions: public IPortionsLevel { virtual TCompactionTaskData DoGetOptimizationTask() const override; public: - TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr& nextLevel, const TLevelCounters& levelCounters) + TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr& nextLevel, const TLevelCounters& levelCounters, const TDuration durationToDrop) : TBase(levelIdx, nextLevel) - , LevelCounters(levelCounters) { + , LevelCounters(levelCounters) + , DurationToDrop(durationToDrop) + { } }; From 00519b2328d0f2773bbc97148fadad41759dbc5f Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 25 Oct 2024 08:31:23 +0300 Subject: [PATCH 2/5] fix --- ydb/core/tx/columnshard/engines/changes/compaction.cpp | 1 - ydb/core/tx/columnshard/engines/changes/with_appended.cpp | 1 + 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/engines/changes/compaction.cpp b/ydb/core/tx/columnshard/engines/changes/compaction.cpp index 457d55058ab4..b0eb17e90200 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction.cpp @@ -30,7 +30,6 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) { void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) { TBase::DoStart(self); - Y_ABORT_UNLESS(PortionsToRemove.size() + PortionsToMove.size()); THashMap> blobRanges; auto& index = self.GetIndexAs().GetVersionedIndex(); for (const auto& p : SwitchedPortions) { diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index cfe48e7f59a0..36178f841d71 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -133,6 +133,7 @@ void TChangesWithAppend::DoOnAfterCompile() { } void TChangesWithAppend::DoStart(NColumnShard::TColumnShard& /*self*/) { + AFL_VERIFY(PortionsToRemove.size() + PortionsToMove.size()); } } // namespace NKikimr::NOlap From 361934883334f53d3653830a3054395874078d20 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 25 Oct 2024 09:46:00 +0300 Subject: [PATCH 3/5] first/last only for PK --- ydb/core/tx/columnshard/engines/storage/chunks/column.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/storage/chunks/column.h b/ydb/core/tx/columnshard/engines/storage/chunks/column.h index 9de818c49fb6..a9881eea246d 100644 --- a/ydb/core/tx/columnshard/engines/storage/chunks/column.h +++ b/ydb/core/tx/columnshard/engines/storage/chunks/column.h @@ -62,8 +62,10 @@ class TChunkPreparation: public IPortionColumnChunk { , Record(address, column, columnInfo) , ColumnInfo(columnInfo) { Y_ABORT_UNLESS(column->GetRecordsCount()); - First = column->GetScalar(0); - Last = column->GetScalar(column->GetRecordsCount() - 1); + if (ColumnInfo.GetPKColumnIndex()) { + First = column->GetScalar(0); + Last = column->GetScalar(column->GetRecordsCount() - 1); + } Record.BlobRange.Size = data.size(); } }; From d8dc3392174ecdbeb50b4899f153f54842ac1083 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 25 Oct 2024 13:45:19 +0300 Subject: [PATCH 4/5] fix --- ydb/core/tx/columnshard/engines/changes/with_appended.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index 36178f841d71..80fe35d966ba 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -133,7 +133,7 @@ void TChangesWithAppend::DoOnAfterCompile() { } void TChangesWithAppend::DoStart(NColumnShard::TColumnShard& /*self*/) { - AFL_VERIFY(PortionsToRemove.size() + PortionsToMove.size()); + AFL_VERIFY(PortionsToRemove.size() + PortionsToMove.size() + AppendedPortions.size()); } } // namespace NKikimr::NOlap From c98a9a215f4a07edb6fffed12ba1e1b26973e640 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Fri, 25 Oct 2024 20:15:34 +0300 Subject: [PATCH 5/5] reduce mem --- ydb/core/tx/columnshard/engines/portions/constructor.cpp | 3 +++ 1 file changed, 3 insertions(+) diff --git a/ydb/core/tx/columnshard/engines/portions/constructor.cpp b/ydb/core/tx/columnshard/engines/portions/constructor.cpp index 5125d60f292c..636a09b6332a 100644 --- a/ydb/core/tx/columnshard/engines/portions/constructor.cpp +++ b/ydb/core/tx/columnshard/engines/portions/constructor.cpp @@ -83,8 +83,11 @@ TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization) } result.Indexes = std::move(Indexes); + result.Indexes.shrink_to_fit(); result.Records = std::move(Records); + result.Records.shrink_to_fit(); result.BlobIds = std::move(BlobIds); + result.BlobIds.shrink_to_fit(); result.Precalculate(); return result; }