Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

correct and speed up compaction #10867

Merged
merged 7 commits into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ void TCompactColumnEngineChanges::DoCompile(TFinalizationContext& context) {
void TCompactColumnEngineChanges::DoStart(NColumnShard::TColumnShard& self) {
TBase::DoStart(self);

// Y_ABORT_UNLESS(SwitchedPortions.size());
THashMap<TString, THashSet<TBlobRange>> blobRanges;
auto& index = self.GetIndexAs<TColumnEngineForLogs>().GetVersionedIndex();
for (const auto& p : SwitchedPortions) {
Expand Down
39 changes: 24 additions & 15 deletions ydb/core/tx/columnshard/engines/changes/general_compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -237,26 +237,35 @@ std::shared_ptr<TGeneralCompactColumnEngineChanges::IMemoryPredictor> TGeneralCo
}

ui64 TGeneralCompactColumnEngineChanges::TMemoryPredictorChunkedPolicy::AddPortion(const TPortionInfo& portionInfo) {
SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16));
SumMemoryFix += portionInfo.GetRecordsCount() * (2 * sizeof(ui64) + sizeof(ui32) + sizeof(ui16)) + portionInfo.GetTotalBlobBytes();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we could use sizeof(SomeStruct) here? Just wondering...

++PortionsCount;
THashMap<ui32, ui64> maxChunkSizeByColumn;
auto it = MaxMemoryByColumnChunk.begin();
SumMemoryDelta = 0;
const auto advanceIterator = [&](const ui32 columnId, const ui64 maxColumnChunkRawBytes) {
while (it != MaxMemoryByColumnChunk.end() && it->ColumnId < columnId) {
++it;
}
if (it == MaxMemoryByColumnChunk.end() || columnId < it->ColumnId) {
it = MaxMemoryByColumnChunk.insert(it, TColumnInfo(columnId));
}
it->MemoryUsage += maxColumnChunkRawBytes;
SumMemoryDelta = std::max(SumMemoryDelta, it->MemoryUsage);
};
ui32 columnId = 0;
ui64 maxChunkSize = 0;
for (auto&& i : portionInfo.GetRecords()) {
SumMemoryFix += i.BlobRange.Size;
auto it = maxChunkSizeByColumn.find(i.GetColumnId());
if (it == maxChunkSizeByColumn.end()) {
maxChunkSizeByColumn.emplace(i.GetColumnId(), i.GetMeta().GetRawBytes());
} else {
if (it->second < i.GetMeta().GetRawBytes()) {
it->second = i.GetMeta().GetRawBytes();
if (columnId != i.GetColumnId()) {
if (columnId) {
advanceIterator(columnId, maxChunkSize);
}
columnId = i.GetColumnId();
maxChunkSize = 0;
}
if (maxChunkSize < i.GetMeta().GetRawBytes()) {
maxChunkSize = i.GetMeta().GetRawBytes();
}
}

SumMemoryDelta = 0;
for (auto&& i : maxChunkSizeByColumn) {
MaxMemoryByColumnChunk[i.first] += i.second;
SumMemoryDelta = std::max(SumMemoryDelta, MaxMemoryByColumnChunk[i.first]);
}
advanceIterator(columnId, maxChunkSize);

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("memory_prediction_after", SumMemoryFix + SumMemoryDelta)(
"portion_info", portionInfo.DebugString());
Expand Down
12 changes: 11 additions & 1 deletion ydb/core/tx/columnshard/engines/changes/general_compaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,17 @@ class TGeneralCompactColumnEngineChanges: public TCompactColumnEngineChanges {
ui64 SumMemoryDelta = 0;
ui64 SumMemoryFix = 0;
ui32 PortionsCount = 0;
THashMap<ui32, ui64> MaxMemoryByColumnChunk;
class TColumnInfo {
public:
const ui32 ColumnId;
ui64 MemoryUsage = 0;
TColumnInfo(const ui32 columnId)
: ColumnId(columnId)
{

}
};
std::list<TColumnInfo> MaxMemoryByColumnChunk;

public:
virtual ui64 AddPortion(const TPortionInfo& portionInfo) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
}

void TChangesWithAppend::DoCompile(TFinalizationContext& context) {
AFL_VERIFY(PortionsToRemove.size() + PortionsToMove.size() + AppendedPortions.size());
for (auto&& i : AppendedPortions) {
i.GetPortionConstructor().SetPortionId(context.NextPortionId());
i.GetPortionConstructor().MutableMeta().SetCompactionLevel(TargetCompactionLevel.value_or(0));
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/constructor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,11 @@ TPortionInfo TPortionInfoConstructor::Build(const bool needChunksNormalization)
}

result.Indexes = std::move(Indexes);
result.Indexes.shrink_to_fit();
result.Records = std::move(Records);
result.Records.shrink_to_fit();
result.BlobIds = std::move(BlobIds);
result.BlobIds.shrink_to_fit();
result.Precalculate();
return result;
}
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/engines/storage/chunks/column.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ class TChunkPreparation: public IPortionColumnChunk {
, Record(address, column, columnInfo)
, ColumnInfo(columnInfo) {
Y_ABORT_UNLESS(column->GetRecordsCount());
First = column->GetScalar(0);
Last = column->GetScalar(column->GetRecordsCount() - 1);
if (ColumnInfo.GetPKColumnIndex()) {
First = column->GetScalar(0);
Last = column->GetScalar(column->GetRecordsCount() - 1);
}
Record.BlobRange.Size = data.size();
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ class TCompactionTaskData {
StopSeparation = point;
}

std::vector<std::shared_ptr<TPortionInfo>> GetRepackPortions(const ui32 levelIdx) const {
std::vector<std::shared_ptr<TPortionInfo>> GetRepackPortions(const ui32 /*levelIdx*/) const {
std::vector<std::shared_ptr<TPortionInfo>> result;
if (MemoryUsage > ((ui64)1 << 30)) {
auto predictor = NCompaction::TGeneralCompactColumnEngineChanges::BuildMemoryPredictor();
Expand All @@ -154,7 +154,7 @@ class TCompactionTaskData {
}
}
return result;
} else if (levelIdx == 0) {
} else {
return Portions;
}
auto moveIds = GetMovePortionIds();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@ TOptimizerPlanner::TOptimizerPlanner(
const ui64 maxPortionBlobBytes = (ui64)1 << 20;
Levels.emplace_back(
std::make_shared<TLevelPortions>(2, 0.9, maxPortionBlobBytes, nullptr, PortionsInfo, Counters->GetLevelCounters(2)));
Levels.emplace_back(
std::make_shared<TLevelPortions>(1, 0.1, maxPortionBlobBytes, Levels.back(), PortionsInfo, Counters->GetLevelCounters(1)));
*/
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(1, nullptr, Counters->GetLevelCounters(2)));
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(0, Levels.back(), Counters->GetLevelCounters(0)));
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(2, nullptr, Counters->GetLevelCounters(2), TDuration::Max()));
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(1, Levels.back(), Counters->GetLevelCounters(1), TDuration::Max()));
Levels.emplace_back(std::make_shared<TZeroLevelPortions>(0, Levels.back(), Counters->GetLevelCounters(0), TDuration::Seconds(180)));
std::reverse(Levels.begin(), Levels.end());
RefreshWeights();
}
Expand All @@ -34,14 +33,14 @@ std::shared_ptr<NKikimr::NOlap::TColumnEngineChanges> TOptimizerPlanner::DoGetOp
auto data = level->GetOptimizationTask();
TSaverContext saverContext(StoragesManager);
std::shared_ptr<NCompaction::TGeneralCompactColumnEngineChanges> result;
if (level->GetLevelId() == 0) {
result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
} else {
// if (level->GetLevelId() == 0) {
result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
result->AddMovePortions(data.GetMovePortions());
}
// } else {
// result = std::make_shared<NCompaction::TGeneralCompactColumnEngineChanges>(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need that commented code?

// granule, data.GetRepackPortions(level->GetLevelId()), saverContext);
// result->AddMovePortions(data.GetMovePortions());
// }
result->SetTargetCompactionLevel(data.GetTargetCompactionLevel());
auto levelPortions = std::dynamic_pointer_cast<TLevelPortions>(Levels[data.GetTargetCompactionLevel()]);
if (levelPortions) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,16 @@ ui64 TZeroLevelPortions::DoGetWeight() const {
if (!NextLevel || Portions.size() < 10) {
return 0;
}
if (TInstant::Now() - *PredOptimization < TDuration::Seconds(180)) {
if (PortionsInfo.GetCount() <= 100 || PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) {
return 0;
}
} else {
if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (512 << 10)) {
if (PredOptimization && TInstant::Now() - *PredOptimization < DurationToDrop) {
if (PortionsInfo.PredictPackedBlobBytes(GetPackKff()) < (1 << 20)) {
return 0;
}
}

THashSet<ui64> portionIds;
const ui64 affectedRawBytes =
NextLevel->GetAffectedPortionBytes(Portions.begin()->GetPortion()->IndexKeyStart(), Portions.rbegin()->GetPortion()->IndexKeyEnd());
/*
THashSet<ui64> portionIds;
auto chain =
targetLevel->GetAffectedPortions(Portions.begin()->GetPortion()->IndexKeyStart(), Portions.rbegin()->GetPortion()->IndexKeyEnd());
ui64 affectedRawBytes = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ class TZeroLevelPortions: public IPortionsLevel {
private:
using TBase = IPortionsLevel;
const TLevelCounters LevelCounters;
const TDuration DurationToDrop;
class TOrderedPortion {
private:
YDB_READONLY_DEF(std::shared_ptr<TPortionInfo>, Portion);
Expand Down Expand Up @@ -87,9 +88,11 @@ class TZeroLevelPortions: public IPortionsLevel {
virtual TCompactionTaskData DoGetOptimizationTask() const override;

public:
TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters)
TZeroLevelPortions(const ui32 levelIdx, const std::shared_ptr<IPortionsLevel>& nextLevel, const TLevelCounters& levelCounters, const TDuration durationToDrop)
: TBase(levelIdx, nextLevel)
, LevelCounters(levelCounters) {
, LevelCounters(levelCounters)
, DurationToDrop(durationToDrop)
{
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -435,16 +435,22 @@ namespace NKikimr::NColumnShard {
NOlap::TIndexInfo BuildTableInfo(const std::vector<NArrow::NTest::TTestColumn>& ydbSchema,
const std::vector<NArrow::NTest::TTestColumn>& key) {
THashMap<ui32, NTable::TColumn> columns;
THashMap<TString, NTable::TColumn*> columnByName;
for (ui32 i = 0; i < ydbSchema.size(); ++i) {
ui32 id = i + 1;
auto& name = ydbSchema[i].GetName();
auto& type = ydbSchema[i].GetType();

columns[id] = NTable::TColumn(name, id, type, "");
AFL_VERIFY(columnByName.emplace(name, &columns[id]).second);
}

std::vector<TString> pkNames;
ui32 idx = 0;
for (const auto& c : key) {
auto it = columnByName.find(c.GetName());
AFL_VERIFY(it != columnByName.end());
it->second->KeyOrder = idx++;
pkNames.push_back(c.GetName());
}
return NOlap::TIndexInfo::BuildDefault(NOlap::TTestStoragesManager::GetInstance(), columns, pkNames);
Expand Down
Loading