From 8c159f8e5044c0dcbe2fc887d142ad6c345ca463 Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Sat, 6 Apr 2024 15:21:35 +0300 Subject: [PATCH] add validations for inconsistency states for scanners --- .../formats/arrow/reader/batch_iterator.cpp | 51 ++++++ .../formats/arrow/reader/batch_iterator.h | 89 ++++++++++ ydb/core/formats/arrow/reader/merger.cpp | 61 ------- ydb/core/formats/arrow/reader/merger.h | 160 +++--------------- ydb/core/formats/arrow/reader/position.h | 1 + ydb/core/formats/arrow/reader/ya.make | 1 + .../plain_reader/iterator/fetched_data.h | 2 +- .../reader/plain_reader/iterator/merge.cpp | 2 +- 8 files changed, 169 insertions(+), 198 deletions(-) create mode 100644 ydb/core/formats/arrow/reader/batch_iterator.cpp create mode 100644 ydb/core/formats/arrow/reader/batch_iterator.h diff --git a/ydb/core/formats/arrow/reader/batch_iterator.cpp b/ydb/core/formats/arrow/reader/batch_iterator.cpp new file mode 100644 index 000000000000..5216691dd208 --- /dev/null +++ b/ydb/core/formats/arrow/reader/batch_iterator.cpp @@ -0,0 +1,51 @@ +#include "batch_iterator.h" + +namespace NKikimr::NArrow::NMerger { + +NJson::TJsonValue TBatchIterator::DebugJson() const { + NJson::TJsonValue result; + result["is_cp"] = IsControlPoint(); + result["key"] = KeyColumns.DebugJson(); + return result; +} + +NKikimr::NArrow::NMerger::TSortableBatchPosition::TFoundPosition TBatchIterator::SkipToLower(const TSortableBatchPosition& pos) { + const ui32 posStart = KeyColumns.GetPosition(); + auto result = KeyColumns.SkipToLower(pos); + const i32 delta = IsReverse() ? 
(posStart - KeyColumns.GetPosition()) : (KeyColumns.GetPosition() - posStart); + AFL_VERIFY(delta >= 0); + AFL_VERIFY(VersionColumns.InitPosition(KeyColumns.GetPosition()))("pos", KeyColumns.GetPosition()) + ("size", VersionColumns.GetRecordsCount())("key_size", KeyColumns.GetRecordsCount()); + if (FilterIterator && delta) { + AFL_VERIFY(FilterIterator->Next(delta)); + } + return result; +} + +bool TBatchIterator::Next() { + const bool result = KeyColumns.NextPosition(ReverseSortKff) && VersionColumns.NextPosition(ReverseSortKff); + if (FilterIterator) { + Y_ABORT_UNLESS(result == FilterIterator->Next(1)); + } + return result; +} + +bool TBatchIterator::operator<(const TBatchIterator& item) const { + const std::partial_ordering result = KeyColumns.Compare(item.KeyColumns); + if (result == std::partial_ordering::equivalent) { + if (IsControlPoint() && item.IsControlPoint()) { + return false; + } else if (IsControlPoint()) { + return false; + } else if (item.IsControlPoint()) { + return true; + } + //no inversion needed because we need the maximal version first (reverse handling is not included in VersionColumns) + return VersionColumns.Compare(item.VersionColumns) == std::partial_ordering::less; + } else { + //inverted logic because we use a max heap but need the minimal element when not reversed (reverse handling is included in KeyColumns) + return result == std::partial_ordering::greater; + } +} + +} diff --git a/ydb/core/formats/arrow/reader/batch_iterator.h b/ydb/core/formats/arrow/reader/batch_iterator.h new file mode 100644 index 000000000000..eec3559eb2b9 --- /dev/null +++ b/ydb/core/formats/arrow/reader/batch_iterator.h @@ -0,0 +1,89 @@ +#pragma once +#include "position.h" +#include + +namespace NKikimr::NArrow::NMerger { + +class TBatchIterator { +private: + bool ControlPointFlag; + TSortableBatchPosition KeyColumns; + TSortableBatchPosition VersionColumns; + i64 RecordsCount; + int ReverseSortKff; + + std::shared_ptr Filter; + std::shared_ptr FilterIterator; + + i32 
GetFirstPosition() const { + if (ReverseSortKff > 0) { + return 0; + } else { + return RecordsCount - 1; + } + } + +public: + NJson::TJsonValue DebugJson() const; + + const std::shared_ptr& GetFilter() const { + return Filter; + } + + bool IsControlPoint() const { + return ControlPointFlag; + } + + const TSortableBatchPosition& GetKeyColumns() const { + return KeyColumns; + } + + const TSortableBatchPosition& GetVersionColumns() const { + return VersionColumns; + } + + TBatchIterator(const TSortableBatchPosition& keyColumns) + : ControlPointFlag(true) + , KeyColumns(keyColumns) { + + } + + template + TBatchIterator(std::shared_ptr batch, std::shared_ptr filter, + const std::vector& keyColumns, const std::vector& dataColumns, const bool reverseSort, const std::vector& versionColumnNames) + : ControlPointFlag(false) + , KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort) + , VersionColumns(batch, 0, versionColumnNames, {}, false) + , RecordsCount(batch->num_rows()) + , ReverseSortKff(reverseSort ? 
-1 : 1) + , Filter(filter) { + Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition())); + Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition())); + if (Filter) { + FilterIterator = std::make_shared(Filter->GetIterator(reverseSort, RecordsCount)); + } + } + + bool CheckNextBatch(const TBatchIterator& nextIterator) { + return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less; + } + + bool IsReverse() const { + return ReverseSortKff < 0; + } + + bool IsDeleted() const { + if (!FilterIterator) { + return false; + } + return !FilterIterator->GetCurrentAcceptance(); + } + + TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& pos); + + bool Next(); + + bool operator<(const TBatchIterator& item) const; +}; + +} diff --git a/ydb/core/formats/arrow/reader/merger.cpp b/ydb/core/formats/arrow/reader/merger.cpp index 710c07c5a56a..72ce35a183aa 100644 --- a/ydb/core/formats/arrow/reader/merger.cpp +++ b/ydb/core/formats/arrow/reader/merger.cpp @@ -12,60 +12,6 @@ void TMergePartialStream::PutControlPoint(std::shared_ptr batch, std::shared_ptr filter) { - if (!batch || !batch->num_rows()) { - return; - } - Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema)); - AddNewToHeap(batch, filter); -} - -void TMergePartialStream::AddSource(std::shared_ptr batch, std::shared_ptr filter) { - if (!batch || !batch->num_rows()) { - return; - } -// Y_DEBUG_ABORT_UNLESS(batch->IsSorted(SortSchema)); - AddNewToHeap(batch, filter); -} - -void TMergePartialStream::AddSource(std::shared_ptr batch, std::shared_ptr filter) { - if (!batch || !batch->num_rows()) { - return; - } -// Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema)); - AddNewToHeap(batch, filter); -} - -void TMergePartialStream::AddNewToHeap(std::shared_ptr batch, std::shared_ptr filter) { - if (!filter || filter->IsTotalAllowFilter()) { - SortHeap.Push(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? 
DataSchema->field_names() : std::vector(), Reverse, VersionColumnNames)); - } else if (filter->IsTotalDenyFilter()) { - return; - } else { - SortHeap.Push(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector(), Reverse, VersionColumnNames)); - } -} - -void TMergePartialStream::AddNewToHeap(std::shared_ptr batch, std::shared_ptr filter) { - if (!filter || filter->IsTotalAllowFilter()) { - SortHeap.Push(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector(), Reverse, VersionColumnNames)); - } else if (filter->IsTotalDenyFilter()) { - return; - } else { - SortHeap.Push(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector(), Reverse, VersionColumnNames)); - } -} - -void TMergePartialStream::AddNewToHeap(std::shared_ptr batch, std::shared_ptr filter) { - if (!filter || filter->IsTotalAllowFilter()) { - SortHeap.Push(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector(), Reverse, VersionColumnNames)); - } else if (filter->IsTotalDenyFilter()) { - return; - } else { - SortHeap.Push(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? 
DataSchema->field_names() : std::vector(), Reverse, VersionColumnNames)); - } -} - void TMergePartialStream::RemoveControlPoint() { Y_ABORT_UNLESS(ControlPoints == 1); Y_ABORT_UNLESS(ControlPointEnriched()); @@ -252,11 +198,4 @@ std::vector> TMergePartialStream::DrainAllPa return result; } -NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const { - NJson::TJsonValue result; - result["is_cp"] = IsControlPoint(); - result["key"] = KeyColumns.DebugJson(); - return result; -} - } diff --git a/ydb/core/formats/arrow/reader/merger.h b/ydb/core/formats/arrow/reader/merger.h index 1849e535a8c3..1b3683ceafba 100644 --- a/ydb/core/formats/arrow/reader/merger.h +++ b/ydb/core/formats/arrow/reader/merger.h @@ -2,6 +2,7 @@ #include "position.h" #include "heap.h" #include "result_builder.h" +#include "batch_iterator.h" #include @@ -20,135 +21,6 @@ class TMergePartialStream { const std::vector VersionColumnNames; ui32 ControlPoints = 0; - class TBatchIterator { - private: - bool ControlPointFlag; - TSortableBatchPosition KeyColumns; - TSortableBatchPosition VersionColumns; - i64 RecordsCount; - int ReverseSortKff; - - std::shared_ptr Filter; - std::shared_ptr FilterIterator; - - i32 GetFirstPosition() const { - if (ReverseSortKff > 0) { - return 0; - } else { - return RecordsCount - 1; - } - } - - public: - NJson::TJsonValue DebugJson() const; - - const std::shared_ptr& GetFilter() const { - return Filter; - } - - bool IsControlPoint() const { - return ControlPointFlag; - } - - const TSortableBatchPosition& GetKeyColumns() const { - return KeyColumns; - } - - const TSortableBatchPosition& GetVersionColumns() const { - return VersionColumns; - } - - TBatchIterator(const TSortableBatchPosition& keyColumns) - : ControlPointFlag(true) - , KeyColumns(keyColumns) - { - - } - - template - TBatchIterator(std::shared_ptr batch, std::shared_ptr filter, - const std::vector& keyColumns, const std::vector& dataColumns, const bool reverseSort, const std::vector& 
versionColumnNames) - : ControlPointFlag(false) - , KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort) - , VersionColumns(batch, 0, versionColumnNames, {}, false) - , RecordsCount(batch->num_rows()) - , ReverseSortKff(reverseSort ? -1 : 1) - , Filter(filter) - { - Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition())); - Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition())); - if (Filter) { - FilterIterator = std::make_shared(Filter->GetIterator(reverseSort, RecordsCount)); - } - } - - bool CheckNextBatch(const TBatchIterator& nextIterator) { - return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less; - } - - bool IsReverse() const { - return ReverseSortKff < 0; - } - - bool IsDeleted() const { - if (!FilterIterator) { - return false; - } - return !FilterIterator->GetCurrentAcceptance(); - } - - TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& pos) { - const ui32 posStart = KeyColumns.GetPosition(); - auto result = KeyColumns.SkipToLower(pos); - const i32 delta = IsReverse() ? 
(posStart - KeyColumns.GetPosition()) : (KeyColumns.GetPosition() - posStart); - AFL_VERIFY(delta >= 0); - AFL_VERIFY(VersionColumns.InitPosition(KeyColumns.GetPosition()))("pos", KeyColumns.GetPosition())("size", VersionColumns.GetRecordsCount()); - if (FilterIterator && delta) { - AFL_VERIFY(FilterIterator->Next(delta)); - } - return result; - } - - bool Next() { - const bool result = KeyColumns.NextPosition(ReverseSortKff) && VersionColumns.NextPosition(ReverseSortKff); - if (FilterIterator) { - Y_ABORT_UNLESS(result == FilterIterator->Next(1)); - } - return result; - } - - bool operator<(const TBatchIterator& item) const { - const std::partial_ordering result = KeyColumns.Compare(item.KeyColumns); - if (result == std::partial_ordering::equivalent) { - if (IsControlPoint() && item.IsControlPoint()) { - return false; - } else if (IsControlPoint()) { - return false; - } else if (item.IsControlPoint()) { - return true; - } - //don't need inverse through we need maximal version at first (reverse analytic not included in VersionColumns) - return VersionColumns.Compare(item.VersionColumns) == std::partial_ordering::less; - } else { - //inverse logic through we use max heap, but need minimal element if not reverse (reverse analytic included in KeyColumns) - return result == std::partial_ordering::greater; - } - } - }; - - class TIteratorData { - private: - YDB_READONLY_DEF(std::shared_ptr, Batch); - YDB_READONLY_DEF(std::shared_ptr, Filter); - public: - TIteratorData(std::shared_ptr batch, std::shared_ptr filter) - : Batch(batch) - , Filter(filter) - { - - } - }; - TSortingHeap SortHeap; NJson::TJsonValue DebugJson() const { @@ -164,9 +36,19 @@ class TMergePartialStream { std::optional DrainCurrentPosition(); - void AddNewToHeap(std::shared_ptr batch, std::shared_ptr filter); - void AddNewToHeap(std::shared_ptr batch, std::shared_ptr filter); - void AddNewToHeap(std::shared_ptr batch, std::shared_ptr filter); + template + void AddNewToHeap(const std::shared_ptr& batch, 
const std::shared_ptr& filter) { + if (!batch->num_rows()) { + return; + } + if (!filter || filter->IsTotalAllowFilter()) { + SortHeap.Push(TBatchIterator(batch, nullptr, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector(), Reverse, VersionColumnNames)); + } else if (filter->IsTotalDenyFilter()) { + return; + } else { + SortHeap.Push(TBatchIterator(batch, filter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector(), Reverse, VersionColumnNames)); + } + } void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition); public: TMergePartialStream(std::shared_ptr sortSchema, std::shared_ptr dataSchema, const bool reverse, const std::vector& versionColumnNames) @@ -233,9 +115,17 @@ class TMergePartialStream { return SortHeap.Size() && SortHeap.Current().IsControlPoint(); } - void AddSource(std::shared_ptr batch, std::shared_ptr filter); - void AddSource(std::shared_ptr batch, std::shared_ptr filter); - void AddSource(std::shared_ptr batch, std::shared_ptr filter); + template + void AddSource(const std::shared_ptr& batch, const std::shared_ptr& filter) { + if (!batch || !batch->num_rows()) { + return; + } + if (filter && filter->IsTotalDenyFilter()) { + return; + } +// Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema)); + AddNewToHeap(batch, filter); + } bool IsEmpty() const { return !SortHeap.Size(); diff --git a/ydb/core/formats/arrow/reader/position.h b/ydb/core/formats/arrow/reader/position.h index 5c3d490335e8..f2a070d7b1f2 100644 --- a/ydb/core/formats/arrow/reader/position.h +++ b/ydb/core/formats/arrow/reader/position.h @@ -284,6 +284,7 @@ class TSortableBatchPosition { Y_ABORT_UNLESS(batch); Y_ABORT_UNLESS(batch->num_rows()); RecordsCount = batch->num_rows(); + AFL_VERIFY(Position < RecordsCount)("position", Position)("count", RecordsCount); if (dataColumns.size()) { Data = std::make_shared(batch, dataColumns); diff --git a/ydb/core/formats/arrow/reader/ya.make 
b/ydb/core/formats/arrow/reader/ya.make index e46e3d616074..f87e73dd355d 100644 --- a/ydb/core/formats/arrow/reader/ya.make +++ b/ydb/core/formats/arrow/reader/ya.make @@ -9,6 +9,7 @@ PEERDIR( ) SRCS( + batch_iterator.cpp merger.cpp position.cpp heap.cpp diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h index 3bd0f61bef7d..374c20da6143 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/fetched_data.h @@ -61,7 +61,7 @@ class TFetchedData { } bool IsEmpty() const { - return IsEmptyFilter() || (Table && !Table->num_rows()); + return IsEmptyFilter() || !Table || !Table->num_rows(); } void AddFilter(const std::shared_ptr& filter) { diff --git a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp index 2b4363ca83ad..cf6a08b4a434 100644 --- a/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp +++ b/ydb/core/tx/columnshard/engines/reader/plain_reader/iterator/merge.cpp @@ -58,7 +58,7 @@ bool TBaseMergeTask::DoApply(IDataReader& indexedDataRead) const { bool TStartMergeTask::DoExecute() { if (EmptyFiltersOnly()) { - ResultBatch = NArrow::TStatusValidator::GetValid(arrow::Table::FromRecordBatches({NArrow::MakeEmptyBatch(Context->GetProgramInputColumns()->GetSchema())})); + ResultBatch = nullptr; return true; } bool sourcesInMemory = true;