From 066294409e23b7496b794c1bc5b5730504bcf2e7 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Mon, 22 Jul 2024 16:50:51 +0300 Subject: [PATCH] separate merger logic for make new one (#6931) --- ydb/core/formats/arrow/common/accessor.h | 2 +- .../merger.cpp} | 2 +- .../changes/compaction/abstract/merger.h | 48 ++++++++++ .../changes/compaction/abstract/ya.make | 11 +++ .../changes/compaction/common/context.cpp | 5 ++ .../{merge_context.h => common/context.h} | 0 .../changes/compaction/common/result.cpp | 10 +++ .../changes/compaction/common/result.h | 25 ++++++ .../engines/changes/compaction/common/ya.make | 12 +++ .../engines/changes/compaction/merger.cpp | 89 ++++++------------- .../compaction/{ => plain}/column_cursor.cpp | 1 - .../compaction/{ => plain}/column_cursor.h | 8 +- .../{ => plain}/column_portion_chunk.cpp | 0 .../{ => plain}/column_portion_chunk.h | 45 ++-------- .../changes/compaction/plain/logic.cpp | 33 +++++++ .../engines/changes/compaction/plain/logic.h | 19 ++++ .../compaction/{ => plain}/merged_column.cpp | 0 .../compaction/{ => plain}/merged_column.h | 5 +- .../engines/changes/compaction/plain/ya.make | 14 +++ .../engines/changes/compaction/ya.make | 7 +- .../engines/portions/portion_info.cpp | 39 ++------ .../engines/portions/portion_info.h | 9 -- .../plain_reader/iterator/fetched_data.h | 4 - .../reader/plain_reader/iterator/fetching.cpp | 2 +- .../reader/plain_reader/iterator/source.cpp | 28 +----- .../tx/columnshard/engines/scheme/ya.make | 1 - 26 files changed, 233 insertions(+), 186 deletions(-) rename ydb/core/tx/columnshard/engines/changes/compaction/{merge_context.cpp => abstract/merger.cpp} (61%) create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/abstract/ya.make create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/common/context.cpp rename ydb/core/tx/columnshard/engines/changes/compaction/{merge_context.h => common/context.h} (100%) create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/common/result.cpp create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/common/result.h create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/common/ya.make rename ydb/core/tx/columnshard/engines/changes/compaction/{ => plain}/column_cursor.cpp (94%) rename ydb/core/tx/columnshard/engines/changes/compaction/{ => plain}/column_cursor.h (84%) rename ydb/core/tx/columnshard/engines/changes/compaction/{ => plain}/column_portion_chunk.cpp (100%) rename ydb/core/tx/columnshard/engines/changes/compaction/{ => plain}/column_portion_chunk.h (69%) create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/plain/logic.cpp create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/plain/logic.h rename ydb/core/tx/columnshard/engines/changes/compaction/{ => plain}/merged_column.cpp (100%) rename ydb/core/tx/columnshard/engines/changes/compaction/{ => plain}/merged_column.h (85%) create mode 100644 ydb/core/tx/columnshard/engines/changes/compaction/plain/ya.make diff --git a/ydb/core/formats/arrow/common/accessor.h b/ydb/core/formats/arrow/common/accessor.h index 8d10f2ae2e93..6021f47f5a88 100644 --- a/ydb/core/formats/arrow/common/accessor.h +++ b/ydb/core/formats/arrow/common/accessor.h @@ -109,7 +109,7 @@ class IChunkedArray { } idx = nextIdx; } - } else if (position < chunkCurrent->GetStartPosition()) { + } else { AFL_VERIFY(chunkCurrent->GetChunkIndex() > 0); ui64 idx = chunkCurrent->GetStartPosition(); for (i32 i = chunkCurrent->GetChunkIndex() - 1; i >= 0; --i) { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.cpp similarity index 61% rename from ydb/core/tx/columnshard/engines/changes/compaction/merge_context.cpp rename to ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.cpp index 8280e58eec95..be583661ea38 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.cpp @@ -1,4 +1,4 @@ -#include "merge_context.h" +#include "merger.h" namespace NKikimr::NOlap::NCompaction { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h b/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h new file mode 100644 index 000000000000..1e93f0c70979 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/abstract/merger.h @@ -0,0 +1,48 @@ +#pragma once +#include +#include + +namespace NKikimr::NOlap::NCompaction { +class IColumnMerger { +private: + bool Started = false; + + virtual std::vector DoExecute( + const NCompaction::TColumnMergeContext& context, const arrow::UInt16Array& pIdxArray, const arrow::UInt32Array& pRecordIdxArray) = 0; + virtual void DoStart(const std::vector>& input) = 0; + +public: + static inline const TString PortionIdFieldName = "$$__portion_id"; + static inline const TString PortionRecordIndexFieldName = "$$__portion_record_idx"; + static inline const std::shared_ptr PortionIdField = + std::make_shared(PortionIdFieldName, std::make_shared()); + static inline const std::shared_ptr PortionRecordIndexField = + std::make_shared(PortionRecordIndexFieldName, std::make_shared()); + + virtual ~IColumnMerger() = default; + + void Start(const std::vector>& input) { + AFL_VERIFY(!Started); + Started = true; + return DoStart(input); + } + + std::vector Execute( + const NCompaction::TColumnMergeContext& context, const std::shared_ptr& remap) { + + auto columnPortionIdx = remap->GetColumnByName(IColumnMerger::PortionIdFieldName); + auto columnPortionRecordIdx = remap->GetColumnByName(IColumnMerger::PortionRecordIndexFieldName); + Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx); + Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id); + Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id); + const arrow::UInt16Array& pIdxArray = static_cast(*columnPortionIdx); + const arrow::UInt32Array& pRecordIdxArray = static_cast(*columnPortionRecordIdx); + + AFL_VERIFY(remap->num_rows() == pIdxArray.length()); + AFL_VERIFY(remap->num_rows() == pRecordIdxArray.length()); + + return DoExecute(context, pIdxArray, pRecordIdxArray); + } +}; + +} // namespace NKikimr::NOlap::NCompaction diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/abstract/ya.make b/ydb/core/tx/columnshard/engines/changes/compaction/abstract/ya.make new file mode 100644 index 000000000000..07be3f70eb68 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/abstract/ya.make @@ -0,0 +1,11 @@ +LIBRARY() + +SRCS( + merger.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/changes/compaction/common +) + +END() diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/common/context.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/common/context.cpp new file mode 100644 index 000000000000..35fbf111c993 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/common/context.cpp @@ -0,0 +1,5 @@ +#include "context.h" + +namespace NKikimr::NOlap::NCompaction { + +} diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h b/ydb/core/tx/columnshard/engines/changes/compaction/common/context.h similarity index 100% rename from ydb/core/tx/columnshard/engines/changes/compaction/merge_context.h rename to ydb/core/tx/columnshard/engines/changes/compaction/common/context.h diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/common/result.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/common/result.cpp new file mode 100644 index 000000000000..6482ee301543 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/common/result.cpp @@ -0,0 +1,10 @@ +#include "result.h" +#include + +namespace NKikimr::NOlap::NCompaction { + +TString TColumnPortionResult::DebugString() const { + return TStringBuilder() << "chunks=" << Chunks.size() << ";"; +} + +} diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/common/result.h b/ydb/core/tx/columnshard/engines/changes/compaction/common/result.h new file mode 100644 index 000000000000..850e1f6eebe0 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/common/result.h @@ -0,0 +1,25 @@ +#pragma once +#include + +namespace NKikimr::NOlap::NCompaction { + +class TColumnPortionResult { +protected: + std::vector> Chunks; + const ui32 ColumnId; +public: + + TColumnPortionResult(const ui32 columnId) + : ColumnId(columnId) { + + } + + const std::vector>& GetChunks() const { + return Chunks; + } + + TString DebugString() const; + +}; + +} diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/common/ya.make b/ydb/core/tx/columnshard/engines/changes/compaction/common/ya.make new file mode 100644 index 000000000000..30667909c931 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/common/ya.make @@ -0,0 +1,12 @@ +LIBRARY() + +SRCS( + context.cpp + result.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/scheme +) + +END() diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp index 0b2e93b16d27..1f5504317aa2 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp @@ -1,34 +1,25 @@ #include "merger.h" -#include "column_cursor.h" -#include "column_portion_chunk.h" -#include "merge_context.h" -#include "merged_column.h" +#include "abstract/merger.h" +#include "plain/logic.h" #include +#include #include #include -#include #include namespace NKikimr::NOlap::NCompaction { std::vector TMerger::Execute(const std::shared_ptr& stats, - const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr& resultFiltered, - const ui64 pathId, const std::optional shardingActualVersion) { + const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr& resultFiltered, const ui64 pathId, + const std::optional shardingActualVersion) { AFL_VERIFY(Batches.size() == Filters.size()); - static const TString portionIdFieldName = "$$__portion_id"; - static const TString portionRecordIndexFieldName = "$$__portion_record_idx"; - static const std::shared_ptr portionIdField = - std::make_shared(portionIdFieldName, std::make_shared()); - static const std::shared_ptr portionRecordIndexField = - std::make_shared(portionRecordIndexFieldName, std::make_shared()); - std::vector> batchResults; { arrow::FieldVector indexFields; - indexFields.emplace_back(portionIdField); - indexFields.emplace_back(portionRecordIndexField); + indexFields.emplace_back(IColumnMerger::PortionIdField); + indexFields.emplace_back(IColumnMerger::PortionRecordIndexField); IIndexInfo::AddSpecialFields(indexFields); auto dataSchema = std::make_shared(indexFields); NArrow::NMerger::TMergePartialStream mergeStream( @@ -39,14 +30,14 @@ std::vector TMerger::Execute(c { NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared>>( - portionIdFieldName, idx); - batch->AddField(portionIdField, column->BuildArray(batch->num_rows())).Validate(); + IColumnMerger::PortionIdFieldName, idx); + batch->AddField(IColumnMerger::PortionIdField, column->BuildArray(batch->num_rows())).Validate(); } { NArrow::NConstruction::IArrayBuilder::TPtr column = std::make_shared>>( - portionRecordIndexFieldName); - batch->AddField(portionRecordIndexField, column->BuildArray(batch->num_rows())).Validate(); + IColumnMerger::PortionRecordIndexFieldName); + batch->AddField(IColumnMerger::PortionRecordIndexField, column->BuildArray(batch->num_rows())).Validate(); } mergeStream.AddSource(batch, Filters[idx]); ++idx; @@ -54,25 +45,25 @@ std::vector TMerger::Execute(c batchResults = mergeStream.DrainAllParts(checkPoints, indexFields); } - std::vector>> chunkGroups; + std::vector>> chunkGroups; chunkGroups.resize(batchResults.size()); for (auto&& columnId : resultFiltered->GetColumnIds()) { NActors::TLogContextGuard logGuard( NActors::TLogContextBuilder::Build()("field_name", resultFiltered->GetIndexInfo().GetColumnName(columnId))); auto columnInfo = stats->GetColumnInfo(columnId); auto resultField = resultFiltered->GetIndexInfo().GetColumnFieldVerified(columnId); + std::shared_ptr merger = std::make_shared(); + // resultFiltered->BuildColumnMergerVerified(columnId); - std::vector cursors; { - ui32 idx = 0; + std::vector> parts; for (auto&& p : Batches) { - cursors.emplace_back(NCompaction::TPortionColumnCursor(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)), idx)); - ++idx; + parts.emplace_back(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId))); } + + merger->Start(parts); } - ui32 batchesRecordsCount = 0; - ui32 columnRecordsCount = 0; std::map> columnChunks; ui32 batchIdx = 0; for (auto&& batchResult : batchResults) { @@ -92,42 +83,10 @@ std::vector TMerger::Execute(c NCompaction::TColumnMergeContext context(columnId, resultFiltered, portionRecordsCountLimit, NSplitter::TSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo, externalSaver); - NCompaction::TMergedColumn mColumn(context); - - auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName); - auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName); - auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); - auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); - Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx); - Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id); - Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id); - Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id); - Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id); - const arrow::UInt16Array& pIdxArray = static_cast(*columnPortionIdx); - const arrow::UInt32Array& pRecordIdxArray = static_cast(*columnPortionRecordIdx); - - AFL_VERIFY(batchResult->num_rows() == pIdxArray.length()); - std::optional predPortionIdx; - for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) { - const ui16 portionIdx = pIdxArray.Value(idx); - const ui32 portionRecordIdx = pRecordIdxArray.Value(idx); - auto& cursor = cursors[portionIdx]; - cursor.Next(portionRecordIdx, mColumn); - if (predPortionIdx && portionIdx != *predPortionIdx) { - cursors[*predPortionIdx].Fetch(mColumn); - } - if (idx + 1 == pIdxArray.length()) { - cursor.Fetch(mColumn); - } - predPortionIdx = portionIdx; - } - chunkGroups[batchIdx][columnId] = mColumn.BuildResult(); - batchesRecordsCount += batchResult->num_rows(); - columnRecordsCount += mColumn.GetRecordsCount(); - AFL_VERIFY(batchResult->num_rows() == mColumn.GetRecordsCount()); + + chunkGroups[batchIdx][columnId] = merger->Execute(context, batchResult); ++batchIdx; } - AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("mCount", columnRecordsCount)("bCount", batchesRecordsCount); } ui32 batchIdx = 0; @@ -149,6 +108,12 @@ std::vector TMerger::Execute(c AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())( "current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first); } + auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP); + auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID); + Y_ABORT_UNLESS(columnSnapshotPlanStepIdx); + Y_ABORT_UNLESS(columnSnapshotTxIdx); + Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id); + Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id); std::vector batchSlices; std::shared_ptr schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats)); @@ -191,4 +156,4 @@ std::vector TMerger::Execute(c return result; } -} +} // namespace NKikimr::NOlap::NCompaction diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/plain/column_cursor.cpp similarity index 94% rename from ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp rename to ydb/core/tx/columnshard/engines/changes/compaction/plain/column_cursor.cpp index 270336fd5e66..65412522c879 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.cpp +++ b/ydb/core/tx/columnshard/engines/changes/compaction/plain/column_cursor.cpp @@ -5,7 +5,6 @@ namespace NKikimr::NOlap::NCompaction { bool TPortionColumnCursor::Fetch(TMergedColumn& column) { Y_ABORT_UNLESS(RecordIndexStart); -// NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId)); if (CurrentChunk && CurrentChunk->GetStartPosition() <= *RecordIndexStart && *RecordIndexStart < CurrentChunk->GetFinishPosition()) { } else { diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h b/ydb/core/tx/columnshard/engines/changes/compaction/plain/column_cursor.h similarity index 84% rename from ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h rename to ydb/core/tx/columnshard/engines/changes/compaction/plain/column_cursor.h index 3274201f229a..0e54ade2b372 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_cursor.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/plain/column_cursor.h @@ -13,8 +13,6 @@ class TPortionColumnCursor { std::shared_ptr BlobChunks; std::optional RecordIndexStart; YDB_READONLY(ui32, RecordIndexFinish, 0); - const ui64 PortionId; - public: ~TPortionColumnCursor() { AFL_VERIFY(!RecordIndexStart)("start", RecordIndexStart)("finish", RecordIndexFinish); @@ -24,10 +22,8 @@ class TPortionColumnCursor { bool Fetch(TMergedColumn& column); - TPortionColumnCursor(const std::shared_ptr& columnChunks, const ui64 portionId) - : BlobChunks(columnChunks) - , PortionId(portionId) { - Y_UNUSED(PortionId); + TPortionColumnCursor(const std::shared_ptr& columnChunks) + : BlobChunks(columnChunks) { } }; diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/plain/column_portion_chunk.cpp similarity index 100% rename from ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.cpp rename to ydb/core/tx/columnshard/engines/changes/compaction/plain/column_portion_chunk.cpp diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h b/ydb/core/tx/columnshard/engines/changes/compaction/plain/column_portion_chunk.h similarity index 69% rename from ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h rename to ydb/core/tx/columnshard/engines/changes/compaction/plain/column_portion_chunk.h index d176781bbf5c..98fe703f7e1a 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/column_portion_chunk.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/plain/column_portion_chunk.h @@ -1,45 +1,16 @@ #pragma once -#include "merge_context.h" #include -#include +#include +#include +#include #include #include #include -#include #include +#include namespace NKikimr::NOlap::NCompaction { -class TColumnPortionResult { -protected: - std::vector> Chunks; - ui64 CurrentPortionRecords = 0; - const ui32 ColumnId; - ui64 PackedSize = 0; -public: - ui64 GetPackedSize() const { - return PackedSize; - } - - TColumnPortionResult(const ui32 columnId) - : ColumnId(columnId) { - - } - - const std::vector>& GetChunks() const { - return Chunks; - } - - ui64 GetCurrentPortionRecords() const { - return CurrentPortionRecords; - } - - TString DebugString() const { - return TStringBuilder() << "chunks=" << Chunks.size() << ";records=" << CurrentPortionRecords << ";"; - } - -}; - class TColumnPortion: public TColumnPortionResult { private: using TBase = TColumnPortionResult; @@ -49,12 +20,14 @@ class TColumnPortion: public TColumnPortionResult { YDB_READONLY(ui64, CurrentChunkRawSize, 0); double PredictedPackedBytes = 0; const TSimpleColumnInfo ColumnInfo; + ui64 PackedSize = 0; + ui64 CurrentPortionRecords = 0; + public: TColumnPortion(const TColumnMergeContext& context) : TBase(context.GetColumnId()) , Context(context) - , ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId())) - { + , ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId())) { Builder = Context.MakeBuilder(); Type = Builder->type(); } @@ -70,4 +43,4 @@ class TColumnPortion: public TColumnPortionResult { ui32 AppendSlice(const std::shared_ptr& a, const ui32 startIndex, const ui32 length); }; -} +} // namespace NKikimr::NOlap::NCompaction diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/plain/logic.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/plain/logic.cpp new file mode 100644 index 000000000000..ac8cb351c572 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/plain/logic.cpp @@ -0,0 +1,33 @@ +#include "logic.h" + +namespace NKikimr::NOlap::NCompaction { + +void TPlainMerger::DoStart(const std::vector>& input) { + for (auto&& p : input) { + Cursors.emplace_back(NCompaction::TPortionColumnCursor(p)); + } +} + +std::vector TPlainMerger::DoExecute( + const NCompaction::TColumnMergeContext& context, const arrow::UInt16Array& pIdxArray, const arrow::UInt32Array& pRecordIdxArray) { + NCompaction::TMergedColumn mColumn(context); + + std::optional predPortionIdx; + for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) { + const ui16 portionIdx = pIdxArray.Value(idx); + const ui32 portionRecordIdx = pRecordIdxArray.Value(idx); + auto& cursor = Cursors[portionIdx]; + cursor.Next(portionRecordIdx, mColumn); + if (predPortionIdx && portionIdx != *predPortionIdx) { + Cursors[*predPortionIdx].Fetch(mColumn); + } + if (idx + 1 == pIdxArray.length()) { + cursor.Fetch(mColumn); + } + predPortionIdx = portionIdx; + } + AFL_VERIFY(pIdxArray.length() == mColumn.GetRecordsCount()); + return mColumn.BuildResult(); +} + +} // namespace NKikimr::NOlap::NCompaction diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/plain/logic.h b/ydb/core/tx/columnshard/engines/changes/compaction/plain/logic.h new file mode 100644 index 000000000000..995cd1c33a72 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/plain/logic.h @@ -0,0 +1,19 @@ +#pragma once +#include "column_cursor.h" + +#include +#include + +namespace NKikimr::NOlap::NCompaction { +class TPlainMerger: public IColumnMerger { +private: + std::vector Cursors; + virtual void DoStart(const std::vector>& input) override; + + virtual std::vector DoExecute(const NCompaction::TColumnMergeContext& context, const arrow::UInt16Array& pIdxArray, + const arrow::UInt32Array& pRecordIdxArray) override; + +public: +}; + +} // namespace NKikimr::NOlap::NCompaction diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp b/ydb/core/tx/columnshard/engines/changes/compaction/plain/merged_column.cpp similarity index 100% rename from ydb/core/tx/columnshard/engines/changes/compaction/merged_column.cpp rename to ydb/core/tx/columnshard/engines/changes/compaction/plain/merged_column.cpp diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h b/ydb/core/tx/columnshard/engines/changes/compaction/plain/merged_column.h similarity index 85% rename from ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h rename to ydb/core/tx/columnshard/engines/changes/compaction/plain/merged_column.h index f0a90bdd9b44..9dee31b84215 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/merged_column.h +++ b/ydb/core/tx/columnshard/engines/changes/compaction/plain/merged_column.h @@ -1,6 +1,7 @@ #pragma once #include "column_portion_chunk.h" -#include "merge_context.h" + +#include #include namespace NKikimr::NOlap::NCompaction { @@ -25,4 +26,4 @@ class TMergedColumn { std::vector BuildResult(); }; -} +} // namespace NKikimr::NOlap::NCompaction diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/plain/ya.make b/ydb/core/tx/columnshard/engines/changes/compaction/plain/ya.make new file mode 100644 index 000000000000..64de6caea075 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/changes/compaction/plain/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + column_cursor.cpp + column_portion_chunk.cpp + merged_column.cpp + logic.cpp +) + +PEERDIR( + ydb/core/tx/columnshard/engines/changes/compaction/common +) + +END() diff --git a/ydb/core/tx/columnshard/engines/changes/compaction/ya.make b/ydb/core/tx/columnshard/engines/changes/compaction/ya.make index 9d7a9ba90842..c6a7bc101f9a 100644 --- a/ydb/core/tx/columnshard/engines/changes/compaction/ya.make +++ b/ydb/core/tx/columnshard/engines/changes/compaction/ya.make @@ -1,15 +1,14 @@ LIBRARY() SRCS( - merge_context.cpp - column_cursor.cpp - column_portion_chunk.cpp - merged_column.cpp merger.cpp ) PEERDIR( ydb/core/tx/tiering + ydb/core/tx/columnshard/engines/changes/compaction/abstract + ydb/core/tx/columnshard/engines/changes/compaction/plain + ydb/core/tx/columnshard/engines/changes/compaction/common ) END() diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp index ab480d5479af..b89b63e97b5c 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.cpp +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.cpp @@ -723,16 +723,16 @@ std::shared_ptr TPortionInfo::TPreparedColumn::Assembl std::shared_ptr TPortionInfo::TPreparedColumn::Assemble() const { Y_ABORT_UNLESS(!Blobs.empty()); - std::vector> batches; - batches.reserve(Blobs.size()); + std::vector> chunks; + chunks.reserve(Blobs.size()); for (auto& blob : Blobs) { - batches.push_back(blob.BuildRecordBatch(*Loader)); - Y_ABORT_UNLESS(batches.back()); + auto batch = blob.BuildRecordBatch(*Loader); + Y_ABORT_UNLESS(batch); + Y_ABORT_UNLESS(batch->num_columns() == 1); + chunks.emplace_back(batch->column(0)); } - auto res = arrow::Table::FromRecordBatches(batches); - Y_VERIFY_S(res.ok(), res.status().message()); - return (*res)->column(0); + return NArrow::TStatusValidator::GetValid(arrow::ChunkedArray::Make(chunks)); } TDeserializeChunkedArray::TChunk TPortionInfo::TAssembleBlobInfo::BuildDeserializeChunk(const std::shared_ptr& loader) const { @@ -793,29 +793,4 @@ std::shared_ptr TPortionInfo::TPreparedBatchData::Ass return std::make_shared(fields, std::move(columns)); } -std::shared_ptr TPortionInfo::TPreparedBatchData::AssembleTable(const TAssembleOptions& options) const { - std::vector> columns; - std::vector> fields; - for (auto&& i : Columns) { - if (!options.IsAcceptedColumn(i.GetColumnId())) { - continue; - } - std::shared_ptr scalar; - if (options.IsConstantColumn(i.GetColumnId(), scalar)) { - auto type = i.GetField()->type(); - std::shared_ptr arr = NArrow::TThreadSimpleArraysCache::Get(type, scalar, RowsCount); - columns.emplace_back(std::make_shared(arr)); - } else { - columns.emplace_back(i.Assemble()); - } - fields.emplace_back(i.GetField()); - } - - return arrow::Table::Make(std::make_shared(fields), columns); -} - -std::shared_ptr TPortionInfo::TPreparedBatchData::Assemble(const TAssembleOptions& options) const { - return NArrow::ToBatch(AssembleTable(options), true); -} - } diff --git a/ydb/core/tx/columnshard/engines/portions/portion_info.h b/ydb/core/tx/columnshard/engines/portions/portion_info.h index ad1b661dea12..b7b89c2f187d 100644 --- a/ydb/core/tx/columnshard/engines/portions/portion_info.h +++ b/ydb/core/tx/columnshard/engines/portions/portion_info.h @@ -751,9 +751,7 @@ class TPortionInfo { , RowsCount(rowsCount) { } - std::shared_ptr Assemble(const TAssembleOptions& options = {}) const; std::shared_ptr AssembleToGeneralContainer(const std::set& sequentialColumnIds) const; - std::shared_ptr AssembleTable(const TAssembleOptions& options = {}) const; std::shared_ptr AssembleForSeqAccess() const; }; @@ -804,13 +802,6 @@ class TPortionInfo { TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema, THashMap& blobsData) const; TPreparedBatchData PrepareForAssemble(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema, THashMap& blobsData) const; - std::shared_ptr AssembleInBatch(const ISnapshotSchema& dataSchema, const ISnapshotSchema& resultSchema, - THashMap& data) const { - auto batch = PrepareForAssemble(dataSchema, resultSchema, data).Assemble(); - Y_ABORT_UNLESS(batch->Validate().ok()); - return batch; - } - friend IOutputStream& operator << (IOutputStream& out, const TPortionInfo& info) { out << info.DebugString(); return out; diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h index f7e27ea87d8c..adde885f1468 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h @@ -80,10 +80,6 @@ class TFetchedData { } } - void AddBatch(const std::shared_ptr& batch) { - return AddBatch(arrow::Table::Make(batch->schema(), batch->columns(), batch->num_rows())); - } - void AddBatch(const std::shared_ptr& table) { AFL_VERIFY(table); if (UseFilter) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp index 3e83e9d88e0f..7ce1bc9b6595 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetching.cpp @@ -131,7 +131,7 @@ TConclusion TBuildFakeSpec::DoExecuteInplace(const std::shared_ptrfields()) { columns.emplace_back(NArrow::TThreadSimpleArraysCache::GetConst(f->type(), NArrow::DefaultScalar(f->type()), Count)); } - source->MutableStageData().AddBatch(arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), Count, columns)); + source->MutableStageData().AddBatch(std::make_shared(arrow::RecordBatch::Make(TIndexInfo::ArrowSchemaSnapshot(), Count, columns))); return true; } diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp index fc0509f8c526..b5927acc8d1a 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.cpp @@ -187,32 +187,8 @@ void TPortionDataSource::DoApplyIndex(const NIndexes::TIndexCheckerContainer& in void TPortionDataSource::DoAssembleColumns(const std::shared_ptr& columns) { auto blobSchema = GetContext()->GetReadMetadata()->GetLoadSchemaVerified(*Portion); - if (SequentialEntityIds.empty()) { - MutableStageData().AddBatch(Portion->PrepareForAssemble(*blobSchema, columns->GetFilteredSchemaVerified(), MutableStageData().MutableBlobs()).AssembleTable()); - } else { - { - auto inMemColumns = columns->GetColumnIds(); - for (auto&& i : SequentialEntityIds) { - inMemColumns.erase(i); - } - if (inMemColumns.size()) { - auto filteredSchema = std::make_shared(columns->GetFilteredSchemaPtrVerified(), inMemColumns); - MutableStageData().AddBatch(Portion->PrepareForAssemble(*blobSchema, *filteredSchema, MutableStageData().MutableBlobs()).AssembleTable()); - } - } - { - std::set scanColumns; - for (auto&& i : columns->GetColumnIds()) { - if (SequentialEntityIds.contains(i)) { - scanColumns.emplace(i); - } - } - if (scanColumns.size()) { - auto filteredSchema = std::make_shared(columns->GetFilteredSchemaPtrVerified(), scanColumns); - MutableStageData().AddBatch(Portion->PrepareForAssemble(*blobSchema, *filteredSchema, MutableStageData().MutableBlobs()).AssembleForSeqAccess()); - } - } - } + MutableStageData().AddBatch(Portion->PrepareForAssemble(*blobSchema, columns->GetFilteredSchemaVerified(), MutableStageData().MutableBlobs()) + .AssembleToGeneralContainer(SequentialEntityIds)); } bool TCommittedDataSource::DoStartFetchingColumns(const std::shared_ptr& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr& /*columns*/) { diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make index e3d52b2649bd..8e41573bf419 100644 --- a/ydb/core/tx/columnshard/engines/scheme/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/ya.make @@ -21,7 +21,6 @@ PEERDIR( ydb/core/tx/columnshard/engines/scheme/column ydb/core/tx/columnshard/engines/scheme/defaults ydb/core/tx/columnshard/blobs_action/abstract - ydb/core/tx/columnshard/engines/changes/compaction ) YQL_LAST_ABI_VERSION()