Skip to content

Commit

Permalink
speed up records merge (ydb-platform#4822)
Browse files Browse the repository at this point in the history
Conflicts:
	ydb/core/tx/columnshard/engines/changes/indexation.cpp
	ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jun 17, 2024
1 parent 971d894 commit eaf99c7
Show file tree
Hide file tree
Showing 16 changed files with 552 additions and 208 deletions.
37 changes: 30 additions & 7 deletions ydb/core/formats/arrow/common/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,27 @@ std::shared_ptr<arrow::Array> IChunkedArray::TReader::CopyRecord(const ui64 reco
return NArrow::CopyRecords(address.GetArray(), {address.GetPosition()});
}

std::shared_ptr<arrow::ChunkedArray> IChunkedArray::TReader::Slice(const ui32 offset, const ui32 count) const {
std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, const ui32 count) const {
AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount());
ui32 currentOffset = offset;
ui32 countLeast = count;
std::vector<std::shared_ptr<arrow::Array>> chunks;
auto address = GetChunk({}, offset);
while (countLeast) {
auto address = GetReadChunk(currentOffset);
if (address.GetPosition() + countLeast <= (ui64)address.GetArray()->length()) {
chunks.emplace_back(address.GetArray()->Slice(address.GetPosition(), countLeast));
address = GetChunk(address, currentOffset);
const ui64 internalPos = currentOffset - address.GetStartPosition();
if (internalPos + countLeast <= (ui64)address.GetArray()->length()) {
chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast));
break;
} else {
const ui32 deltaCount = address.GetArray()->length() - address.GetPosition();
chunks.emplace_back(address.GetArray()->Slice(address.GetPosition(), deltaCount));
const ui32 deltaCount = address.GetArray()->length() - internalPos;
chunks.emplace_back(address.GetArray()->Slice(internalPos, deltaCount));
AFL_VERIFY(countLeast >= deltaCount);
countLeast -= deltaCount;
currentOffset += deltaCount;
}
}
return std::make_shared<arrow::ChunkedArray>(chunks, ChunkedArray->DataType);
return std::make_shared<arrow::ChunkedArray>(chunks, DataType);
}

TString IChunkedArray::TReader::DebugString(const ui32 position) const {
Expand Down Expand Up @@ -89,6 +91,27 @@ class TChunkAccessor {
return ChunkedArray->chunk(idx);
}
};

}

std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const {
AFL_VERIFY(StartPosition <= position);
AFL_VERIFY(position < FinishPosition);
AFL_VERIFY(item.StartPosition <= itemPosition);
AFL_VERIFY(itemPosition < item.FinishPosition);
return TComparator::TypedCompare<true>(*Array, position - StartPosition, *item.Array, itemPosition - item.StartPosition);
}

std::shared_ptr<arrow::Array> IChunkedArray::TCurrentChunkAddress::CopyRecord(const ui64 recordIndex) const {
AFL_VERIFY(StartPosition <= recordIndex);
AFL_VERIFY(recordIndex < FinishPosition);
return NArrow::CopyRecords(Array, { recordIndex - StartPosition });
}

TString IChunkedArray::TCurrentChunkAddress::DebugString(const ui64 position) const {
AFL_VERIFY(position < FinishPosition);
AFL_VERIFY(StartPosition <= position);
return NArrow::DebugString(Array, position - StartPosition);
}

IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {
Expand Down
19 changes: 18 additions & 1 deletion ydb/core/formats/arrow/common/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,31 @@ class IChunkedArray {
private:
YDB_READONLY_DEF(std::shared_ptr<arrow::Array>, Array);
YDB_READONLY(ui64, StartPosition, 0);
YDB_READONLY(ui64, FinishPosition, 0);
YDB_READONLY(ui64, ChunkIndex, 0);
public:
TString DebugString(const ui64 position) const;

ui64 GetLength() const {
return Array->length();
}

bool Contains(const ui64 position) const {
return position >= StartPosition && position < FinishPosition;
}

std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const;

std::partial_ordering Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const;

TCurrentChunkAddress(const std::shared_ptr<arrow::Array>& arr, const ui64 pos, const ui32 chunkIdx)
: Array(arr)
, StartPosition(pos)
, ChunkIndex(chunkIdx)
{
AFL_VERIFY(arr);
AFL_VERIFY(arr->length());
FinishPosition = StartPosition + arr->length();
}

TString DebugString() const {
Expand Down Expand Up @@ -141,7 +153,6 @@ class IChunkedArray {
static std::partial_ordering CompareColumns(const std::vector<TReader>& l, const ui64 lPosition, const std::vector<TReader>& r, const ui64 rPosition);
void AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const;
std::shared_ptr<arrow::Array> CopyRecord(const ui64 recordIndex) const;
std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;
TString DebugString(const ui32 position) const;
};

Expand All @@ -150,6 +161,12 @@ class IChunkedArray {
}
virtual ~IChunkedArray() = default;

std::shared_ptr<arrow::ChunkedArray> Slice(const ui32 offset, const ui32 count) const;

TCurrentChunkAddress GetChunk(const std::optional<TCurrentChunkAddress>& chunkCurrent, const ui64 position) const {
return DoGetChunk(chunkCurrent, position);
}

IChunkedArray(const ui64 recordsCount, const EType type, const std::shared_ptr<arrow::DataType>& dataType)
: DataType(dataType)
, RecordsCount(recordsCount)
Expand Down
12 changes: 6 additions & 6 deletions ydb/core/formats/arrow/reader/batch_iterator.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ namespace NKikimr::NArrow::NMerger {
class TBatchIterator {
private:
bool ControlPointFlag;
TSortableBatchPosition KeyColumns;
TSortableBatchPosition VersionColumns;
TRWSortableBatchPosition KeyColumns;
TRWSortableBatchPosition VersionColumns;
i64 RecordsCount;
int ReverseSortKff;

Expand All @@ -34,17 +34,17 @@ class TBatchIterator {
return ControlPointFlag;
}

const TSortableBatchPosition& GetKeyColumns() const {
const TRWSortableBatchPosition& GetKeyColumns() const {
return KeyColumns;
}

const TSortableBatchPosition& GetVersionColumns() const {
const TRWSortableBatchPosition& GetVersionColumns() const {
return VersionColumns;
}

TBatchIterator(const TSortableBatchPosition& keyColumns)
TBatchIterator(TRWSortableBatchPosition&& keyColumns)
: ControlPointFlag(true)
, KeyColumns(keyColumns) {
, KeyColumns(std::move(keyColumns)) {

}

Expand Down
9 changes: 8 additions & 1 deletion ydb/core/formats/arrow/reader/heap.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ class TRecordBatchBuilder;

template <class TSortCursor>
class TSortingHeap {
private:
std::deque<TSortCursor> FinishedCursors;
public:
TSortingHeap() = default;

Expand Down Expand Up @@ -40,14 +42,19 @@ class TSortingHeap {
}
}

void CleanFinished() {
FinishedCursors.clear();
}

void RemoveTop() {
std::pop_heap(Queue.begin(), Queue.end());
FinishedCursors.emplace_back(std::move(Queue.back()));
Queue.pop_back();
NextIdx = 0;
}

void Push(TSortCursor&& cursor) {
Queue.emplace_back(cursor);
Queue.emplace_back(std::move(cursor));
std::push_heap(Queue.begin(), Queue.end());
NextIdx = 0;
}
Expand Down
87 changes: 47 additions & 40 deletions ydb/core/formats/arrow/reader/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,12 @@

namespace NKikimr::NArrow::NMerger {

void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition> point) {
Y_ABORT_UNLESS(point);
AFL_VERIFY(point->IsSameSortingSchema(SortSchema))("point", point->DebugJson())("schema", SortSchema->ToString());
Y_ABORT_UNLESS(point->IsReverseSort() == Reverse);
void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point) {
AFL_VERIFY(point.IsSameSortingSchema(SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString());
Y_ABORT_UNLESS(point.IsReverseSort() == Reverse);
Y_ABORT_UNLESS(++ControlPoints == 1);

SortHeap.Push(TBatchIterator(*point));
SortHeap.Push(TBatchIterator(point.BuildRWPosition()));
}

void TMergePartialStream::RemoveControlPoint() {
Expand All @@ -21,54 +20,56 @@ void TMergePartialStream::RemoveControlPoint() {
SortHeap.RemoveTop();
}

void TMergePartialStream::CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition) {
void TMergePartialStream::CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition) {
#ifndef NDEBUG
if (CurrentKeyColumns) {
const bool linearExecutionCorrectness = CurrentKeyColumns->Compare(nextKeyColumnsPosition) == std::partial_ordering::less;
const bool linearExecutionCorrectness = nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::greater;
if (!linearExecutionCorrectness) {
const bool newSegmentScan = nextKeyColumnsPosition.GetPosition() == 0;
AFL_VERIFY(newSegmentScan && nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::less)
("merge_debug", DebugJson())("current_ext", nextKeyColumnsPosition.DebugJson())("newSegmentScan", newSegmentScan);
}
}
CurrentKeyColumns = nextKeyColumnsPosition;
CurrentKeyColumns = nextKeyColumnsPosition.BuildSortingCursor();
#else
Y_UNUSED(nextKeyColumnsPosition);
#endif
}

bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition) {
bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
AFL_VERIFY(ControlPoints == 1);
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
builder.ValidateDataSchema(DataSchema);
bool cpReachedFlag = false;
std::shared_ptr<TSortableScanData> resultScanData;
ui64 resultPosition;
while (SortHeap.Size() && !cpReachedFlag && !builder.IsBufferExhausted()) {
if (SortHeap.Current().IsControlPoint()) {
auto keyColumns = SortHeap.Current().GetKeyColumns();
auto keyColumns = SortHeap.Current().GetKeyColumns().BuildSortingCursor();
RemoveControlPoint();
cpReachedFlag = true;
if (SortHeap.Empty() || !includeFinish || SortHeap.Current().GetKeyColumns().Compare(keyColumns) == std::partial_ordering::greater) {
if (lastResultPosition && resultScanData) {
*lastResultPosition = resultScanData->BuildCursor(resultPosition);
}
return true;
}
}

if (auto currentPosition = DrainCurrentPosition()) {
CheckSequenceInDebug(*currentPosition);
builder.AddRecord(*currentPosition);
if (lastResultPosition) {
*lastResultPosition = *currentPosition;
}
}
DrainCurrentPosition(&builder, &resultScanData, &resultPosition);
}
if (lastResultPosition && resultScanData) {
*lastResultPosition = resultScanData->BuildCursor(resultPosition);
}
return cpReachedFlag;
}

bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition) {
PutControlPoint(std::make_shared<TSortableBatchPosition>(readTo));
bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
PutControlPoint(readTo);
return DrainToControlPoint(builder, includeFinish, lastResultPosition);
}

std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition) {
std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition) {
std::shared_ptr<arrow::Table> result;
if (SortHeap.Empty()) {
return result;
Expand Down Expand Up @@ -100,7 +101,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
result = SortHeap.Current().GetKeyColumns().SliceData(pos.GetPosition() + (include ? 0 : 1), resultSize);
if (lastResultPosition && resultSize) {
auto keys = SortHeap.Current().GetKeyColumns().SliceKeys(pos.GetPosition() + (include ? 0 : 1), resultSize);
*lastResultPosition = TSortableBatchPosition(keys, 0, SortSchema->field_names(), {}, true);
*lastResultPosition = TCursor(keys, 0, SortSchema->field_names());
}
if (SortHeap.Current().GetFilter()) {
SortHeap.Current().GetFilter()->Apply(result, pos.GetPosition() + (include ? 0 : 1), resultSize);
Expand All @@ -109,7 +110,7 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
result = SortHeap.Current().GetKeyColumns().SliceData(startPos, resultSize);
if (lastResultPosition && resultSize) {
auto keys = SortHeap.Current().GetKeyColumns().SliceKeys(startPos, resultSize);
*lastResultPosition = TSortableBatchPosition(keys, keys->num_rows() - 1, SortSchema->field_names(), {}, false);
*lastResultPosition = TCursor(keys, keys->num_rows() - 1, SortSchema->field_names());
}
if (SortHeap.Current().GetFilter()) {
SortHeap.Current().GetFilter()->Apply(result, startPos, resultSize);
Expand Down Expand Up @@ -144,38 +145,44 @@ std::shared_ptr<arrow::Table> TMergePartialStream::SingleSourceDrain(const TSort
void TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) {
Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount());
while (SortHeap.Size()) {
if (auto currentPosition = DrainCurrentPosition()) {
CheckSequenceInDebug(*currentPosition);
builder.AddRecord(*currentPosition);
}
DrainCurrentPosition(&builder, nullptr, nullptr);
}
}

std::optional<TSortableBatchPosition> TMergePartialStream::DrainCurrentPosition() {
void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition) {
Y_ABORT_UNLESS(SortHeap.Size());
Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint());
TSortableBatchPosition result = SortHeap.Current().GetKeyColumns();
TSortableBatchPosition resultVersion = SortHeap.Current().GetVersionColumns();
if (!SortHeap.Current().IsDeleted()) {
if (builder) {
builder->AddRecord(SortHeap.Current().GetKeyColumns());
}
if (resultScanData && resultPosition) {
*resultScanData = SortHeap.Current().GetKeyColumns().GetSorting();
*resultPosition = SortHeap.Current().GetKeyColumns().GetPosition();
}
}
CheckSequenceInDebug(SortHeap.Current().GetKeyColumns());
const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition();
const TSortableScanData* startSorting = SortHeap.Current().GetKeyColumns().GetSorting().get();
const TSortableScanData* startVersion = SortHeap.Current().GetVersionColumns().GetSorting().get();
bool isFirst = true;
const bool deletedFlag = SortHeap.Current().IsDeleted();
while (SortHeap.Size() && (isFirst || result.Compare(SortHeap.Current().GetKeyColumns()) == std::partial_ordering::equivalent)) {
auto& anotherIterator = SortHeap.Current();
while (SortHeap.Size() && (isFirst || SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) {
if (!isFirst) {
auto& anotherIterator = SortHeap.Current();
if (PossibleSameVersionFlag) {
AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) != std::partial_ordering::less)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
("key", result.DebugJson());
AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater)
("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
("key", startSorting->BuildCursor(startPosition).DebugJson());
} else {
AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
("key", result.DebugJson());
AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) == std::partial_ordering::less)
("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson())
("key", startSorting->BuildCursor(startPosition).DebugJson());
}
}
SortHeap.Next();
isFirst = false;
}
if (deletedFlag) {
return {};
}
return result;
SortHeap.CleanFinished();
}

std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,
Expand Down
14 changes: 7 additions & 7 deletions ydb/core/formats/arrow/reader/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ namespace NKikimr::NArrow::NMerger {
class TMergePartialStream {
private:
#ifndef NDEBUG
std::optional<TSortableBatchPosition> CurrentKeyColumns;
std::optional<TCursor> CurrentKeyColumns;
#endif
bool PossibleSameVersionFlag = true;

Expand All @@ -34,9 +34,9 @@ class TMergePartialStream {
return result;
}

std::optional<TSortableBatchPosition> DrainCurrentPosition();
void DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr<TSortableScanData>* resultScanData, ui64* resultPosition);

void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition);
void CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition);
public:
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
: SortSchema(sortSchema)
Expand Down Expand Up @@ -67,7 +67,7 @@ class TMergePartialStream {
return TStringBuilder() << "sort_heap=" << SortHeap.DebugJson();
}

void PutControlPoint(std::shared_ptr<TSortableBatchPosition> point);
void PutControlPoint(const TSortableBatchPosition& point);

void RemoveControlPoint();

Expand All @@ -93,9 +93,9 @@ class TMergePartialStream {
}

void DrainAll(TRecordBatchBuilder& builder);
std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TSortableBatchPosition>* lastResultPosition = nullptr);
std::shared_ptr<arrow::Table> SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional<TCursor>* lastResultPosition = nullptr);
std::vector<std::shared_ptr<arrow::RecordBatch>> DrainAllParts(const std::map<TSortableBatchPosition, bool>& positions,
const std::vector<std::shared_ptr<arrow::Field>>& resultFields);
};
Expand Down
Loading

0 comments on commit eaf99c7

Please sign in to comment.