Skip to content

Commit

Permalink
separate merger logic for make new one (#6931)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Jul 22, 2024
1 parent c921a59 commit 0662944
Show file tree
Hide file tree
Showing 26 changed files with 233 additions and 186 deletions.
2 changes: 1 addition & 1 deletion ydb/core/formats/arrow/common/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ class IChunkedArray {
}
idx = nextIdx;
}
} else if (position < chunkCurrent->GetStartPosition()) {
} else {
AFL_VERIFY(chunkCurrent->GetChunkIndex() > 0);
ui64 idx = chunkCurrent->GetStartPosition();
for (i32 i = chunkCurrent->GetChunkIndex() - 1; i >= 0; --i) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#include "merge_context.h"
#include "merger.h"

namespace NKikimr::NOlap::NCompaction {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/result.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/context.h>

namespace NKikimr::NOlap::NCompaction {
// Interface of a per-column merge strategy.
//
// The public non-virtual methods validate their inputs and then forward to the
// private virtual hooks (DoStart / DoExecute) that concrete mergers implement.
class IColumnMerger {
private:
    // Set by Start(); used only to reject a second Start() on the same object.
    bool Started = false;

    // Implementation hook for Execute(). Receives the two service columns of the
    // remap batch already validated and downcast to their concrete array types.
    virtual std::vector<TColumnPortionResult> DoExecute(
        const NCompaction::TColumnMergeContext& context, const arrow::UInt16Array& pIdxArray, const arrow::UInt32Array& pRecordIdxArray) = 0;
    // Implementation hook for Start(). Receives the input arrays to be merged.
    virtual void DoStart(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) = 0;

public:
    // Names and arrow field descriptors of the two service columns that Execute()
    // looks up in the remap batch: a UInt16 "portion id" column and a UInt32
    // "portion record index" column.
    static inline const TString PortionIdFieldName = "$$__portion_id";
    static inline const TString PortionRecordIndexFieldName = "$$__portion_record_idx";
    static inline const std::shared_ptr<arrow::Field> PortionIdField =
        std::make_shared<arrow::Field>(PortionIdFieldName, std::make_shared<arrow::UInt16Type>());
    static inline const std::shared_ptr<arrow::Field> PortionRecordIndexField =
        std::make_shared<arrow::Field>(PortionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>());

    virtual ~IColumnMerger() = default;

    // Hands the input arrays to the implementation. May be called at most once
    // per merger object; a repeated call fails verification.
    void Start(const std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>>& input) {
        AFL_VERIFY(!Started);
        Started = true;
        return DoStart(input);
    }

    // Validates the remap batch and runs the implementation on it.
    //
    // `remap` must be a non-null record batch that contains both service columns
    // (PortionIdFieldName as UInt16, PortionRecordIndexFieldName as UInt32); any
    // violation aborts the process. Returns whatever DoExecute() produces.
    //
    // NOTE(review): this method does not check that Start() was called before it;
    // confirm whether that ordering is required by implementations.
    std::vector<TColumnPortionResult> Execute(
        const NCompaction::TColumnMergeContext& context, const std::shared_ptr<arrow::RecordBatch>& remap) {
        // Fail with a diagnosable verification error instead of dereferencing a null batch.
        AFL_VERIFY(!!remap);

        auto columnPortionIdx = remap->GetColumnByName(IColumnMerger::PortionIdFieldName);
        auto columnPortionRecordIdx = remap->GetColumnByName(IColumnMerger::PortionRecordIndexFieldName);
        Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx);
        // The static_casts below are only valid because the type ids are checked first.
        Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
        Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
        const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
        const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);

        AFL_VERIFY(remap->num_rows() == pIdxArray.length());
        AFL_VERIFY(remap->num_rows() == pRecordIdxArray.length());

        return DoExecute(context, pIdxArray, pRecordIdxArray);
    }
};

} // namespace NKikimr::NOlap::NCompaction
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
LIBRARY()

SRCS(
merger.cpp
)

PEERDIR(
ydb/core/tx/columnshard/engines/changes/compaction/common
)

END()
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
#include "context.h"

