Skip to content

Commit

Permalink
Normalizer for removing granule_id from stored portions
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 committed May 30, 2024
1 parent d1de63e commit 67c0ae8
Show file tree
Hide file tree
Showing 4 changed files with 177 additions and 1 deletion.
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/normalizer/abstract/abstract.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,11 @@ namespace NKikimr::NOlap {

// Identifier of a normalizer. Only `Granules` has an explicit value; every
// following enumerator takes the next consecutive value in declaration order.
// NOTE(review): CleanGranuleId is placed between Granules and Chunks, so Chunks
// and all later enumerators are one higher than they would be without it —
// confirm that nothing persisted depends on the previous numbering.
enum class ENormalizerSequentialId : ui32 {
    Granules = 1,
    CleanGranuleId,
    Chunks,
    PortionsCleaner,
    TablesCleaner,
    PortionsMetadata
};

class TNormalizationContext {
Expand Down
149 changes: 149 additions & 0 deletions ydb/core/tx/columnshard/normalizer/granule/clean_granule.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
#include "clean_granule.h"

#include <ydb/core/tx/columnshard/columnshard_private_events.h>

namespace NKikimr::NOlap {

namespace {

struct TChunkData {
ui64 Index = 0;
ui64 GranuleId = 0;
ui64 PlanStep = 0;
ui64 TxId = 0;
ui64 PortionId = 0;
ui32 Chunk = 0;
ui64 ColumnIdx = 0;

ui64 XPlanStep = 0;
ui64 XTxId = 0;
TString Blob;
TString Metadata;
ui64 Offset;
ui32 Size;
ui64 PathId;

template <class TRowSet>
TChunkData(const TRowSet& rowset) {
using Schema = NColumnShard::Schema;
PlanStep = rowset.GetValue<Schema::IndexColumns::PlanStep>();
TxId = rowset.GetValue<Schema::IndexColumns::TxId>();
PortionId = rowset.GetValue<Schema::IndexColumns::Portion>();
GranuleId = rowset.GetValue<Schema::IndexColumns::Granule>();
Chunk = rowset.GetValue<Schema::IndexColumns::Chunk>();
Index = rowset.GetValue<Schema::IndexColumns::Index>();
ColumnIdx = rowset.GetValue<Schema::IndexColumns::ColumnIdx>();

XPlanStep = rowset.GetValue<Schema::IndexColumns::XPlanStep>();
XTxId = rowset.GetValue<Schema::IndexColumns::XTxId>();
Blob = rowset.GetValue<Schema::IndexColumns::Blob>();
Metadata = rowset.GetValue<Schema::IndexColumns::Metadata>();
Offset = rowset.GetValue<Schema::IndexColumns::Offset>();
Size = rowset.GetValue<Schema::IndexColumns::Size>();
PathId = rowset.GetValue<Schema::IndexColumns::PathId>();
}
};
}

// One batch of IndexColumns rows that have to be moved to a key whose Index and
// Granule components are zero. Batches are produced by Init() and applied by
// ApplyOnExecute().
class TCleanGranuleIdNormalizer::TNormalizerResult : public INormalizerChanges {
private:
    // Rows collected by Init() for this batch.
    std::vector<TChunkData> Chunks;

    void AddChunk(TChunkData&& chunk) {
        Chunks.push_back(std::move(chunk));
    }

    // Private: instances are created only inside Init().
    TNormalizerResult() = default;

public:
    // For every collected row: deletes the row under its original key and
    // writes its payload under the same key with Index and Granule set to 0.
    // Always returns true.
    bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override {
        using Schema = NColumnShard::Schema;
        NIceDb::TNiceDb db(txc.DB);
        ACFL_INFO("normalizer", "TCleanGranuleIdNormalizer")("message", TStringBuilder() << "apply " << Chunks.size() << " chunks");

        for (const auto& key : Chunks) {
            db.Table<Schema::IndexColumns>().Key(key.Index, key.GranuleId, key.ColumnIdx,
                key.PlanStep, key.TxId, key.PortionId, key.Chunk).Delete();

            // Each payload value goes to its own column; writing them all
            // through the Metadata column would overwrite Metadata and leave
            // Offset/Size/XPlanStep/XTxId unset in the new row.
            db.Table<Schema::IndexColumns>().Key(0, 0, key.ColumnIdx,
                key.PlanStep, key.TxId, key.PortionId, key.Chunk).Update(
                    NIceDb::TUpdate<Schema::IndexColumns::PathId>(key.PathId),
                    NIceDb::TUpdate<Schema::IndexColumns::Blob>(key.Blob),
                    NIceDb::TUpdate<Schema::IndexColumns::Metadata>(key.Metadata),
                    NIceDb::TUpdate<Schema::IndexColumns::Offset>(key.Offset),
                    NIceDb::TUpdate<Schema::IndexColumns::Size>(key.Size),
                    NIceDb::TUpdate<Schema::IndexColumns::XPlanStep>(key.XPlanStep),
                    NIceDb::TUpdate<Schema::IndexColumns::XTxId>(key.XTxId));
        }
        return true;
    }

    // Number of rows in this batch.
    ui64 GetSize() const override {
        return Chunks.size();
    }

    // Scans IndexColumns and collects every row whose Granule or Index value is
    // non-zero, split into batches of at most 10000 rows.
    // Returns std::nullopt when the precharge fails, the rowset is not ready,
    // or advancing the rowset fails; otherwise the (possibly empty) batch list.
    static std::optional<std::vector<INormalizerChanges::TPtr>> Init(const TNormalizationController& /* controller */, NTabletFlatExecutor::TTransactionContext& txc) {
        using namespace NColumnShard;
        constexpr size_t maxChunksPerTask = 10000;

        NIceDb::TNiceDb db(txc.DB);
        if (!Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme())) {
            return std::nullopt;
        }

        std::vector<INormalizerChanges::TPtr> tasks;
        ui64 fullChunksCount = 0;
        {
            auto rowset = db.Table<Schema::IndexColumns>().Select();
            if (!rowset.IsReady()) {
                return std::nullopt;
            }
            // The constructor is private, so std::make_shared cannot be used here.
            std::shared_ptr<TNormalizerResult> changes(new TNormalizerResult());

            while (!rowset.EndOfSet()) {
                if (rowset.GetValue<Schema::IndexColumns::Granule>() || rowset.GetValue<Schema::IndexColumns::Index>()) {
                    changes->AddChunk(TChunkData(rowset));
                    ++fullChunksCount;

                    // Close the current batch once it is full and start a new one.
                    if (changes->Chunks.size() == maxChunksPerTask) {
                        tasks.emplace_back(std::move(changes));
                        changes.reset(new TNormalizerResult());
                    }
                }

                if (!rowset.Next()) {
                    return std::nullopt;
                }
            }

            // Keep the trailing, partially filled batch.
            if (!changes->Chunks.empty()) {
                tasks.emplace_back(std::move(changes));
            }
        }
        ACFL_INFO("normalizer", "TCleanGranuleIdNormalizer")("message", TStringBuilder() << fullChunksCount << " chunks found");
        return tasks;
    }

};

// Collects the pending change batches via TNormalizerResult::Init() and wraps
// each one in a TTrivialNormalizerTask.
// Returns a failed conclusion ("Not ready") when Init() yields no value.
TConclusion<std::vector<INormalizerTask::TPtr>> TCleanGranuleIdNormalizer::DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) {
    auto changes = TNormalizerResult::Init(controller, txc);
    if (!changes) {
        return TConclusionStatus::Fail("Not ready");
    }
    std::vector<INormalizerTask::TPtr> tasks;
    // The final size is known up front: one task per change batch.
    tasks.reserve(changes->size());
    for (auto&& c : *changes) {
        tasks.emplace_back(std::make_shared<TTrivialNormalizerTask>(c));
    }
    return tasks;
}

}
25 changes: 25 additions & 0 deletions ydb/core/tx/columnshard/normalizer/granule/clean_granule.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once

#include <ydb/core/tx/columnshard/normalizer/abstract/abstract.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>


namespace NKikimr::NOlap {

class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerComponent {
class TNormalizerResult;

static inline INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer> Registrator =
INormalizerComponent::TFactory::TRegistrator<TCleanGranuleIdNormalizer>(ENormalizerSequentialId::CleanGranuleId);
public:
TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext&) {
}

virtual ENormalizerSequentialId GetType() const override {
return ENormalizerSequentialId::CleanGranuleId;
}

virtual TConclusion<std::vector<INormalizerTask::TPtr>> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override;
};

}
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/normalizer/granule/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ LIBRARY()

SRCS(
GLOBAL normalizer.cpp
GLOBAL clean_granule.cpp
)

PEERDIR(
Expand Down

0 comments on commit 67c0ae8

Please sign in to comment.