Skip to content

Commit

Permalink
Delete empty portions normalizer (#7596) (#7600)
Browse files Browse the repository at this point in the history
  • Loading branch information
zverevgeny authored Aug 9, 2024
1 parent d65a0a9 commit 9d83402
Show file tree
Hide file tree
Showing 6 changed files with 173 additions and 3 deletions.
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ enum class ENormalizerSequentialId: ui32 {
TablesCleaner,
PortionsMetadata,
CleanGranuleId,
EmptyPortionsCleaner,

MAX
};
Expand Down
120 changes: 120 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
#include "clean_empty.h"
#include <ydb/core/tx/columnshard/columnshard_schema.h>


namespace NKikimr::NOlap {

namespace {
std::optional<THashSet<TPortionAddress>> GetColumnPortionAddresses(NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
return std::nullopt;
}
THashSet<TPortionAddress> usedPortions;
auto rowset = db.Table<Schema::IndexColumns>().Select<
Schema::IndexColumns::PathId,
Schema::IndexColumns::Portion
>();
if (!rowset.IsReady()) {
return std::nullopt;
}
while (!rowset.EndOfSet()) {
usedPortions.emplace(
rowset.GetValue<Schema::IndexColumns::PathId>(),
rowset.GetValue<Schema::IndexColumns::Portion>()
);
if (!rowset.Next()) {
return std::nullopt;
}
}
return usedPortions;
}

using TBatch = std::vector<TPortionAddress>;

std::optional<std::vector<TBatch>> GetPortionsToDelete(NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
const auto usedPortions = GetColumnPortionAddresses(txc);
if (!usedPortions) {
return std::nullopt;
}
const size_t MaxBatchSize = 10000;
NIceDb::TNiceDb db(txc.DB);
if (!Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme())) {
return std::nullopt;
}
auto rowset = db.Table<Schema::IndexPortions>().Select<
Schema::IndexPortions::PathId,
Schema::IndexPortions::PortionId
>();
if (!rowset.IsReady()) {
return std::nullopt;
}
std::vector<TBatch> result;
TBatch portionsToDelete;
while (!rowset.EndOfSet()) {
TPortionAddress addr(
rowset.GetValue<Schema::IndexPortions::PathId>(),
rowset.GetValue<Schema::IndexPortions::PortionId>()
);
if (!usedPortions->contains(addr)) {
ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << addr.DebugString() << " marked for deletion");
portionsToDelete.emplace_back(std::move(addr));
if (portionsToDelete.size() == MaxBatchSize) {
result.emplace_back(std::move(portionsToDelete));
portionsToDelete = TBatch{};
}
}
if (!rowset.Next()) {
return std::nullopt;
}
}
if (!portionsToDelete.empty()) {
result.emplace_back(std::move(portionsToDelete));
}
return result;
}

class TChanges : public INormalizerChanges {
public:
TChanges(TBatch&& addresses)
: Addresses(addresses)
{}
bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController&) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
for(const auto& a: Addresses) {
db.Table<Schema::IndexPortions>().Key(
a.GetPathId(),
a.GetPortionId()
).Delete();
}
ACFL_WARN("normalizer", "TCleanEmptyPortionsNormalizer")("message", TStringBuilder() << GetSize() << " portions deleted");
return true;
}

ui64 GetSize() const override {
return Addresses.size();
}
private:
const TBatch Addresses;
};

} //namespace

TConclusion<std::vector<INormalizerTask::TPtr>> TCleanEmptyPortionsNormalizer::DoInit(const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
auto batchesToDelete = GetPortionsToDelete(txc);
if (!batchesToDelete) {
return TConclusionStatus::Fail("Not ready");
}

std::vector<INormalizerTask::TPtr> result;
for (auto&& b: *batchesToDelete) {
result.emplace_back(std::make_shared<TTrivialNormalizerTask>(std::make_shared<TChanges>(std::move(b))));
}
return result;
}

} //namespace NKikimr::NOlap
28 changes: 28 additions & 0 deletions ydb/core/tx/columnshard/normalizer/portion/clean_empty.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>

namespace NKikimr::NOlap {

class TCleanEmptyPortionsNormalizer : public TNormalizationController::INormalizerComponent {

static TString ClassName() {
return ToString(ENormalizerSequentialId::EmptyPortionsCleaner);
}
static inline auto Registrator = INormalizerComponent::TFactory::TRegistrator<TCleanEmptyPortionsNormalizer>(ClassName());
public:
TCleanEmptyPortionsNormalizer(const TNormalizationController::TInitContext&)
{}

std::optional<ENormalizerSequentialId> DoGetEnumSequentialId() const override {
return ENormalizerSequentialId::EmptyPortionsCleaner;
}

TString GetClassName() const override {
return ClassName();
}

TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

} //namespace NKikimr::NOlap
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
namespace NKikimr::NOlap {

TConclusion<std::vector<INormalizerTask::TPtr>> TPortionsNormalizerBase::DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
auto initRes = DoInitImpl(controller,txc);
auto initRes = DoInitImpl(controller, txc);

if (initRes.IsFail()) {
return initRes;
Expand Down
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/portion/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ SRCS(
GLOBAL portion.cpp
GLOBAL chunks.cpp
GLOBAL clean.cpp
GLOBAL clean_empty.cpp
GLOBAL broken_blobs.cpp
)

Expand Down
24 changes: 22 additions & 2 deletions ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/columnshard/hooks/testing/controller.h>
#include <ydb/core/tx/columnshard/engines/portions/constructor.h>

#include <ydb/core/tx/columnshard/operations/write_data.h>

Expand Down Expand Up @@ -161,7 +162,7 @@ class TColumnChunksCleaner : public NYDBTest::ILocalDBModifier {
}
};

class TPortinosCleaner : public NYDBTest::ILocalDBModifier {
class TPortionsCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
Expand All @@ -185,6 +186,21 @@ class TPortinosCleaner : public NYDBTest::ILocalDBModifier {
}
};


class TEmptyPortionsCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);
for (size_t pathId = 100; pathId != 299; ++pathId) {
for (size_t portionId = 1000; portionId != 1199; ++portionId) {
db.Table<Schema::IndexPortions>().Key(pathId, portionId).Update();
}
}
}
};


class TTablesCleaner : public NYDBTest::ILocalDBModifier {
public:
virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override {
Expand Down Expand Up @@ -317,7 +333,11 @@ Y_UNIT_TEST_SUITE(Normalizers) {
}

Y_UNIT_TEST(PortionsNormalizer) {
TestNormalizerImpl<TPortinosCleaner>();
TestNormalizerImpl<TPortionsCleaner>();
}

Y_UNIT_TEST(CleanEmptyPortionsNormalizer) {
TestNormalizerImpl<TEmptyPortionsCleaner>();
}

Y_UNIT_TEST(EmptyTablesNormalizer) {
Expand Down

0 comments on commit 9d83402

Please sign in to comment.