namespace NKikimr::NOlap::NCompaction {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
#include "result.h"
#include <util/string/builder.h>

namespace NKikimr::NOlap::NCompaction {

// Builds a short diagnostic string that reports how many chunks this result holds,
// in the form "chunks=<count>;".
TString TColumnPortionResult::DebugString() const {
    TStringBuilder description;
    description << "chunks=" << Chunks.size();
    description << ";";
    return description;
}

}
25 changes: 25 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/compaction/common/result.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#pragma once
#include <ydb/core/tx/columnshard/splitter/abstract/chunks.h>

namespace NKikimr::NOlap::NCompaction {

class TColumnPortionResult {
protected:
std::vector<std::shared_ptr<IPortionDataChunk>> Chunks;
const ui32 ColumnId;
public:

TColumnPortionResult(const ui32 columnId)
: ColumnId(columnId) {

}

const std::vector<std::shared_ptr<IPortionDataChunk>>& GetChunks() const {
return Chunks;
}

TString DebugString() const;

};

}
12 changes: 12 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/compaction/common/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
LIBRARY()

SRCS(
context.cpp
result.cpp
)

PEERDIR(
ydb/core/tx/columnshard/engines/scheme
)

END()
89 changes: 27 additions & 62 deletions ydb/core/tx/columnshard/engines/changes/compaction/merger.cpp
Original file line number Diff line number Diff line change
@@ -1,34 +1,25 @@
#include "merger.h"

#include "column_cursor.h"
#include "column_portion_chunk.h"
#include "merge_context.h"
#include "merged_column.h"
#include "abstract/merger.h"
#include "plain/logic.h"

#include <ydb/core/formats/arrow/reader/merger.h>
#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/formats/arrow/simple_builder/array.h>
#include <ydb/core/formats/arrow/simple_builder/filler.h>
#include <ydb/core/formats/arrow/serializer/native.h>
#include <ydb/core/tx/columnshard/splitter/batch_slice.h>

namespace NKikimr::NOlap::NCompaction {

std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(const std::shared_ptr<TSerializationStats>& stats,
const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered,
const ui64 pathId, const std::optional<ui64> shardingActualVersion) {
const NArrow::NMerger::TIntervalPositions& checkPoints, const std::shared_ptr<TFilteredSnapshotSchema>& resultFiltered, const ui64 pathId,
const std::optional<ui64> shardingActualVersion) {
AFL_VERIFY(Batches.size() == Filters.size());
static const TString portionIdFieldName = "$$__portion_id";
static const TString portionRecordIndexFieldName = "$$__portion_record_idx";
static const std::shared_ptr<arrow::Field> portionIdField =
std::make_shared<arrow::Field>(portionIdFieldName, std::make_shared<arrow::UInt16Type>());
static const std::shared_ptr<arrow::Field> portionRecordIndexField =
std::make_shared<arrow::Field>(portionRecordIndexFieldName, std::make_shared<arrow::UInt32Type>());

std::vector<std::shared_ptr<arrow::RecordBatch>> batchResults;
{
arrow::FieldVector indexFields;
indexFields.emplace_back(portionIdField);
indexFields.emplace_back(portionRecordIndexField);
indexFields.emplace_back(IColumnMerger::PortionIdField);
indexFields.emplace_back(IColumnMerger::PortionRecordIndexField);
IIndexInfo::AddSpecialFields(indexFields);
auto dataSchema = std::make_shared<arrow::Schema>(indexFields);
NArrow::NMerger::TMergePartialStream mergeStream(
Expand All @@ -39,40 +30,40 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
{
NArrow::NConstruction::IArrayBuilder::TPtr column =
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntConstFiller<arrow::UInt16Type>>>(
portionIdFieldName, idx);
batch->AddField(portionIdField, column->BuildArray(batch->num_rows())).Validate();
IColumnMerger::PortionIdFieldName, idx);
batch->AddField(IColumnMerger::PortionIdField, column->BuildArray(batch->num_rows())).Validate();
}
{
NArrow::NConstruction::IArrayBuilder::TPtr column =
std::make_shared<NArrow::NConstruction::TSimpleArrayConstructor<NArrow::NConstruction::TIntSeqFiller<arrow::UInt32Type>>>(
portionRecordIndexFieldName);
batch->AddField(portionRecordIndexField, column->BuildArray(batch->num_rows())).Validate();
IColumnMerger::PortionRecordIndexFieldName);
batch->AddField(IColumnMerger::PortionRecordIndexField, column->BuildArray(batch->num_rows())).Validate();
}
mergeStream.AddSource(batch, Filters[idx]);
++idx;
}
batchResults = mergeStream.DrainAllParts(checkPoints, indexFields);
}

std::vector<std::map<ui32, std::vector<NCompaction::TColumnPortionResult>>> chunkGroups;
std::vector<std::map<ui32, std::vector<TColumnPortionResult>>> chunkGroups;
chunkGroups.resize(batchResults.size());
for (auto&& columnId : resultFiltered->GetColumnIds()) {
NActors::TLogContextGuard logGuard(
NActors::TLogContextBuilder::Build()("field_name", resultFiltered->GetIndexInfo().GetColumnName(columnId)));
auto columnInfo = stats->GetColumnInfo(columnId);
auto resultField = resultFiltered->GetIndexInfo().GetColumnFieldVerified(columnId);
std::shared_ptr<IColumnMerger> merger = std::make_shared<TPlainMerger>();
// resultFiltered->BuildColumnMergerVerified(columnId);

std::vector<NCompaction::TPortionColumnCursor> cursors;
{
ui32 idx = 0;
std::vector<std::shared_ptr<NArrow::NAccessor::IChunkedArray>> parts;
for (auto&& p : Batches) {
cursors.emplace_back(NCompaction::TPortionColumnCursor(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)), idx));
++idx;
parts.emplace_back(p->GetColumnVerified(resultFiltered->GetFieldIndex(columnId)));
}

merger->Start(parts);
}

