Skip to content

Commit

Permalink
add validations for inconsistency states for scanners (#3526)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Apr 7, 2024
1 parent 3c19574 commit 1acf95f
Show file tree
Hide file tree
Showing 11 changed files with 170 additions and 219 deletions.
51 changes: 51 additions & 0 deletions ydb/core/formats/arrow/reader/batch_iterator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include "batch_iterator.h"

namespace NKikimr::NArrow::NMerger {

// Builds a diagnostic JSON object for this iterator:
//   "is_cp" - whether the iterator is a control point,
//   "key"   - debug representation of the current key position.
NJson::TJsonValue TBatchIterator::DebugJson() const {
    NJson::TJsonValue json;
    const bool controlPoint = IsControlPoint();
    json["is_cp"] = controlPoint;
    json["key"] = KeyColumns.DebugJson();
    return json;
}

// Moves the key cursor via KeyColumns.SkipToLower(pos) and then brings the version cursor
// and the filter iterator (if any) to the same row.
// Returns whatever KeyColumns.SkipToLower reported.
// Aborts if the key cursor moved against the iteration direction, if the version cursor
// cannot be placed at the new row, or if the filter iterator cannot advance that far.
NKikimr::NArrow::NMerger::TSortableBatchPosition::TFoundPosition TBatchIterator::SkipToLower(const TSortableBatchPosition& pos) {
    const ui32 startPosition = KeyColumns.GetPosition();
    auto found = KeyColumns.SkipToLower(pos);
    const auto finishPosition = KeyColumns.GetPosition();
    // Number of rows passed in the iteration direction (positions decrease when reversed).
    const i32 delta = IsReverse() ? (startPosition - finishPosition) : (finishPosition - startPosition);
    AFL_VERIFY(delta >= 0);
    AFL_VERIFY(VersionColumns.InitPosition(KeyColumns.GetPosition()))("pos", KeyColumns.GetPosition())
        ("size", VersionColumns.GetRecordsCount())("key_size", KeyColumns.GetRecordsCount());
    const bool needFilterShift = FilterIterator && delta;
    if (needFilterShift) {
        AFL_VERIFY(FilterIterator->Next(delta));
    }
    return found;
}

// Advances the iterator by one row in the iteration direction.
// The version cursor is moved only if the key cursor moved successfully.
// Returns true when both cursors advanced; when a filter iterator is attached it must
// report the same outcome for a single step, otherwise the process aborts.
bool TBatchIterator::Next() {
    const bool keyAdvanced = KeyColumns.NextPosition(ReverseSortKff);
    const bool result = keyAdvanced && VersionColumns.NextPosition(ReverseSortKff);
    if (!FilterIterator) {
        return result;
    }
    Y_ABORT_UNLESS(result == FilterIterator->Next(1));
    return result;
}

// Heap ordering for iterators.
// Different keys: the comparison is inverted (greater key => "less"), because the iterators
//   live in a max heap while the minimal key is needed first; the reverse-sort flag is
//   already accounted for inside KeyColumns.
// Equivalent keys:
//   - a control point on the left is never "less" (also covers two control points);
//   - a data iterator is "less" than a control point on the right;
//   - two data iterators are ordered by version without inversion, so the maximal
//     version comes first (VersionColumns do not take the reverse flag into account).
bool TBatchIterator::operator<(const TBatchIterator& item) const {
    const std::partial_ordering keyOrder = KeyColumns.Compare(item.KeyColumns);
    if (keyOrder != std::partial_ordering::equivalent) {
        return keyOrder == std::partial_ordering::greater;
    }
    if (IsControlPoint()) {
        return false;
    }
    if (item.IsControlPoint()) {
        return true;
    }
    const std::partial_ordering versionOrder = VersionColumns.Compare(item.VersionColumns);
    return versionOrder == std::partial_ordering::less;
}

}
89 changes: 89 additions & 0 deletions ydb/core/formats/arrow/reader/batch_iterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#pragma once
#include "position.h"
#include <ydb/core/formats/arrow/arrow_filter.h>

namespace NKikimr::NArrow::NMerger {

// Element compared via operator<: either a control point (a bare key position) or a cursor
// over one data batch that tracks key columns, version columns and an optional row filter.
class TBatchIterator {
private:
    // True for iterators built from a bare key position (control points).
    bool ControlPointFlag;
    TSortableBatchPosition KeyColumns;
    TSortableBatchPosition VersionColumns;
    // The control-point constructor sets neither of the two fields below. The defaults keep
    // copies of such iterators and calls like IsReverse()/Next() from reading indeterminate values.
    i64 RecordsCount = 0;
    // +1 for forward iteration, -1 for reverse iteration; used as the step for NextPosition.
    int ReverseSortKff = 1;

    std::shared_ptr<NArrow::TColumnFilter> Filter;
    std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;

    // Row the cursors start from: the first row when iterating forward, the last one when reversed.
    i32 GetFirstPosition() const {
        if (ReverseSortKff > 0) {
            return 0;
        } else {
            return RecordsCount - 1;
        }
    }

public:
    // Diagnostic JSON ("is_cp" and "key").
    NJson::TJsonValue DebugJson() const;

    // Filter passed at construction; null for control points or when no filter was given.
    const std::shared_ptr<NArrow::TColumnFilter>& GetFilter() const {
        return Filter;
    }

    bool IsControlPoint() const {
        return ControlPointFlag;
    }

    const TSortableBatchPosition& GetKeyColumns() const {
        return KeyColumns;
    }

    const TSortableBatchPosition& GetVersionColumns() const {
        return VersionColumns;
    }

    // Builds a control point: only the key position is meaningful, there is no filter,
    // and the remaining fields keep their defaults.
    TBatchIterator(const TSortableBatchPosition& keyColumns)
        : ControlPointFlag(true)
        , KeyColumns(keyColumns) {
    }

    // Builds a data iterator over `batch`.
    //   filter             - optional row filter; an iterator over it is created in the same direction;
    //   keyColumns         - names of the sorting key columns;
    //   dataColumns        - names of the data columns attached to the key position;
    //   reverseSort        - iterate from the last row towards the first one;
    //   versionColumnNames - names of the version columns (always compared without the reverse flag).
    // Aborts if either cursor cannot be placed at the first row.
    template <class TDataContainer>
    TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
        const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::vector<std::string>& versionColumnNames)
        : ControlPointFlag(false)
        , KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort)
        , VersionColumns(batch, 0, versionColumnNames, {}, false)
        , RecordsCount(batch->num_rows())
        , ReverseSortKff(reverseSort ? -1 : 1)
        , Filter(filter) {
        Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition()));
        Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition()));
        if (Filter) {
            FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort, RecordsCount));
        }
    }

    // True when the current key compares strictly less than the key of `nextIterator`.
    // Marked const: the check does not modify the iterator.
    bool CheckNextBatch(const TBatchIterator& nextIterator) const {
        return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less;
    }

    bool IsReverse() const {
        return ReverseSortKff < 0;
    }

    // True when a filter iterator is attached and it does not accept the current row.
    bool IsDeleted() const {
        if (!FilterIterator) {
            return false;
        }
        return !FilterIterator->GetCurrentAcceptance();
    }

    TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& pos);

    bool Next();

    bool operator<(const TBatchIterator& item) const;
};

}
61 changes: 0 additions & 61 deletions ydb/core/formats/arrow/reader/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,60 +12,6 @@ void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition
SortHeap.Push(TBatchIterator(*point));
}

