diff --git a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h index fa107c7fcc05..76973fad83c7 100644 --- a/ydb/core/tx/columnshard/normalizer/abstract/abstract.h +++ b/ydb/core/tx/columnshard/normalizer/abstract/abstract.h @@ -52,10 +52,11 @@ namespace NKikimr::NOlap { enum class ENormalizerSequentialId : ui32 { Granules = 1, + CleanGranuleId, Chunks, PortionsCleaner, TablesCleaner, - PortionsMetadata, + PortionsMetadata }; class TNormalizationContext { diff --git a/ydb/core/tx/columnshard/normalizer/granule/clean_granule.cpp b/ydb/core/tx/columnshard/normalizer/granule/clean_granule.cpp new file mode 100644 index 000000000000..7b0fc13ebdbb --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/granule/clean_granule.cpp @@ -0,0 +1,149 @@ +#include "clean_granule.h" + +#include + +namespace NKikimr::NOlap { + +namespace { + + struct TChunkData { + ui64 Index = 0; + ui64 GranuleId = 0; + ui64 PlanStep = 0; + ui64 TxId = 0; + ui64 PortionId = 0; + ui32 Chunk = 0; + ui64 ColumnIdx = 0; + + ui64 XPlanStep = 0; + ui64 XTxId = 0; + TString Blob; + TString Metadata; + ui64 Offset; + ui32 Size; + ui64 PathId; + + template + TChunkData(const TRowSet& rowset) { + using Schema = NColumnShard::Schema; + PlanStep = rowset.GetValue(); + TxId = rowset.GetValue(); + PortionId = rowset.GetValue(); + GranuleId = rowset.GetValue(); + Chunk = rowset.GetValue(); + Index = rowset.GetValue(); + ColumnIdx = rowset.GetValue(); + + XPlanStep = rowset.GetValue(); + XTxId = rowset.GetValue(); + Blob = rowset.GetValue(); + Metadata = rowset.GetValue(); + Offset = rowset.GetValue(); + Size = rowset.GetValue(); + PathId = rowset.GetValue(); + } + }; +} + +class TCleanGranuleIdNormalizer::TNormalizerResult : public INormalizerChanges { +private: + std::vector Chunks; + + void AddChunk(TChunkData&& chunk) { + Chunks.push_back(std::move(chunk)); + } + + TNormalizerResult() = default; + +public: + bool ApplyOnExecute(NTabletFlatExecutor::TTransactionContext& txc, const TNormalizationController& /* normController */) const override { + using Schema = NColumnShard::Schema; + NIceDb::TNiceDb db(txc.DB); + ACFL_INFO("normalizer", "TCleanGranuleIdNormalizer")("message", TStringBuilder() << "apply " << Chunks.size() << " chunks"); + + for (auto&& key : Chunks) { + db.Table().Key(key.Index, key.GranuleId, key.ColumnIdx, + key.PlanStep, key.TxId, key.PortionId, key.Chunk).Delete(); + + db.Table().Key(0, 0, key.ColumnIdx, + key.PlanStep, key.TxId, key.PortionId, key.Chunk).Update( + NIceDb::TUpdate(key.PathId), + NIceDb::TUpdate(key.Blob), + NIceDb::TUpdate(key.Metadata), + NIceDb::TUpdate(key.Offset), + NIceDb::TUpdate(key.Size), + NIceDb::TUpdate(key.XPlanStep), + NIceDb::TUpdate(key.XTxId) + + ); + } + return true; + } + + ui64 GetSize() const override { + return Chunks.size(); + } + + static std::optional> Init(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { + using namespace NColumnShard; + NIceDb::TNiceDb db(txc.DB); + + bool ready = true; + ready = ready & Schema::Precharge(db, txc.DB.GetScheme()); + if (!ready) { + return std::nullopt; + } + + std::vector tasks; + ui64 fullChunksCount = 0; + { + auto rowset = db.Table().Select(); + if (!rowset.IsReady()) { + return std::nullopt; + } + std::shared_ptr changes(new TNormalizerResult()); + ui64 chunksCount = 0; + + while (!rowset.EndOfSet()) { + if (rowset.GetValue() || rowset.GetValue()) { + TChunkData key(rowset); + + changes->AddChunk(std::move(key)); + ++chunksCount; + ++fullChunksCount; + + if (chunksCount == 10000) { + tasks.emplace_back(changes); + changes.reset(new TNormalizerResult()); + chunksCount = 0; + } + } + + if (!rowset.Next()) { + return std::nullopt; + } + } + + if (chunksCount > 0) { + tasks.emplace_back(changes); + } + } + ACFL_INFO("normalizer", "TGranulesNormalizer")("message", TStringBuilder() << fullChunksCount << " chunks found"); + return tasks; + } + +}; + +TConclusion> TCleanGranuleIdNormalizer::DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) { + auto changes = TNormalizerResult::Init(controller, txc); + if (!changes) { + return TConclusionStatus::Fail("Not ready");; + } + std::vector tasks; + for (auto&& c : *changes) { + tasks.emplace_back(std::make_shared(c)); + } + return tasks; +} + +} diff --git a/ydb/core/tx/columnshard/normalizer/granule/clean_granule.h b/ydb/core/tx/columnshard/normalizer/granule/clean_granule.h new file mode 100644 index 000000000000..3fc4150a2f45 --- /dev/null +++ b/ydb/core/tx/columnshard/normalizer/granule/clean_granule.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include + + +namespace NKikimr::NOlap { + +class TCleanGranuleIdNormalizer: public TNormalizationController::INormalizerComponent { + class TNormalizerResult; + + static inline INormalizerComponent::TFactory::TRegistrator Registrator = + INormalizerComponent::TFactory::TRegistrator(ENormalizerSequentialId::CleanGranuleId); +public: + TCleanGranuleIdNormalizer(const TNormalizationController::TInitContext&) { + } + + virtual ENormalizerSequentialId GetType() const override { + return ENormalizerSequentialId::CleanGranuleId; + } + + virtual TConclusion> DoInit(const TNormalizationController& controller, NTabletFlatExecutor::TTransactionContext& txc) override; +}; + +} diff --git a/ydb/core/tx/columnshard/normalizer/granule/ya.make b/ydb/core/tx/columnshard/normalizer/granule/ya.make index d44051621eb3..09f0c56bec71 100644 --- a/ydb/core/tx/columnshard/normalizer/granule/ya.make +++ b/ydb/core/tx/columnshard/normalizer/granule/ya.make @@ -2,6 +2,7 @@ LIBRARY() SRCS( GLOBAL normalizer.cpp + GLOBAL clean_granule.cpp ) PEERDIR(