ui32 batchesRecordsCount = 0;
ui32 columnRecordsCount = 0;
std::map<std::string, std::vector<NCompaction::TColumnPortionResult>> columnChunks;
ui32 batchIdx = 0;
for (auto&& batchResult : batchResults) {
Expand All @@ -92,42 +83,10 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c

NCompaction::TColumnMergeContext context(columnId, resultFiltered, portionRecordsCountLimit,
NSplitter::TSplitSettings().GetExpectedUnpackColumnChunkRawSize(), columnInfo, externalSaver);
NCompaction::TMergedColumn mColumn(context);

auto columnPortionIdx = batchResult->GetColumnByName(portionIdFieldName);
auto columnPortionRecordIdx = batchResult->GetColumnByName(portionRecordIndexFieldName);
auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
Y_ABORT_UNLESS(columnPortionIdx && columnPortionRecordIdx && columnSnapshotPlanStepIdx && columnSnapshotTxIdx);
Y_ABORT_UNLESS(columnPortionIdx->type_id() == arrow::UInt16Type::type_id);
Y_ABORT_UNLESS(columnPortionRecordIdx->type_id() == arrow::UInt32Type::type_id);
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);
const arrow::UInt16Array& pIdxArray = static_cast<const arrow::UInt16Array&>(*columnPortionIdx);
const arrow::UInt32Array& pRecordIdxArray = static_cast<const arrow::UInt32Array&>(*columnPortionRecordIdx);

AFL_VERIFY(batchResult->num_rows() == pIdxArray.length());
std::optional<ui16> predPortionIdx;
for (ui32 idx = 0; idx < pIdxArray.length(); ++idx) {
const ui16 portionIdx = pIdxArray.Value(idx);
const ui32 portionRecordIdx = pRecordIdxArray.Value(idx);
auto& cursor = cursors[portionIdx];
cursor.Next(portionRecordIdx, mColumn);
if (predPortionIdx && portionIdx != *predPortionIdx) {
cursors[*predPortionIdx].Fetch(mColumn);
}
if (idx + 1 == pIdxArray.length()) {
cursor.Fetch(mColumn);
}
predPortionIdx = portionIdx;
}
chunkGroups[batchIdx][columnId] = mColumn.BuildResult();
batchesRecordsCount += batchResult->num_rows();
columnRecordsCount += mColumn.GetRecordsCount();
AFL_VERIFY(batchResult->num_rows() == mColumn.GetRecordsCount());

chunkGroups[batchIdx][columnId] = merger->Execute(context, batchResult);
++batchIdx;
}
AFL_VERIFY(columnRecordsCount == batchesRecordsCount)("mCount", columnRecordsCount)("bCount", batchesRecordsCount);
}
ui32 batchIdx = 0;

Expand All @@ -149,6 +108,12 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
AFL_VERIFY(i.second.size() == columnChunks.begin()->second.size())("first", columnChunks.begin()->second.size())(
"current", i.second.size())("first_name", columnChunks.begin()->first)("current_name", i.first);
}
auto columnSnapshotPlanStepIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_PLAN_STEP);
auto columnSnapshotTxIdx = batchResult->GetColumnByName(TIndexInfo::SPEC_COL_TX_ID);
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx);
Y_ABORT_UNLESS(columnSnapshotTxIdx);
Y_ABORT_UNLESS(columnSnapshotPlanStepIdx->type_id() == arrow::UInt64Type::type_id);
Y_ABORT_UNLESS(columnSnapshotTxIdx->type_id() == arrow::UInt64Type::type_id);