// Registers a record batch as a merge source. Null or empty batches are ignored.
// In debug builds the batch is checked to be sorted according to SortSchema.
void TMergePartialStream::AddSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    const bool hasRows = batch && batch->num_rows();
    if (hasRows) {
        Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
        AddNewToHeap(batch, filter);
    }
}

// Registers a general container as a merge source. Null or empty containers are ignored.
void TMergePartialStream::AddSource(std::shared_ptr<TGeneralContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    const bool hasRows = batch && batch->num_rows();
    if (hasRows) {
        // Sortedness check is disabled for this container type:
        // Y_DEBUG_ABORT_UNLESS(batch->IsSorted(SortSchema));
        AddNewToHeap(batch, filter);
    }
}

// Registers an arrow table as a merge source. Null or empty tables are ignored.
void TMergePartialStream::AddSource(std::shared_ptr<arrow::Table> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    const bool hasRows = batch && batch->num_rows();
    if (hasRows) {
        // Sortedness check is disabled for this container type:
        // Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
        AddNewToHeap(batch, filter);
    }
}

// Pushes an iterator over `batch` into the sort heap.
// A missing filter or a total-allow filter is passed on as "no filter" (nullptr);
// a total-deny filter means the batch is not added at all.
void TMergePartialStream::AddNewToHeap(std::shared_ptr<TGeneralContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    const bool keepAllRows = !filter || filter->IsTotalAllowFilter();
    if (!keepAllRows && filter->IsTotalDenyFilter()) {
        return;
    }
    std::shared_ptr<NArrow::TColumnFilter> effectiveFilter;
    if (!keepAllRows) {
        effectiveFilter = filter;
    }
    SortHeap.Push(TBatchIterator(batch, effectiveFilter, SortSchema->field_names(),
        DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
}

// Pushes an iterator over `batch` into the sort heap.
// A missing filter or a total-allow filter is passed on as "no filter" (nullptr);
// a total-deny filter means the batch is not added at all.
void TMergePartialStream::AddNewToHeap(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    const bool keepAllRows = !filter || filter->IsTotalAllowFilter();
    if (!keepAllRows && filter->IsTotalDenyFilter()) {
        return;
    }
    std::shared_ptr<NArrow::TColumnFilter> effectiveFilter;
    if (!keepAllRows) {
        effectiveFilter = filter;
    }
    SortHeap.Push(TBatchIterator(batch, effectiveFilter, SortSchema->field_names(),
        DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
}

// Pushes an iterator over `batch` into the sort heap.
// A missing filter or a total-allow filter is passed on as "no filter" (nullptr);
// a total-deny filter means the batch is not added at all.
void TMergePartialStream::AddNewToHeap(std::shared_ptr<arrow::Table> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    const bool keepAllRows = !filter || filter->IsTotalAllowFilter();
    if (!keepAllRows && filter->IsTotalDenyFilter()) {
        return;
    }
    std::shared_ptr<NArrow::TColumnFilter> effectiveFilter;
    if (!keepAllRows) {
        effectiveFilter = filter;
    }
    SortHeap.Push(TBatchIterator(batch, effectiveFilter, SortSchema->field_names(),
        DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
}

void TMergePartialStream::RemoveControlPoint() {
Y_ABORT_UNLESS(ControlPoints == 1);
Y_ABORT_UNLESS(ControlPointEnriched());
Expand Down Expand Up @@ -252,11 +198,4 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa
return result;
}

// Builds a diagnostic JSON object for this iterator:
//   "is_cp" - whether the iterator is a control point,
//   "key"   - debug representation of the current key position.
NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const {
    NJson::TJsonValue json;
    const bool controlPoint = IsControlPoint();
    json["is_cp"] = controlPoint;
    json["key"] = KeyColumns.DebugJson();
    return json;
}

}
148 changes: 13 additions & 135 deletions ydb/core/formats/arrow/reader/merger.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "position.h"
#include "heap.h"
#include "result_builder.h"
#include "batch_iterator.h"

#include <ydb/core/formats/arrow/arrow_filter.h>

Expand All @@ -20,135 +21,6 @@ class TMergePartialStream {
const std::vector<std::string> VersionColumnNames;
ui32 ControlPoints = 0;

// Element of the merge sort heap: either a control point (a bare key position) or a cursor
// over one data batch that tracks key columns, version columns and an optional row filter.
class TBatchIterator {
private:
    // True for iterators built from a bare key position (control points).
    bool ControlPointFlag;
    TSortableBatchPosition KeyColumns;
    TSortableBatchPosition VersionColumns;
    // The control-point constructor sets neither of the two fields below. The defaults keep
    // copies of such iterators and calls like IsReverse()/Next() from reading indeterminate values.
    i64 RecordsCount = 0;
    // +1 for forward iteration, -1 for reverse iteration; used as the step for NextPosition.
    int ReverseSortKff = 1;

    std::shared_ptr<NArrow::TColumnFilter> Filter;
    std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;

    // Row the cursors start from: the first row when iterating forward, the last one when reversed.
    i32 GetFirstPosition() const {
        if (ReverseSortKff > 0) {
            return 0;
        } else {
            return RecordsCount - 1;
        }
    }

public:
    // Diagnostic JSON ("is_cp" and "key").
    NJson::TJsonValue DebugJson() const;

    // Filter passed at construction; null for control points or when no filter was given.
    const std::shared_ptr<NArrow::TColumnFilter>& GetFilter() const {
        return Filter;
    }

    bool IsControlPoint() const {
        return ControlPointFlag;
    }

    const TSortableBatchPosition& GetKeyColumns() const {
        return KeyColumns;
    }

    const TSortableBatchPosition& GetVersionColumns() const {
        return VersionColumns;
    }

    // Builds a control point: only the key position is meaningful, there is no filter,
    // and the remaining fields keep their defaults.
    TBatchIterator(const TSortableBatchPosition& keyColumns)
        : ControlPointFlag(true)
        , KeyColumns(keyColumns)
    {
    }

    // Builds a data iterator over `batch`.
    //   filter             - optional row filter; an iterator over it is created in the same direction;
    //   keyColumns         - names of the sorting key columns;
    //   dataColumns        - names of the data columns attached to the key position;
    //   reverseSort        - iterate from the last row towards the first one;
    //   versionColumnNames - names of the version columns (always compared without the reverse flag).
    // Aborts if either cursor cannot be placed at the first row.
    template <class TDataContainer>
    TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
        const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::vector<std::string>& versionColumnNames)
        : ControlPointFlag(false)
        , KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort)
        , VersionColumns(batch, 0, versionColumnNames, {}, false)
        , RecordsCount(batch->num_rows())
        , ReverseSortKff(reverseSort ? -1 : 1)
        , Filter(filter)
    {
        Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition()));
        Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition()));
        if (Filter) {
            FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort, RecordsCount));
        }
    }

    // True when the current key compares strictly less than the key of `nextIterator`.
    // Marked const: the check does not modify the iterator.
    bool CheckNextBatch(const TBatchIterator& nextIterator) const {
        return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less;
    }

    bool IsReverse() const {
        return ReverseSortKff < 0;
    }

    // True when a filter iterator is attached and it does not accept the current row.
    bool IsDeleted() const {
        if (!FilterIterator) {
            return false;
        }
        return !FilterIterator->GetCurrentAcceptance();
    }

    // Moves the key cursor via KeyColumns.SkipToLower(pos), then brings the version cursor and
    // the filter iterator (if any) to the same row. Aborts if the key cursor moved against the
    // iteration direction or the dependent cursors cannot follow it.
    TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& pos) {
        const ui32 posStart = KeyColumns.GetPosition();
        auto result = KeyColumns.SkipToLower(pos);
        // Number of rows passed in the iteration direction (positions decrease when reversed).
        const i32 delta = IsReverse() ? (posStart - KeyColumns.GetPosition()) : (KeyColumns.GetPosition() - posStart);
        AFL_VERIFY(delta >= 0);
        AFL_VERIFY(VersionColumns.InitPosition(KeyColumns.GetPosition()))("pos", KeyColumns.GetPosition())("size", VersionColumns.GetRecordsCount());
        if (FilterIterator && delta) {
            AFL_VERIFY(FilterIterator->Next(delta));
        }
        return result;
    }

    // Advances by one row in the iteration direction; the version cursor moves only if the key
    // cursor did. An attached filter iterator must report the same outcome, otherwise aborts.
    bool Next() {
        const bool result = KeyColumns.NextPosition(ReverseSortKff) && VersionColumns.NextPosition(ReverseSortKff);
        if (FilterIterator) {
            Y_ABORT_UNLESS(result == FilterIterator->Next(1));
        }
        return result;
    }

    // Heap ordering. With equivalent keys a control point on the left is never "less", a data
    // iterator is "less" than a control point, and two data iterators are ordered by version.
    bool operator<(const TBatchIterator& item) const {
        const std::partial_ordering result = KeyColumns.Compare(item.KeyColumns);
        if (result == std::partial_ordering::equivalent) {
            if (IsControlPoint()) {
                return false;
            } else if (item.IsControlPoint()) {
                return true;
            }
            // No inversion here: the maximal version is needed first (the reverse flag is not part of VersionColumns).
            return VersionColumns.Compare(item.VersionColumns) == std::partial_ordering::less;
        } else {
            // Inverted because a max heap is used but the minimal element is needed when not reversed
            // (the reverse flag is already part of KeyColumns).
            return result == std::partial_ordering::greater;
        }
    }
};

