diff --git a/ydb/core/formats/arrow/common/accessor.cpp b/ydb/core/formats/arrow/common/accessor.cpp index 450c3f40cf4c..9865b2a692f7 100644 --- a/ydb/core/formats/arrow/common/accessor.cpp +++ b/ydb/core/formats/arrow/common/accessor.cpp @@ -17,25 +17,27 @@ std::shared_ptr IChunkedArray::TReader::CopyRecord(const ui64 reco return NArrow::CopyRecords(address.GetArray(), {address.GetPosition()}); } -std::shared_ptr IChunkedArray::TReader::Slice(const ui32 offset, const ui32 count) const { +std::shared_ptr IChunkedArray::Slice(const ui32 offset, const ui32 count) const { /* Builds a chunked array covering records [offset, offset + count) by slicing every chunk the range touches; AFL_VERIFY rejects a range that ends past GetRecordsCount(). */ - AFL_VERIFY(offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount()); + AFL_VERIFY((ui64)offset + count <= (ui64)GetRecordsCount())("offset", offset)("count", count)("length", GetRecordsCount()); /* left side is widened before the addition: ui32 + ui32 wraps (assuming ui32 is a 32-bit unsigned type), and a wrapped sum could pass this bound check */ ui32 currentOffset = offset; ui32 countLeast = count; std::vector> chunks; + auto address = GetChunk({}, offset); /* first lookup has no previous address; each later lookup passes the last address back to GetChunk */ while (countLeast) { - auto address = GetReadChunk(currentOffset); - if (address.GetPosition() + countLeast <= (ui64)address.GetArray()->length()) { - chunks.emplace_back(address.GetArray()->Slice(address.GetPosition(), countLeast)); + address = GetChunk(address, currentOffset); + const ui64 internalPos = currentOffset - address.GetStartPosition(); /* position of currentOffset inside the current chunk */ + if (internalPos + countLeast <= (ui64)address.GetArray()->length()) { + chunks.emplace_back(address.GetArray()->Slice(internalPos, countLeast)); break; } else { - const ui32 deltaCount = address.GetArray()->length() - address.GetPosition(); - chunks.emplace_back(address.GetArray()->Slice(address.GetPosition(), deltaCount)); + const ui32 deltaCount = address.GetArray()->length() - internalPos; + chunks.emplace_back(address.GetArray()->Slice(internalPos, deltaCount)); AFL_VERIFY(countLeast >= deltaCount); countLeast -= deltaCount; currentOffset += deltaCount; } } - return std::make_shared(chunks, ChunkedArray->DataType); + return std::make_shared(chunks, DataType); } TString IChunkedArray::TReader::DebugString(const ui32 position) const { @@ -89,6 +91,27 @@ class TChunkAccessor { return ChunkedArray->chunk(idx); } }; + +} + 
+std::partial_ordering IChunkedArray::TCurrentChunkAddress::Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const { + AFL_VERIFY(StartPosition <= position); + AFL_VERIFY(position < FinishPosition); + AFL_VERIFY(item.StartPosition <= itemPosition); + AFL_VERIFY(itemPosition < item.FinishPosition); + return TComparator::TypedCompare(*Array, position - StartPosition, *item.Array, itemPosition - item.StartPosition); +} + +std::shared_ptr IChunkedArray::TCurrentChunkAddress::CopyRecord(const ui64 recordIndex) const { + AFL_VERIFY(StartPosition <= recordIndex); + AFL_VERIFY(recordIndex < FinishPosition); + return NArrow::CopyRecords(Array, { recordIndex - StartPosition }); +} + +TString IChunkedArray::TCurrentChunkAddress::DebugString(const ui64 position) const { + AFL_VERIFY(position < FinishPosition); + AFL_VERIFY(StartPosition <= position); + return NArrow::DebugString(Array, position - StartPosition); } IChunkedArray::TCurrentChunkAddress TTrivialChunkedArray::DoGetChunk(const std::optional& chunkCurrent, const ui64 position) const { diff --git a/ydb/core/formats/arrow/common/accessor.h b/ydb/core/formats/arrow/common/accessor.h index acd7e3a650c2..3765d726992b 100644 --- a/ydb/core/formats/arrow/common/accessor.h +++ b/ydb/core/formats/arrow/common/accessor.h @@ -21,12 +21,23 @@ class IChunkedArray { private: YDB_READONLY_DEF(std::shared_ptr, Array); YDB_READONLY(ui64, StartPosition, 0); + YDB_READONLY(ui64, FinishPosition, 0); YDB_READONLY(ui64, ChunkIndex, 0); public: + TString DebugString(const ui64 position) const; + ui64 GetLength() const { return Array->length(); } + bool Contains(const ui64 position) const { + return position >= StartPosition && position < FinishPosition; + } + + std::shared_ptr CopyRecord(const ui64 recordIndex) const; + + std::partial_ordering Compare(const ui64 position, const TCurrentChunkAddress& item, const ui64 itemPosition) const; + TCurrentChunkAddress(const std::shared_ptr& arr, const ui64 
pos, const ui32 chunkIdx) : Array(arr) , StartPosition(pos) @@ -34,6 +45,7 @@ class IChunkedArray { { AFL_VERIFY(arr); AFL_VERIFY(arr->length()); + FinishPosition = StartPosition + arr->length(); } TString DebugString() const { @@ -141,7 +153,6 @@ class IChunkedArray { static std::partial_ordering CompareColumns(const std::vector& l, const ui64 lPosition, const std::vector& r, const ui64 rPosition); void AppendPositionTo(arrow::ArrayBuilder& builder, const ui64 position, ui64* recordSize) const; std::shared_ptr CopyRecord(const ui64 recordIndex) const; - std::shared_ptr Slice(const ui32 offset, const ui32 count) const; TString DebugString(const ui32 position) const; }; @@ -150,6 +161,12 @@ class IChunkedArray { } virtual ~IChunkedArray() = default; + std::shared_ptr Slice(const ui32 offset, const ui32 count) const; + + TCurrentChunkAddress GetChunk(const std::optional& chunkCurrent, const ui64 position) const { + return DoGetChunk(chunkCurrent, position); + } + IChunkedArray(const ui64 recordsCount, const EType type, const std::shared_ptr& dataType) : DataType(dataType) , RecordsCount(recordsCount) diff --git a/ydb/core/formats/arrow/reader/batch_iterator.h b/ydb/core/formats/arrow/reader/batch_iterator.h index eec3559eb2b9..48497a53c452 100644 --- a/ydb/core/formats/arrow/reader/batch_iterator.h +++ b/ydb/core/formats/arrow/reader/batch_iterator.h @@ -7,8 +7,8 @@ namespace NKikimr::NArrow::NMerger { class TBatchIterator { private: bool ControlPointFlag; - TSortableBatchPosition KeyColumns; - TSortableBatchPosition VersionColumns; + TRWSortableBatchPosition KeyColumns; + TRWSortableBatchPosition VersionColumns; i64 RecordsCount; int ReverseSortKff; @@ -34,17 +34,17 @@ class TBatchIterator { return ControlPointFlag; } - const TSortableBatchPosition& GetKeyColumns() const { + const TRWSortableBatchPosition& GetKeyColumns() const { return KeyColumns; } - const TSortableBatchPosition& GetVersionColumns() const { + const TRWSortableBatchPosition& GetVersionColumns() 
const { return VersionColumns; } - TBatchIterator(const TSortableBatchPosition& keyColumns) + TBatchIterator(TRWSortableBatchPosition&& keyColumns) : ControlPointFlag(true) - , KeyColumns(keyColumns) { + , KeyColumns(std::move(keyColumns)) { } diff --git a/ydb/core/formats/arrow/reader/heap.h b/ydb/core/formats/arrow/reader/heap.h index 058066e9e7cb..4ffc216bb0cd 100644 --- a/ydb/core/formats/arrow/reader/heap.h +++ b/ydb/core/formats/arrow/reader/heap.h @@ -10,6 +10,8 @@ class TRecordBatchBuilder; template class TSortingHeap { +private: + std::deque FinishedCursors; public: TSortingHeap() = default; @@ -40,14 +42,19 @@ class TSortingHeap { } } + void CleanFinished() { + FinishedCursors.clear(); + } + void RemoveTop() { std::pop_heap(Queue.begin(), Queue.end()); + FinishedCursors.emplace_back(std::move(Queue.back())); Queue.pop_back(); NextIdx = 0; } void Push(TSortCursor&& cursor) { - Queue.emplace_back(cursor); + Queue.emplace_back(std::move(cursor)); std::push_heap(Queue.begin(), Queue.end()); NextIdx = 0; } diff --git a/ydb/core/formats/arrow/reader/merger.cpp b/ydb/core/formats/arrow/reader/merger.cpp index 83a43630f42d..ddae86c1ed28 100644 --- a/ydb/core/formats/arrow/reader/merger.cpp +++ b/ydb/core/formats/arrow/reader/merger.cpp @@ -4,13 +4,12 @@ namespace NKikimr::NArrow::NMerger { -void TMergePartialStream::PutControlPoint(std::shared_ptr point) { - Y_ABORT_UNLESS(point); - AFL_VERIFY(point->IsSameSortingSchema(SortSchema))("point", point->DebugJson())("schema", SortSchema->ToString()); - Y_ABORT_UNLESS(point->IsReverseSort() == Reverse); +void TMergePartialStream::PutControlPoint(const TSortableBatchPosition& point) { + AFL_VERIFY(point.IsSameSortingSchema(SortSchema))("point", point.DebugJson())("schema", SortSchema->ToString()); + Y_ABORT_UNLESS(point.IsReverseSort() == Reverse); Y_ABORT_UNLESS(++ControlPoints == 1); - SortHeap.Push(TBatchIterator(*point)); + SortHeap.Push(TBatchIterator(point.BuildRWPosition())); } void 
TMergePartialStream::RemoveControlPoint() { @@ -21,54 +20,56 @@ void TMergePartialStream::RemoveControlPoint() { SortHeap.RemoveTop(); } -void TMergePartialStream::CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition) { +void TMergePartialStream::CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition) { #ifndef NDEBUG if (CurrentKeyColumns) { - const bool linearExecutionCorrectness = CurrentKeyColumns->Compare(nextKeyColumnsPosition) == std::partial_ordering::less; + const bool linearExecutionCorrectness = nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::greater; if (!linearExecutionCorrectness) { const bool newSegmentScan = nextKeyColumnsPosition.GetPosition() == 0; AFL_VERIFY(newSegmentScan && nextKeyColumnsPosition.Compare(*CurrentKeyColumns) == std::partial_ordering::less) ("merge_debug", DebugJson())("current_ext", nextKeyColumnsPosition.DebugJson())("newSegmentScan", newSegmentScan); } } - CurrentKeyColumns = nextKeyColumnsPosition; + CurrentKeyColumns = nextKeyColumnsPosition.BuildSortingCursor(); #else Y_UNUSED(nextKeyColumnsPosition); #endif } -bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional* lastResultPosition) { +bool TMergePartialStream::DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional* lastResultPosition) { AFL_VERIFY(ControlPoints == 1); Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount()); builder.ValidateDataSchema(DataSchema); bool cpReachedFlag = false; + std::shared_ptr resultScanData; + ui64 resultPosition; while (SortHeap.Size() && !cpReachedFlag && !builder.IsBufferExhausted()) { if (SortHeap.Current().IsControlPoint()) { - auto keyColumns = SortHeap.Current().GetKeyColumns(); + auto keyColumns = SortHeap.Current().GetKeyColumns().BuildSortingCursor(); RemoveControlPoint(); cpReachedFlag = true; if (SortHeap.Empty() || !includeFinish || 
SortHeap.Current().GetKeyColumns().Compare(keyColumns) == std::partial_ordering::greater) { + if (lastResultPosition && resultScanData) { + *lastResultPosition = resultScanData->BuildCursor(resultPosition); + } return true; } } - if (auto currentPosition = DrainCurrentPosition()) { - CheckSequenceInDebug(*currentPosition); - builder.AddRecord(*currentPosition); - if (lastResultPosition) { - *lastResultPosition = *currentPosition; - } - } + DrainCurrentPosition(&builder, &resultScanData, &resultPosition); + } + if (lastResultPosition && resultScanData) { + *lastResultPosition = resultScanData->BuildCursor(resultPosition); } return cpReachedFlag; } -bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional* lastResultPosition) { - PutControlPoint(std::make_shared(readTo)); +bool TMergePartialStream::DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional* lastResultPosition) { + PutControlPoint(readTo); return DrainToControlPoint(builder, includeFinish, lastResultPosition); } -std::shared_ptr TMergePartialStream::SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional* lastResultPosition) { +std::shared_ptr TMergePartialStream::SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional* lastResultPosition) { std::shared_ptr result; if (SortHeap.Empty()) { return result; @@ -100,7 +101,7 @@ std::shared_ptr TMergePartialStream::SingleSourceDrain(const TSort result = SortHeap.Current().GetKeyColumns().SliceData(pos.GetPosition() + (include ? 0 : 1), resultSize); if (lastResultPosition && resultSize) { auto keys = SortHeap.Current().GetKeyColumns().SliceKeys(pos.GetPosition() + (include ? 
0 : 1), resultSize); - *lastResultPosition = TSortableBatchPosition(keys, 0, SortSchema->field_names(), {}, true); + *lastResultPosition = TCursor(keys, 0, SortSchema->field_names()); } if (SortHeap.Current().GetFilter()) { SortHeap.Current().GetFilter()->Apply(result, pos.GetPosition() + (include ? 0 : 1), resultSize); @@ -109,7 +110,7 @@ std::shared_ptr TMergePartialStream::SingleSourceDrain(const TSort result = SortHeap.Current().GetKeyColumns().SliceData(startPos, resultSize); if (lastResultPosition && resultSize) { auto keys = SortHeap.Current().GetKeyColumns().SliceKeys(startPos, resultSize); - *lastResultPosition = TSortableBatchPosition(keys, keys->num_rows() - 1, SortSchema->field_names(), {}, false); + *lastResultPosition = TCursor(keys, keys->num_rows() - 1, SortSchema->field_names()); } if (SortHeap.Current().GetFilter()) { SortHeap.Current().GetFilter()->Apply(result, startPos, resultSize); @@ -144,38 +145,44 @@ std::shared_ptr TMergePartialStream::SingleSourceDrain(const TSort void TMergePartialStream::DrainAll(TRecordBatchBuilder& builder) { Y_ABORT_UNLESS((ui32)DataSchema->num_fields() == builder.GetBuildersCount()); while (SortHeap.Size()) { - if (auto currentPosition = DrainCurrentPosition()) { - CheckSequenceInDebug(*currentPosition); - builder.AddRecord(*currentPosition); - } + DrainCurrentPosition(&builder, nullptr, nullptr); } } -std::optional TMergePartialStream::DrainCurrentPosition() { +void TMergePartialStream::DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr* resultScanData, ui64* resultPosition) { Y_ABORT_UNLESS(SortHeap.Size()); Y_ABORT_UNLESS(!SortHeap.Current().IsControlPoint()); - TSortableBatchPosition result = SortHeap.Current().GetKeyColumns(); - TSortableBatchPosition resultVersion = SortHeap.Current().GetVersionColumns(); + if (!SortHeap.Current().IsDeleted()) { + if (builder) { + builder->AddRecord(SortHeap.Current().GetKeyColumns()); + } + if (resultScanData && resultPosition) { + *resultScanData = 
SortHeap.Current().GetKeyColumns().GetSorting(); + *resultPosition = SortHeap.Current().GetKeyColumns().GetPosition(); + } + } + CheckSequenceInDebug(SortHeap.Current().GetKeyColumns()); + const ui64 startPosition = SortHeap.Current().GetKeyColumns().GetPosition(); + const TSortableScanData* startSorting = SortHeap.Current().GetKeyColumns().GetSorting().get(); + const TSortableScanData* startVersion = SortHeap.Current().GetVersionColumns().GetSorting().get(); bool isFirst = true; - const bool deletedFlag = SortHeap.Current().IsDeleted(); - while (SortHeap.Size() && (isFirst || result.Compare(SortHeap.Current().GetKeyColumns()) == std::partial_ordering::equivalent)) { - auto& anotherIterator = SortHeap.Current(); + while (SortHeap.Size() && (isFirst || SortHeap.Current().GetKeyColumns().Compare(*startSorting, startPosition) == std::partial_ordering::equivalent)) { if (!isFirst) { + auto& anotherIterator = SortHeap.Current(); if (PossibleSameVersionFlag) { - AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) != std::partial_ordering::less)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson()) - ("key", result.DebugJson()); + AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) != std::partial_ordering::greater) + ("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson()) + ("key", startSorting->BuildCursor(startPosition).DebugJson()); } else { - AFL_VERIFY(resultVersion.Compare(anotherIterator.GetVersionColumns()) == std::partial_ordering::greater)("r", resultVersion.DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson()) - ("key", result.DebugJson()); + AFL_VERIFY(anotherIterator.GetVersionColumns().Compare(*startVersion, startPosition) == std::partial_ordering::less) + ("r", startVersion->BuildCursor(startPosition).DebugJson())("a", anotherIterator.GetVersionColumns().DebugJson()) + ("key", 
startSorting->BuildCursor(startPosition).DebugJson()); } } SortHeap.Next(); isFirst = false; } - if (deletedFlag) { - return {}; - } - return result; + SortHeap.CleanFinished(); } std::vector> TMergePartialStream::DrainAllParts(const std::map& positions, diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h index 5ff296cae0e8..6a522a801ed7 100644 --- a/ydb/core/formats/arrow/reader/merger.h +++ b/ydb/core/formats/arrow/reader/merger.h @@ -11,7 +11,7 @@ namespace NKikimr::NArrow::NMerger { class TMergePartialStream { private: #ifndef NDEBUG - std::optional CurrentKeyColumns; + std::optional CurrentKeyColumns; #endif bool PossibleSameVersionFlag = true; @@ -34,9 +34,9 @@ class TMergePartialStream { return result; } - std::optional DrainCurrentPosition(); + void DrainCurrentPosition(TRecordBatchBuilder* builder, std::shared_ptr* resultScanData, ui64* resultPosition); - void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition); + void CheckSequenceInDebug(const TRWSortableBatchPosition& nextKeyColumnsPosition); public: TMergePartialStream(std::shared_ptr sortSchema, std::shared_ptr dataSchema, const bool reverse, const std::vector& versionColumnNames) : SortSchema(sortSchema) @@ -67,7 +67,7 @@ class TMergePartialStream { return TStringBuilder() << "sort_heap=" << SortHeap.DebugJson(); } - void PutControlPoint(std::shared_ptr point); + void PutControlPoint(const TSortableBatchPosition& point); void RemoveControlPoint(); @@ -93,9 +93,9 @@ class TMergePartialStream { } void DrainAll(TRecordBatchBuilder& builder); - std::shared_ptr SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional* lastResultPosition = nullptr); - bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional* lastResultPosition = nullptr); - bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional* 
lastResultPosition = nullptr); + std::shared_ptr SingleSourceDrain(const TSortableBatchPosition& readTo, const bool includeFinish, std::optional* lastResultPosition = nullptr); + bool DrainCurrentTo(TRecordBatchBuilder& builder, const TSortableBatchPosition& readTo, const bool includeFinish, std::optional* lastResultPosition = nullptr); + bool DrainToControlPoint(TRecordBatchBuilder& builder, const bool includeFinish, std::optional* lastResultPosition = nullptr); std::vector> DrainAllParts(const std::map& positions, const std::vector>& resultFields); }; diff --git a/ydb/core/formats/arrow/reader/position.cpp b/ydb/core/formats/arrow/reader/position.cpp index a11863b32b91..def3bbc5d2a6 100644 --- a/ydb/core/formats/arrow/reader/position.cpp +++ b/ydb/core/formats/arrow/reader/position.cpp @@ -15,7 +15,7 @@ NJson::TJsonValue TSortableBatchPosition::DebugJson() const { return result; } -std::optional TSortableBatchPosition::FindPosition(TSortableBatchPosition& position, const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool greater) { +std::optional TSortableBatchPosition::FindPosition(TRWSortableBatchPosition& position, const ui64 posStartExt, const ui64 posFinishExt, const TSortableBatchPosition& forFound, const bool greater) { ui64 posStart = posStartExt; ui64 posFinish = posFinishExt; { @@ -70,11 +70,23 @@ std::optional TSortableBatchPosition::Fi posStart = *includedStartPosition; } - TSortableBatchPosition position = forFound.BuildSame(batch, posStart); + TRWSortableBatchPosition position = forFound.BuildRWPosition(batch, posStart); return FindPosition(position, posStart, posFinish, forFound, greater); } -TSortableBatchPosition::TFoundPosition TSortableBatchPosition::SkipToLower(const TSortableBatchPosition& forFound) { +NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition() const { + return TRWSortableBatchPosition(Position, RecordsCount, ReverseSort, Sorting->BuildCopy(Position), 
Data ? Data->BuildCopy(Position) : nullptr); +} + +NKikimr::NArrow::NMerger::TRWSortableBatchPosition TSortableBatchPosition::BuildRWPosition(std::shared_ptr batch, const ui32 position) const { + std::vector dataColumns; + if (Data) { + dataColumns = Data->GetFieldNames(); + } + return TRWSortableBatchPosition(batch, position, Sorting->GetFieldNames(), dataColumns, ReverseSort); +} + +TSortableBatchPosition::TFoundPosition TRWSortableBatchPosition::SkipToLower(const TSortableBatchPosition& forFound) { const ui32 posStart = Position; auto pos = FindPosition(*this, posStart, ReverseSort ? 0 : (RecordsCount - 1), forFound, true); AFL_VERIFY(pos)("cursor", DebugJson())("found", forFound.DebugJson()); @@ -86,37 +98,128 @@ TSortableBatchPosition::TFoundPosition TSortableBatchPosition::SkipToLower(const return *pos; } -TSortableScanData::TSortableScanData(const std::shared_ptr& batch, const std::vector& columns) { +TSortableScanData::TSortableScanData(const ui64 position, const std::shared_ptr& batch, const std::vector& columns) { for (auto&& i : columns) { auto c = batch->GetAccessorByNameOptional(i); AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns))("batch", batch->DebugString()); - Columns.emplace_back(NAccessor::IChunkedArray::TReader(c)); + Columns.emplace_back(c); auto f = batch->GetSchema()->GetFieldByName(i); AFL_VERIFY(f); Fields.emplace_back(f); } + BuildPosition(position); } -TSortableScanData::TSortableScanData(const std::shared_ptr& batch, const std::vector& columns) { +TSortableScanData::TSortableScanData(const ui64 position, const std::shared_ptr& batch, const std::vector& columns) { for (auto&& i : columns) { auto c = batch->GetColumnByName(i); AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns)); - Columns.emplace_back(NAccessor::IChunkedArray::TReader(std::make_shared(c))); + Columns.emplace_back(std::make_shared(c)); auto f = batch->schema()->GetFieldByName(i); AFL_VERIFY(f); Fields.emplace_back(f); } + 
BuildPosition(position); } -TSortableScanData::TSortableScanData(const std::shared_ptr& batch, const std::vector& columns) { +TSortableScanData::TSortableScanData(const ui64 position, const std::shared_ptr& batch, const std::vector& columns) { for (auto&& i : columns) { auto c = batch->GetColumnByName(i); AFL_VERIFY(c)("column_name", i)("columns", JoinSeq(",", columns)); - Columns.emplace_back(NAccessor::IChunkedArray::TReader(std::make_shared(c))); + Columns.emplace_back(std::make_shared(c)); auto f = batch->schema()->GetFieldByName(i); AFL_VERIFY(f); Fields.emplace_back(f); } + BuildPosition(position); +} + +void TSortableScanData::AppendPositionTo(const std::vector>& builders, const ui64 position, ui64* recordSize) const { + AFL_VERIFY(builders.size() == PositionAddress.size()); + for (ui32 i = 0; i < PositionAddress.size(); ++i) { + AFL_VERIFY(NArrow::Append(*builders[i], *PositionAddress[i].GetArray(), position - PositionAddress[i].GetStartPosition(), recordSize)); + } +} + +void TSortableScanData::BuildPosition(const ui64 position) { + PositionAddress.clear(); + std::optional recordsCount; + FinishPosition = Max(); + StartPosition = 0; + LastInit = position; + for (auto&& i : Columns) { + PositionAddress.emplace_back(i->GetChunk({}, position)); + StartPosition = std::max(StartPosition, PositionAddress.back().GetStartPosition()); + FinishPosition = std::min(FinishPosition, PositionAddress.back().GetFinishPosition()); + if (!recordsCount) { + recordsCount = i->GetRecordsCount(); + } else { + AFL_VERIFY(*recordsCount == i->GetRecordsCount()); + } + } + AFL_VERIFY(StartPosition < FinishPosition); + AFL_VERIFY(recordsCount); + RecordsCount = *recordsCount; + AFL_VERIFY(position < RecordsCount); +} + +bool TSortableScanData::InitPosition(const ui64 position) { + AFL_VERIFY(position < RecordsCount); + if (position < FinishPosition && StartPosition <= position) { + return false; + } + LastInit = position; + ui32 idx = 0; + FinishPosition = Max(); + StartPosition = 0; 
+ for (auto&& i : PositionAddress) { + if (!i.Contains(position)) { + i = Columns[idx]->GetChunk(i, position); + } + StartPosition = std::max(StartPosition, i.GetStartPosition()); + FinishPosition = std::min(FinishPosition, i.GetFinishPosition()); + ++idx; + } + AFL_VERIFY(StartPosition < FinishPosition); + return true; +} + +std::partial_ordering TCursor::Compare(const TSortableScanData& item, const ui64 itemPosition) const { + AFL_VERIFY(PositionAddress.size() == item.GetPositionAddress().size()); + for (ui32 idx = 0; idx < PositionAddress.size(); ++idx) { + auto cmp = PositionAddress[idx].Compare(Position, item.GetPositionAddress()[idx], itemPosition); + if (std::is_neq(cmp)) { + return cmp; + } + } + + return std::partial_ordering::equivalent; +} + +std::partial_ordering TCursor::Compare(const TCursor& item) const { + AFL_VERIFY(PositionAddress.size() == item.PositionAddress.size()); + for (ui32 idx = 0; idx < PositionAddress.size(); ++idx) { + auto cmp = PositionAddress[idx].Compare(Position, item.PositionAddress[idx], item.Position); + if (std::is_neq(cmp)) { + return cmp; + } + } + + return std::partial_ordering::equivalent; +} + +void TCursor::AppendPositionTo(const std::vector>& builders, ui64* recordSize) const { + AFL_VERIFY(builders.size() == PositionAddress.size()); + for (ui32 i = 0; i < PositionAddress.size(); ++i) { + AFL_VERIFY_DEBUG(builders[i]->type()->Equals(PositionAddress[i].GetArray()->type())); + AFL_VERIFY(NArrow::Append(*builders[i], *PositionAddress[i].GetArray(), Position - PositionAddress[i].GetStartPosition(), recordSize)); + } +} + +TCursor::TCursor(const std::shared_ptr& table, const ui64 position, const std::vector& columns) + : Position(position) +{ + PositionAddress = TSortableScanData(position, table, columns).GetPositionAddress(); } } diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h index f2a070d7b1f2..3782614f56e1 100644 --- a/ydb/core/formats/arrow/reader/position.h +++ 
b/ydb/core/formats/arrow/reader/position.h @@ -17,31 +17,137 @@ namespace NKikimr::NArrow::NMerger { class TRecordBatchBuilder; +class TSortableScanData; -class TSortableScanData { +class TCursor { private: - YDB_READONLY_DEF(std::vector, Columns); - YDB_READONLY_DEF(std::vector>, Fields); + YDB_READONLY(ui64, Position, 0); + std::vector PositionAddress; public: - TSortableScanData() = default; - TSortableScanData(const std::shared_ptr& batch, const std::vector& columns); - TSortableScanData(const std::shared_ptr& batch, const std::vector& columns); - TSortableScanData(const std::shared_ptr& batch, const std::vector& columns); + TCursor() = default; + TCursor(const std::shared_ptr& table, const ui64 position, const std::vector& columns); + + TCursor(const ui64 position, const std::vector& addresses) + : Position(position) + , PositionAddress(addresses) + { + + } - std::shared_ptr ExtractPosition(const ui64 pos) const { + NJson::TJsonValue DebugJson() const { + NJson::TJsonValue result = NJson::JSON_MAP; + for (ui32 i = 0; i < PositionAddress.size(); ++i) { + auto& jsonColumn = result["sorting_columns"].AppendValue(NJson::JSON_MAP); + jsonColumn["value"] = PositionAddress[i].DebugString(Position); + } + return result; + } + + std::shared_ptr ExtractSortingPosition(const std::vector>& fields) const { + AFL_VERIFY(fields.size() == PositionAddress.size()); std::vector> columns; - std::shared_ptr schema = std::make_shared(Fields); - for (ui32 i = 0; i < Columns.size(); ++i) { - auto extracted = Columns[i].CopyRecord(pos); + std::shared_ptr schema = std::make_shared(fields); + for (ui32 i = 0; i < PositionAddress.size(); ++i) { + auto extracted = PositionAddress[i].CopyRecord(Position); columns.emplace_back(extracted); } return arrow::RecordBatch::Make(schema, 1, columns); } + void AppendPositionTo(const std::vector>& builders, ui64* recordSize) const; + + std::partial_ordering Compare(const TSortableScanData& item, const ui64 itemPosition) const; + std::partial_ordering 
Compare(const TCursor& item) const; + +}; + +class TSortableScanData { +private: + ui64 RecordsCount = 0; + YDB_READONLY_DEF(std::vector, PositionAddress); + YDB_READONLY_DEF(std::vector>, Columns); + YDB_READONLY_DEF(std::vector>, Fields); + ui64 StartPosition = 0; + ui64 FinishPosition = 0; + void BuildPosition(const ui64 position); + ui64 LastInit = 0; + + bool Contains(const ui64 position) const { + return StartPosition <= position && position < FinishPosition; + } +public: + TSortableScanData(const ui64 position, const std::shared_ptr& batch, const std::vector& columns); + TSortableScanData(const ui64 position, const std::shared_ptr& batch, const std::vector& columns); + TSortableScanData(const ui64 position, const std::shared_ptr& batch, const std::vector& columns); + TSortableScanData(const ui64 position, const ui64 recordsCount, const std::vector>& columns, const std::vector>& fields) + : RecordsCount(recordsCount) + , Columns(columns) + , Fields(fields) + { + BuildPosition(position); + } + + std::shared_ptr BuildCopy(const ui64 position) const { + return std::make_shared(position, RecordsCount, Columns, Fields); + } + + TCursor BuildCursor(const ui64 position) const { + if (Contains(position)) { + return TCursor(position, PositionAddress); + } + auto addresses = PositionAddress; + ui32 idx = 0; + for (auto&& i : addresses) { + if (!i.Contains(position)) { + i = Columns[idx]->GetChunk(i, position); + } + ++idx; + } + return TCursor(position, addresses); + } + + std::partial_ordering Compare(const ui64 position, const TSortableScanData& item, const ui64 itemPosition) const { + AFL_VERIFY(PositionAddress.size() == item.PositionAddress.size()); + if (Contains(position) && item.Contains(itemPosition)) { + for (ui32 idx = 0; idx < PositionAddress.size(); ++idx) { + std::partial_ordering cmp = PositionAddress[idx].Compare(position, item.PositionAddress[idx], itemPosition); + if (cmp != std::partial_ordering::equivalent) { + return cmp; + } + } + } else { + for 
(ui32 idx = 0; idx < PositionAddress.size(); ++idx) { + std::partial_ordering cmp = std::partial_ordering::equivalent; + const bool containsSelf = PositionAddress[idx].Contains(position); + const bool containsItem = item.PositionAddress[idx].Contains(itemPosition); + if (containsSelf && containsItem) { + cmp = PositionAddress[idx].Compare(position, item.PositionAddress[idx], itemPosition); + } else if (containsSelf) { + auto temporaryAddress = item.Columns[idx]->GetChunk(item.PositionAddress[idx], itemPosition); + cmp = PositionAddress[idx].Compare(position, temporaryAddress, itemPosition); + } else if (containsItem) { + auto temporaryAddress = Columns[idx]->GetChunk(PositionAddress[idx], position); + cmp = temporaryAddress.Compare(position, item.PositionAddress[idx], itemPosition); + } else { + AFL_VERIFY(false); + } + if (cmp != std::partial_ordering::equivalent) { + return cmp; + } + } + } + + return std::partial_ordering::equivalent; + } + + void AppendPositionTo(const std::vector>& builders, const ui64 position, ui64* recordSize) const; + + bool InitPosition(const ui64 position); + std::shared_ptr Slice(const ui64 offset, const ui64 count) const { std::vector> slicedArrays; for (auto&& i : Columns) { - slicedArrays.emplace_back(i.Slice(offset, count)); + slicedArrays.emplace_back(i->Slice(offset, count)); } return arrow::Table::Make(std::make_shared(Fields), slicedArrays, count); } @@ -61,7 +167,7 @@ class TSortableScanData { return true; } - NJson::TJsonValue DebugJson(const i32 position) const { + NJson::TJsonValue DebugJson(const ui64 position) const { NJson::TJsonValue result = NJson::JSON_MAP; auto& jsonFields = result.InsertValue("fields", NJson::JSON_ARRAY); for (auto&& i : Fields) { @@ -70,9 +176,7 @@ class TSortableScanData { for (ui32 i = 0; i < Columns.size(); ++i) { auto& jsonColumn = result["sorting_columns"].AppendValue(NJson::JSON_MAP); jsonColumn["name"] = Fields[i]->name(); - if (position >= 0 && (ui64)position < Columns[i].GetRecordsCount()) 
{ - jsonColumn["value"] = Columns[i].DebugString(position); - } + jsonColumn["value"] = PositionAddress[i].DebugString(position); } return result; } @@ -86,20 +190,65 @@ class TSortableScanData { } }; +class TRWSortableBatchPosition; + class TSortableBatchPosition { -private: - YDB_READONLY(i64, Position, 0); - YDB_READONLY(i64, RecordsCount, 0); +protected: + i64 Position = 0; + i64 RecordsCount = 0; bool ReverseSort = false; std::shared_ptr Sorting; std::shared_ptr Data; public: TSortableBatchPosition() = default; - std::shared_ptr ExtractSortingPosition() const { - return Sorting->ExtractPosition(Position); + i64 GetPosition() const { + return Position; } + i64 GetRecordsCount() const { + return RecordsCount; + } + + std::shared_ptr GetSorting() const { + return Sorting; + } + + TCursor BuildSortingCursor() const { + return Sorting->BuildCursor(Position); + } + + TCursor BuildDataCursor() const { + if (!Data) { + return TCursor(); + } + return Data->BuildCursor(Position); + } + + const std::vector>& GetSortFields() const { + return Sorting->GetFields(); + } + + TSortableBatchPosition(const i64 position, const i64 recordsCount, const bool reverseSort, const std::shared_ptr& sorting, const std::shared_ptr& data) + : Position(position) + , RecordsCount(recordsCount) + , ReverseSort(reverseSort) + , Sorting(sorting) + , Data(data) + { + + } + + TSortableBatchPosition(const TRWSortableBatchPosition& source) = delete; + TSortableBatchPosition(TRWSortableBatchPosition& source) = delete; + TSortableBatchPosition(TRWSortableBatchPosition&& source) = delete; + + TSortableBatchPosition operator= (const TRWSortableBatchPosition& source) = delete; + TSortableBatchPosition operator= (TRWSortableBatchPosition& source) = delete; + TSortableBatchPosition operator= (TRWSortableBatchPosition&& source) = delete; + + TRWSortableBatchPosition BuildRWPosition() const; + std::shared_ptr SliceData(const ui64 offset, const ui64 count) const { AFL_VERIFY(Data); return Data->Slice(offset, 
count); @@ -143,11 +292,122 @@ class TSortableBatchPosition { } }; + static std::optional FindPosition(const std::shared_ptr& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional includedStartPosition); + static std::optional FindPosition(TRWSortableBatchPosition& position, const ui64 posStart, const ui64 posFinish, const TSortableBatchPosition& forFound, const bool greater); + + const TSortableScanData& GetData() const { + return *Data; + } + + bool IsReverseSort() const { + return ReverseSort; + } + NJson::TJsonValue DebugJson() const; + + TRWSortableBatchPosition BuildRWPosition(std::shared_ptr batch, const ui32 position) const; + + bool IsSameSortingSchema(const std::shared_ptr& schema) const { + return Sorting->IsSameSchema(schema); + } + + template + TSortableBatchPosition(const std::shared_ptr& batch, const ui32 position, const std::vector& sortingColumns, + const std::vector& dataColumns, const bool reverseSort) + : Position(position) + , ReverseSort(reverseSort) { + Y_ABORT_UNLESS(batch); + Y_ABORT_UNLESS(batch->num_rows()); + RecordsCount = batch->num_rows(); + AFL_VERIFY(Position < RecordsCount)("position", Position)("count", RecordsCount); + + if (dataColumns.size()) { + Data = std::make_shared(Position, batch, dataColumns); + } + Sorting = std::make_shared(Position, batch, sortingColumns); + Y_DEBUG_ABORT_UNLESS(batch->ValidateFull().ok()); + Y_ABORT_UNLESS(Sorting->GetColumns().size()); + } + + std::partial_ordering GetReverseForCompareResult(const std::partial_ordering directResult) const { + if (directResult == std::partial_ordering::less) { + return std::partial_ordering::greater; + } else if (directResult == std::partial_ordering::greater) { + return std::partial_ordering::less; + } else { + return directResult; + } + } + + std::partial_ordering ApplyOptionalReverseForCompareResult(const std::partial_ordering directResult) const { + if (ReverseSort) { + return GetReverseForCompareResult(directResult); + } else 
{ + return directResult; + } + } + + std::partial_ordering Compare(const TCursor& cursor) const { + if (ReverseSort) { + return cursor.Compare(*Sorting, Position); + } else { + return GetReverseForCompareResult(cursor.Compare(*Sorting, Position)); + } + } + + std::partial_ordering Compare(const TSortableBatchPosition& item) const { + Y_ABORT_UNLESS(item.ReverseSort == ReverseSort); + Y_ABORT_UNLESS(item.Sorting->GetColumns().size() == Sorting->GetColumns().size()); + const auto directResult = Sorting->Compare(Position, *item.Sorting, item.GetPosition()); + return ApplyOptionalReverseForCompareResult(directResult); + } + + std::partial_ordering Compare(const TSortableScanData& data, const ui64 dataPosition) const { + return Sorting->Compare(Position, data, dataPosition); + } + + bool operator<(const TSortableBatchPosition& item) const { + return Compare(item) == std::partial_ordering::less; + } + + bool operator==(const TSortableBatchPosition& item) const { + return Compare(item) == std::partial_ordering::equivalent; + } + + bool operator!=(const TSortableBatchPosition& item) const { + return Compare(item) != std::partial_ordering::equivalent; + } + +}; + +class TRWSortableBatchPosition: public TSortableBatchPosition, public TMoveOnly { +private: + using TBase = TSortableBatchPosition; +public: + using TBase::TBase; + + bool NextPosition(const i64 delta) { + return InitPosition(Position + delta); + } + + bool InitPosition(const i64 position) { + if (position < RecordsCount && position >= 0) { + Sorting->InitPosition(position); + if (Data) { + Data->InitPosition(position); + } + Position = position; + return true; + } else { + return false; + } + + } + TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound); + // (-inf, it1), [it1, it2), [it2, it3), ..., [itLast, +inf) template static std::vector> SplitByBorders(const std::shared_ptr& batch, - const std::vector& columnNames, TBordersIterator& it) - { + const std::vector& 
columnNames, TBordersIterator& it) { std::vector> result; if (!batch || batch->num_rows() == 0) { while (it.IsValid()) { @@ -156,7 +416,7 @@ class TSortableBatchPosition { result.emplace_back(nullptr); return result; } - TSortableBatchPosition pos(batch, 0, columnNames, {}, false); + TRWSortableBatchPosition pos(batch, 0, columnNames, {}, false); bool batchFinished = false; i64 recordsCountSplitted = 0; for (; it.IsValid() && !batchFinished; it.Next()) { @@ -250,92 +510,6 @@ class TSortableBatchPosition { return SplitByBorders(batch, columnNames, it); } - static std::optional FindPosition(const std::shared_ptr& batch, const TSortableBatchPosition& forFound, const bool needGreater, const std::optional includedStartPosition); - static std::optional FindPosition(TSortableBatchPosition& position, const ui64 posStart, const ui64 posFinish, const TSortableBatchPosition& forFound, const bool greater); - TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& forFound); - - const TSortableScanData& GetData() const { - return *Data; - } - - bool IsReverseSort() const { - return ReverseSort; - } - NJson::TJsonValue DebugJson() const; - - TSortableBatchPosition BuildSame(std::shared_ptr batch, const ui32 position) const { - std::vector dataColumns; - if (Data) { - dataColumns = Data->GetFieldNames(); - } - return TSortableBatchPosition(batch, position, Sorting->GetFieldNames(), dataColumns, ReverseSort); - } - - bool IsSameSortingSchema(const std::shared_ptr& schema) { - return Sorting->IsSameSchema(schema); - } - - template - TSortableBatchPosition(const std::shared_ptr& batch, const ui32 position, const std::vector& sortingColumns, - const std::vector& dataColumns, const bool reverseSort) - : Position(position) - , ReverseSort(reverseSort) - { - Y_ABORT_UNLESS(batch); - Y_ABORT_UNLESS(batch->num_rows()); - RecordsCount = batch->num_rows(); - AFL_VERIFY(Position < RecordsCount)("position", Position)("count", RecordsCount); - - if (dataColumns.size()) 
{ - Data = std::make_shared(batch, dataColumns); - } - Sorting = std::make_shared(batch, sortingColumns); - Y_DEBUG_ABORT_UNLESS(batch->ValidateFull().ok()); - Y_ABORT_UNLESS(Sorting->GetColumns().size()); - } - - std::partial_ordering Compare(const TSortableBatchPosition& item) const { - Y_ABORT_UNLESS(item.ReverseSort == ReverseSort); - Y_ABORT_UNLESS(item.Sorting->GetColumns().size() == Sorting->GetColumns().size()); - const auto directResult = NAccessor::IChunkedArray::TReader::CompareColumns(Sorting->GetColumns(), Position, item.Sorting->GetColumns(), item.Position); - if (ReverseSort) { - if (directResult == std::partial_ordering::less) { - return std::partial_ordering::greater; - } else if (directResult == std::partial_ordering::greater) { - return std::partial_ordering::less; - } else { - return std::partial_ordering::equivalent; - } - } else { - return directResult; - } - } - - bool operator<(const TSortableBatchPosition& item) const { - return Compare(item) == std::partial_ordering::less; - } - - bool operator==(const TSortableBatchPosition& item) const { - return Compare(item) == std::partial_ordering::equivalent; - } - - bool operator!=(const TSortableBatchPosition& item) const { - return Compare(item) != std::partial_ordering::equivalent; - } - - bool NextPosition(const i64 delta) { - return InitPosition(Position + delta); - } - - bool InitPosition(const i64 position) { - if (position < RecordsCount && position >= 0) { - Position = position; - return true; - } else { - return false; - } - - } }; } diff --git a/ydb/core/formats/arrow/reader/result_builder.cpp b/ydb/core/formats/arrow/reader/result_builder.cpp index 5a7d79a08fed..deb4fe3e1427 100644 --- a/ydb/core/formats/arrow/reader/result_builder.cpp +++ b/ydb/core/formats/arrow/reader/result_builder.cpp @@ -7,19 +7,26 @@ #include +#include "position.h" + namespace NKikimr::NArrow::NMerger { void TRecordBatchBuilder::ValidateDataSchema(const std::shared_ptr& schema) { 
AFL_VERIFY(IsSameFieldsSequence(schema->fields(), Fields)); } -void TRecordBatchBuilder::AddRecord(const TSortableBatchPosition& position) { +void TRecordBatchBuilder::AddRecord(const TCursor& position) { +// AFL_VERIFY_DEBUG(IsSameFieldsSequence(position.GetData().GetFields(), Fields)); +// AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "record_add_on_read")("record", position.DebugJson()); + position.AppendPositionTo(Builders, MemoryBufferLimit ? &CurrentBytesUsed : nullptr); + ++RecordsCount; +} + +void TRecordBatchBuilder::AddRecord(const TRWSortableBatchPosition& position) { AFL_VERIFY_DEBUG(position.GetData().GetColumns().size() == Builders.size()); AFL_VERIFY_DEBUG(IsSameFieldsSequence(position.GetData().GetFields(), Fields)); -// AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "record_add_on_read")("record", position.DebugJson()); - for (ui32 i = 0; i < position.GetData().GetColumns().size(); ++i) { - position.GetData().GetColumns()[i].AppendPositionTo(*Builders[i], position.GetPosition(), MemoryBufferLimit ? &CurrentBytesUsed : nullptr); - } + // AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "record_add_on_read")("record", position.DebugJson()); + position.GetData().AppendPositionTo(Builders, position.GetPosition(), MemoryBufferLimit ? 
&CurrentBytesUsed : nullptr); ++RecordsCount; } @@ -57,7 +64,11 @@ std::shared_ptr TRecordBatchBuilder::Finalize() { for (auto&& i : Builders) { columns.emplace_back(NArrow::TStatusValidator::GetValid(i->Finish())); } - return arrow::RecordBatch::Make(schema, columns.front()->length(), columns); + auto result = arrow::RecordBatch::Make(schema, columns.front()->length(), columns); +#ifndef NDEBUG + NArrow::TStatusValidator::Validate(result->ValidateFull()); +#endif + return result; } TString TRecordBatchBuilder::GetColumnNames() const { diff --git a/ydb/core/formats/arrow/reader/result_builder.h b/ydb/core/formats/arrow/reader/result_builder.h index ba05e03cb934..56e8250af206 100644 --- a/ydb/core/formats/arrow/reader/result_builder.h +++ b/ydb/core/formats/arrow/reader/result_builder.h @@ -31,7 +31,8 @@ class TRecordBatchBuilder { bool IsBufferExhausted() const { return MemoryBufferLimit && *MemoryBufferLimit < CurrentBytesUsed; } - void AddRecord(const TSortableBatchPosition& position); + void AddRecord(const TCursor& position); + void AddRecord(const TRWSortableBatchPosition& position); void ValidateDataSchema(const std::shared_ptr& schema); }; diff --git a/ydb/core/formats/arrow/special_keys.cpp b/ydb/core/formats/arrow/special_keys.cpp index a654c4be6ef6..3209fcff13a9 100644 --- a/ydb/core/formats/arrow/special_keys.cpp +++ b/ydb/core/formats/arrow/special_keys.cpp @@ -64,15 +64,15 @@ TMinMaxSpecialKeys::TMinMaxSpecialKeys(std::shared_ptr batch Y_ABORT_UNLESS(batch->num_rows()); Y_ABORT_UNLESS(schema); - NMerger::TSortableBatchPosition record(batch, 0, schema->field_names(), {}, false); - std::optional minValue; - std::optional maxValue; + NMerger::TRWSortableBatchPosition record(batch, 0, schema->field_names(), {}, false); + std::optional minValue; + std::optional maxValue; while (true) { - if (!minValue || minValue->Compare(record) == std::partial_ordering::greater) { - minValue = record; + if (!minValue || record.Compare(*minValue) == 
std::partial_ordering::less) { + minValue = record.BuildSortingCursor(); } - if (!maxValue || maxValue->Compare(record) == std::partial_ordering::less) { - maxValue = record; + if (!maxValue || record.Compare(*maxValue) == std::partial_ordering::greater) { + maxValue = record.BuildSortingCursor(); } if (!record.NextPosition(1)) { break; diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index 123e61ef0fde..c205726a1098 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -167,7 +167,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont auto itGranule = PathToGranule.find(pathId); AFL_VERIFY(itGranule != PathToGranule.end()); - std::vector> result = NArrow::NMerger::TSortableBatchPosition:: + std::vector> result = NArrow::NMerger::TRWSortableBatchPosition:: SplitByBordersInSequentialContainer(mergedBatch, comparableColumns, itGranule->second); for (auto&& b : result) { if (!b) { diff --git a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp index f57f6660de38..edc8f8ca86de 100644 --- a/ydb/core/tx/columnshard/engines/changes/with_appended.cpp +++ b/ydb/core/tx/columnshard/engines/changes/with_appended.cpp @@ -1,4 +1,5 @@ #include "with_appended.h" + #include #include #include @@ -39,7 +40,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self auto& portionInfo = portionBuilder.GetPortionResult(); switch (portionInfo.GetMeta().Produced) { case NOlap::TPortionMeta::EProduced::UNSPECIFIED: - Y_ABORT_UNLESS(false); // unexpected + Y_ABORT_UNLESS(false); // unexpected case NOlap::TPortionMeta::EProduced::INSERTED: self->IncCounter(NColumnShard::COUNTER_INDEXING_PORTIONS_WRITTEN); break; @@ -147,4 +148,4 @@ std::vector TChangesWithAppend::MakeAppendedPortions void 
TChangesWithAppend::DoStart(NColumnShard::TColumnShard& /*self*/) { } -} +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp index ec6c4c04163b..b36c976881cc 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp @@ -4,8 +4,8 @@ namespace NKikimr::NOlap::NReader::NPlain { -std::optional TBaseMergeTask::DrainMergerLinearScan(const std::optional resultBufferLimit) { - std::optional lastResultPosition; +std::optional TBaseMergeTask::DrainMergerLinearScan(const std::optional resultBufferLimit) { + std::optional lastResultPosition; AFL_VERIFY(!ResultBatch); auto rbBuilder = std::make_shared(Context->GetProgramInputColumns()->GetSchema()->fields()); rbBuilder->SetMemoryBufferLimit(resultBufferLimit); @@ -99,13 +99,13 @@ bool TStartMergeTask::DoExecute() { ResultBatch = nullptr; return true; } - Merger->PutControlPoint(std::make_shared(MergingContext->GetFinish())); + Merger->PutControlPoint(MergingContext->GetFinish()); Merger->SkipToLowerBound(MergingContext->GetStart(), MergingContext->GetIncludeStart()); const ui32 originalSourcesCount = Sources.size(); Sources.clear(); AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExecute")("interval_idx", MergingContext->GetIntervalIdx()); - std::optional lastResultPosition; + std::optional lastResultPosition; if (Merger->GetSourcesCount() == 1 && sourcesInMemory) { TMemoryProfileGuard mGuard("SCAN_PROFILE::MERGE::ONE", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); ResultBatch = Merger->SingleSourceDrain(MergingContext->GetFinish(), MergingContext->GetIncludeFinish(), &lastResultPosition); @@ -122,7 +122,7 @@ bool TStartMergeTask::DoExecute() { lastResultPosition = DrainMergerLinearScan(bufferLimit); } if (lastResultPosition) { - LastPK = 
lastResultPosition->ExtractSortingPosition(); + LastPK = lastResultPosition->ExtractSortingPosition(MergingContext->GetFinish().GetSortFields()); } AFL_VERIFY(!!LastPK == (!!ResultBatch && ResultBatch->num_rows())); PrepareResultBatch(); @@ -145,9 +145,9 @@ TStartMergeTask::TStartMergeTask(const std::shared_ptr& merging bool TContinueMergeTask::DoExecute() { TMemoryProfileGuard mGuard("SCAN_PROFILE::MERGE::CONTINUE", IS_DEBUG_LOG_ENABLED(NKikimrServices::TX_COLUMNSHARD_SCAN_MEMORY)); - std::optional lastResultPosition = DrainMergerLinearScan(Context->ReadSequentiallyBufferSize); + std::optional lastResultPosition = DrainMergerLinearScan(Context->ReadSequentiallyBufferSize); if (lastResultPosition) { - LastPK = lastResultPosition->ExtractSortingPosition(); + LastPK = lastResultPosition->ExtractSortingPosition(MergingContext->GetFinish().GetSortFields()); } AFL_VERIFY(!!LastPK == (!!ResultBatch && ResultBatch->num_rows())); PrepareResultBatch(); diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h index 043ff943e472..90cda0841363 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.h @@ -58,7 +58,7 @@ class TBaseMergeTask: public IDataTasksProcessor::ITask { const ui32 IntervalIdx; std::optional ShardedBatch; - [[nodiscard]] std::optional DrainMergerLinearScan(const std::optional resultBufferLimit); + [[nodiscard]] std::optional DrainMergerLinearScan(const std::optional resultBufferLimit); void PrepareResultBatch(); private: diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h index 9230c9940c38..66c29d219eeb 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/source.h @@ 
-125,7 +125,7 @@ class IDataSource { void InitFetchingPlan(const std::shared_ptr& fetching); std::shared_ptr GetLastPK() const { - return Finish.ExtractSortingPosition(); + return Finish.BuildSortingCursor().ExtractSortingPosition(Finish.GetSortFields()); } void IncIntervalsCount() { ++IntervalsCount; @@ -183,7 +183,7 @@ class IDataSource { void RegisterInterval(TFetchingInterval& interval); - IDataSource(const ui32 sourceIdx, const std::shared_ptr& context, + IDataSource(const ui32 sourceIdx, const std::shared_ptr& context, const NArrow::TReplaceKey& start, const NArrow::TReplaceKey& finish, const TSnapshot& recordSnapshotMin, const TSnapshot& recordSnapshotMax, const ui32 recordsCount, const std::optional shardingVersion) : SourceIdx(sourceIdx)