std::vector<TGeneralSerializedSlice> batchSlices;
std::shared_ptr<TDefaultSchemaDetails> schemaDetails(new TDefaultSchemaDetails(resultFiltered, stats));
Expand Down Expand Up @@ -191,4 +156,4 @@ std::vector<NKikimr::NOlap::TWritePortionInfoWithBlobsResult> TMerger::Execute(c
return result;
}

}
} // namespace NKikimr::NOlap::NCompaction
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ namespace NKikimr::NOlap::NCompaction {

bool TPortionColumnCursor::Fetch(TMergedColumn& column) {
Y_ABORT_UNLESS(RecordIndexStart);
// NActors::TLogContextGuard lg(NActors::TLogContextBuilder::Build()("portion_id", PortionId));
if (CurrentChunk && CurrentChunk->GetStartPosition() <= *RecordIndexStart && *RecordIndexStart < CurrentChunk->GetFinishPosition()) {

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ class TPortionColumnCursor {
std::shared_ptr<NArrow::NAccessor::IChunkedArray> BlobChunks;
std::optional<ui32> RecordIndexStart;
YDB_READONLY(ui32, RecordIndexFinish, 0);
const ui64 PortionId;

public:
~TPortionColumnCursor() {
AFL_VERIFY(!RecordIndexStart)("start", RecordIndexStart)("finish", RecordIndexFinish);
Expand All @@ -24,10 +22,8 @@ class TPortionColumnCursor {

bool Fetch(TMergedColumn& column);

TPortionColumnCursor(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& columnChunks, const ui64 portionId)
: BlobChunks(columnChunks)
, PortionId(portionId) {
Y_UNUSED(PortionId);
TPortionColumnCursor(const std::shared_ptr<NArrow::NAccessor::IChunkedArray>& columnChunks)
: BlobChunks(columnChunks) {
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,45 +1,16 @@
#pragma once
#include "merge_context.h"
#include <ydb/core/formats/arrow/simple_arrays_cache.h>
#include <ydb/core/tx/columnshard/splitter/chunks.h>
#include <ydb/core/tx/columnshard/counters/splitter.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/context.h>
#include <ydb/core/tx/columnshard/engines/changes/compaction/common/result.h>
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
#include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
#include <ydb/core/tx/columnshard/engines/scheme/column_features.h>
#include <ydb/core/tx/columnshard/counters/splitter.h>
#include <ydb/core/tx/columnshard/splitter/chunk_meta.h>
#include <ydb/core/tx/columnshard/splitter/chunks.h>

namespace NKikimr::NOlap::NCompaction {

class TColumnPortionResult {
protected:
std::vector<std::shared_ptr<IPortionDataChunk>> Chunks;
ui64 CurrentPortionRecords = 0;
const ui32 ColumnId;
ui64 PackedSize = 0;
public:
ui64 GetPackedSize() const {
return PackedSize;
}

TColumnPortionResult(const ui32 columnId)
: ColumnId(columnId) {

}

const std::vector<std::shared_ptr<IPortionDataChunk>>& GetChunks() const {
return Chunks;
}

ui64 GetCurrentPortionRecords() const {
return CurrentPortionRecords;
}

TString DebugString() const {
return TStringBuilder() << "chunks=" << Chunks.size() << ";records=" << CurrentPortionRecords << ";";
}

};

class TColumnPortion: public TColumnPortionResult {
private:
using TBase = TColumnPortionResult;
Expand All @@ -49,12 +20,14 @@ class TColumnPortion: public TColumnPortionResult {
YDB_READONLY(ui64, CurrentChunkRawSize, 0);
double PredictedPackedBytes = 0;
const TSimpleColumnInfo ColumnInfo;
ui64 PackedSize = 0;
ui64 CurrentPortionRecords = 0;

public:
TColumnPortion(const TColumnMergeContext& context)
: TBase(context.GetColumnId())
, Context(context)
, ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId()))
{
, ColumnInfo(Context.GetIndexInfo().GetColumnFeaturesVerified(context.GetColumnId())) {
Builder = Context.MakeBuilder();
Type = Builder->type();
}
Expand All @@ -70,4 +43,4 @@ class TColumnPortion: public TColumnPortionResult {
ui32 AppendSlice(const std::shared_ptr<arrow::Array>& a, const ui32 startIndex, const ui32 length);
};

}
} // namespace NKikimr::NOlap::NCompaction
Loading

0 comments on commit 0662944

Please sign in to comment.