// Pairs a record batch with the column filter supplied alongside it.
class TIteratorData {
private:
// NOTE(review): YDB_READONLY_DEF presumably declares the member together with a read-only
// accessor - confirm against the macro definition.
YDB_READONLY_DEF(std::shared_ptr<arrow::RecordBatch>, Batch);
YDB_READONLY_DEF(std::shared_ptr<NArrow::TColumnFilter>, Filter);
public:
// Stores copies of both shared pointers; neither argument is checked for null here.
TIteratorData(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter)
: Batch(batch)
, Filter(filter)
{

}
};

TSortingHeap<TBatchIterator> SortHeap;

NJson::TJsonValue DebugJson() const {
Expand All @@ -164,9 +36,6 @@ class TMergePartialStream {

std::optional<TSortableBatchPosition> DrainCurrentPosition();

void AddNewToHeap(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
void AddNewToHeap(std::shared_ptr<arrow::Table> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
void AddNewToHeap(std::shared_ptr<TGeneralContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
void CheckSequenceInDebug(const TSortableBatchPosition& nextKeyColumnsPosition);
public:
TMergePartialStream(std::shared_ptr<arrow::Schema> sortSchema, std::shared_ptr<arrow::Schema> dataSchema, const bool reverse, const std::vector<std::string>& versionColumnNames)
Expand Down Expand Up @@ -233,9 +102,18 @@ class TMergePartialStream {
return SortHeap.Size() && SortHeap.Current().IsControlPoint();
}

void AddSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
void AddSource(std::shared_ptr<arrow::Table> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
void AddSource(std::shared_ptr<TGeneralContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter);
// Registers `batch` as a merge source.
// Nothing is added when the batch is null or empty, or when the filter rejects every row.
// A missing filter or a total-allow filter is passed to the iterator as "no filter" (nullptr).
template <class TDataContainer>
void AddSource(const std::shared_ptr<TDataContainer>& batch, const std::shared_ptr<NArrow::TColumnFilter>& filter) {
    const bool nothingToAdd = !batch || !batch->num_rows() || (filter && filter->IsTotalDenyFilter());
    if (nothingToAdd) {
        return;
    }
    // Sortedness check is currently disabled:
    // Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
    std::shared_ptr<NArrow::TColumnFilter> filterImpl;
    if (filter && !filter->IsTotalAllowFilter()) {
        filterImpl = filter;
    }
    SortHeap.Push(TBatchIterator(batch, filterImpl, SortSchema->field_names(),
        DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
}

bool IsEmpty() const {
return !SortHeap.Size();
Expand Down
1 change: 1 addition & 0 deletions ydb/core/formats/arrow/reader/position.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class TSortableBatchPosition {
Y_ABORT_UNLESS(batch);
Y_ABORT_UNLESS(batch->num_rows());
RecordsCount = batch->num_rows();
AFL_VERIFY(Position < RecordsCount)("position", Position)("count", RecordsCount);

if (dataColumns.size()) {
Data = std::make_shared<TSortableScanData>(batch, dataColumns);
Expand Down
Loading

0 comments on commit 1acf95f

Please sign in to comment.