Skip to content

Commit

Permalink
Merge 8c159f8 into 7fd0a9c
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Apr 6, 2024
2 parents 7fd0a9c + 8c159f8 commit 19f5b3b
Show file tree
Hide file tree
Showing 8 changed files with 169 additions and 198 deletions.
51 changes: 51 additions & 0 deletions ydb/core/formats/arrow/reader/batch_iterator.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#include "batch_iterator.h"

namespace NKikimr::NArrow::NMerger {

// Produces a diagnostic JSON object describing this iterator:
//   "is_cp" - whether the iterator is a control point;
//   "key"   - the debug dump of the key position.
NJson::TJsonValue TBatchIterator::DebugJson() const {
    NJson::TJsonValue json;
    const bool controlPoint = IsControlPoint();
    json["is_cp"] = controlPoint;
    json["key"] = KeyColumns.DebugJson();
    return json;
}

// Moves the key position via KeyColumns.SkipToLower(pos), then re-aligns the version position
// and the filter iterator (if any) with the new key position.
// Returns whatever KeyColumns.SkipToLower reported.
NKikimr::NArrow::NMerger::TSortableBatchPosition::TFoundPosition TBatchIterator::SkipToLower(const TSortableBatchPosition& pos) {
    const ui32 startPosition = KeyColumns.GetPosition();
    auto found = KeyColumns.SkipToLower(pos);

    // Number of rows travelled in the iteration direction; it must never be negative.
    i32 travelled;
    if (IsReverse()) {
        travelled = startPosition - KeyColumns.GetPosition();
    } else {
        travelled = KeyColumns.GetPosition() - startPosition;
    }
    const i32 delta = travelled;
    AFL_VERIFY(delta >= 0);

    // The version columns are not skipped on their own: they are re-initialized at the key position.
    AFL_VERIFY(VersionColumns.InitPosition(KeyColumns.GetPosition()))("pos", KeyColumns.GetPosition())
        ("size", VersionColumns.GetRecordsCount())("key_size", KeyColumns.GetRecordsCount());

    // Advance the filter iterator by the same number of rows, when there is something to advance.
    if (FilterIterator && delta) {
        AFL_VERIFY(FilterIterator->Next(delta));
    }
    return found;
}

// Advances the key and version positions by one step in the iteration direction.
// Returns true only when both positions advanced; the version position is not touched
// if the key position failed to advance. The filter iterator, when present, is stepped
// by one row and must agree with that outcome.
bool TBatchIterator::Next() {
    bool result = KeyColumns.NextPosition(ReverseSortKff);
    if (result) {
        result = VersionColumns.NextPosition(ReverseSortKff);
    }
    if (FilterIterator) {
        Y_ABORT_UNLESS(result == FilterIterator->Next(1));
    }
    return result;
}

// Heap ordering for batch iterators.
// Keys are compared first; on equivalent keys a control point never compares less than
// anything, a regular iterator compares less than a control point, and two regular
// iterators are ordered by their version columns.
bool TBatchIterator::operator<(const TBatchIterator& item) const {
    const std::partial_ordering keyOrder = KeyColumns.Compare(item.KeyColumns);

    if (keyOrder != std::partial_ordering::equivalent) {
        // Inverted on purpose: the container is a max heap, but the minimal element is needed
        // when not reversed (the reverse direction is already accounted for in KeyColumns).
        return keyOrder == std::partial_ordering::greater;
    }

    // Equivalent keys from here on.
    if (IsControlPoint()) {
        // Covers both "both are control points" and "only this one is a control point".
        return false;
    }
    if (item.IsControlPoint()) {
        return true;
    }

    // No inversion here: the maximal version must come first
    // (the reverse direction is not included in VersionColumns).
    return VersionColumns.Compare(item.VersionColumns) == std::partial_ordering::less;
}

}
89 changes: 89 additions & 0 deletions ydb/core/formats/arrow/reader/batch_iterator.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#pragma once
#include "position.h"
#include <ydb/core/formats/arrow/arrow_filter.h>

namespace NKikimr::NArrow::NMerger {

// Iterator over one sorted source (or a synthetic "control point") used as a heap element
// during merging. It tracks a position in the key columns, a parallel position in the
// version columns, and optionally an iterator over a row filter.
class TBatchIterator {
private:
    // True when the iterator was built from a bare key position (see the single-argument constructor).
    bool ControlPointFlag;
    TSortableBatchPosition KeyColumns;
    TSortableBatchPosition VersionColumns;
    // The control-point constructor does not set the two fields below, so they carry defaults
    // instead of being left indeterminate (IsReverse(), Next() and GetFirstPosition() read them).
    // NOTE(review): a control point therefore reports IsReverse() == false regardless of the
    // direction stored inside its key position - confirm this matches callers' expectations.
    i64 RecordsCount = 0;
    // +1 for forward iteration, -1 for reverse iteration.
    int ReverseSortKff = 1;

    std::shared_ptr<NArrow::TColumnFilter> Filter;
    std::shared_ptr<NArrow::TColumnFilter::TIterator> FilterIterator;

    // Index of the row where iteration starts: the first row when going forward,
    // the last row when going in reverse.
    i32 GetFirstPosition() const {
        if (ReverseSortKff > 0) {
            return 0;
        } else {
            return RecordsCount - 1;
        }
    }

public:
    // Diagnostic JSON dump of the iterator.
    NJson::TJsonValue DebugJson() const;

    const std::shared_ptr<NArrow::TColumnFilter>& GetFilter() const {
        return Filter;
    }

    bool IsControlPoint() const {
        return ControlPointFlag;
    }

    const TSortableBatchPosition& GetKeyColumns() const {
        return KeyColumns;
    }

    const TSortableBatchPosition& GetVersionColumns() const {
        return VersionColumns;
    }

    // Builds a control point: only the key position is meaningful; there is no batch,
    // no filter, and the version position is default-constructed.
    TBatchIterator(const TSortableBatchPosition& keyColumns)
        : ControlPointFlag(true)
        , KeyColumns(keyColumns) {
    }

    // Builds an iterator over `batch`.
    //   filter             - optional row filter; when set, an iterator over it is created
    //                        in the same direction as the data iteration;
    //   keyColumns         - names of the columns forming the sort key;
    //   dataColumns        - names of the data columns attached to the key position;
    //   reverseSort        - iterate from the last row towards the first one;
    //   versionColumnNames - names of the columns forming the version position.
    // Aborts if the starting position cannot be initialized.
    template <class TDataContainer>
    TBatchIterator(std::shared_ptr<TDataContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter,
        const std::vector<std::string>& keyColumns, const std::vector<std::string>& dataColumns, const bool reverseSort, const std::vector<std::string>& versionColumnNames)
        : ControlPointFlag(false)
        , KeyColumns(batch, 0, keyColumns, dataColumns, reverseSort)
        , VersionColumns(batch, 0, versionColumnNames, {}, false)
        , RecordsCount(batch->num_rows())
        , ReverseSortKff(reverseSort ? -1 : 1)
        , Filter(filter) {
        Y_ABORT_UNLESS(KeyColumns.InitPosition(GetFirstPosition()));
        Y_ABORT_UNLESS(VersionColumns.InitPosition(GetFirstPosition()));
        if (Filter) {
            FilterIterator = std::make_shared<NArrow::TColumnFilter::TIterator>(Filter->GetIterator(reverseSort, RecordsCount));
        }
    }

    // True when this iterator's current key compares strictly less than the other iterator's key.
    bool CheckNextBatch(const TBatchIterator& nextIterator) const {
        return KeyColumns.Compare(nextIterator.KeyColumns) == std::partial_ordering::less;
    }

    bool IsReverse() const {
        return ReverseSortKff < 0;
    }

    // A row counts as deleted when a filter is attached and it does not accept the current row.
    bool IsDeleted() const {
        if (!FilterIterator) {
            return false;
        }
        return !FilterIterator->GetCurrentAcceptance();
    }

    // Skips forward (in the iteration direction) relative to `pos`; see the definition for details.
    TSortableBatchPosition::TFoundPosition SkipToLower(const TSortableBatchPosition& pos);

    // Advances by one row; returns false when the positions could not advance.
    bool Next();

    // Heap ordering; see the definition for the exact rules.
    bool operator<(const TBatchIterator& item) const;
};

}
61 changes: 0 additions & 61 deletions ydb/core/formats/arrow/reader/merger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,60 +12,6 @@ void TMergePartialStream::PutControlPoint(std::shared_ptr<TSortableBatchPosition
SortHeap.Push(TBatchIterator(*point));
}

// Registers a record batch as a merge source. Null or empty batches are ignored.
// In debug builds the batch is checked to be sorted by SortSchema.
void TMergePartialStream::AddSource(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    const bool hasRows = batch && batch->num_rows();
    if (hasRows) {
        Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
        AddNewToHeap(batch, filter);
    }
}

// Registers a general container as a merge source. Null or empty containers are ignored.
void TMergePartialStream::AddSource(std::shared_ptr<TGeneralContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    const bool hasRows = batch && batch->num_rows();
    if (hasRows) {
        // The sortedness check is currently disabled:
        // Y_DEBUG_ABORT_UNLESS(batch->IsSorted(SortSchema));
        AddNewToHeap(batch, filter);
    }
}

// Registers an arrow table as a merge source. Null or empty tables are ignored.
void TMergePartialStream::AddSource(std::shared_ptr<arrow::Table> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    const bool hasRows = batch && batch->num_rows();
    if (hasRows) {
        // The sortedness check is currently disabled:
        // Y_DEBUG_ABORT_UNLESS(NArrow::IsSorted(batch, SortSchema));
        AddNewToHeap(batch, filter);
    }
}

// Pushes an iterator over `batch` onto the sort heap.
// A missing or allow-everything filter is dropped (the iterator gets no filter);
// a deny-everything filter means the source is not added at all.
void TMergePartialStream::AddNewToHeap(std::shared_ptr<TGeneralContainer> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    std::shared_ptr<NArrow::TColumnFilter> effectiveFilter;
    if (filter && !filter->IsTotalAllowFilter()) {
        if (filter->IsTotalDenyFilter()) {
            return;
        }
        effectiveFilter = filter;
    }
    SortHeap.Push(TBatchIterator(batch, effectiveFilter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
}

// Pushes an iterator over `batch` onto the sort heap.
// A missing or allow-everything filter is dropped (the iterator gets no filter);
// a deny-everything filter means the source is not added at all.
void TMergePartialStream::AddNewToHeap(std::shared_ptr<arrow::RecordBatch> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    std::shared_ptr<NArrow::TColumnFilter> effectiveFilter;
    if (filter && !filter->IsTotalAllowFilter()) {
        if (filter->IsTotalDenyFilter()) {
            return;
        }
        effectiveFilter = filter;
    }
    SortHeap.Push(TBatchIterator(batch, effectiveFilter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
}

// Pushes an iterator over `batch` onto the sort heap.
// A missing or allow-everything filter is dropped (the iterator gets no filter);
// a deny-everything filter means the source is not added at all.
void TMergePartialStream::AddNewToHeap(std::shared_ptr<arrow::Table> batch, std::shared_ptr<NArrow::TColumnFilter> filter) {
    std::shared_ptr<NArrow::TColumnFilter> effectiveFilter;
    if (filter && !filter->IsTotalAllowFilter()) {
        if (filter->IsTotalDenyFilter()) {
            return;
        }
        effectiveFilter = filter;
    }
    SortHeap.Push(TBatchIterator(batch, effectiveFilter, SortSchema->field_names(), DataSchema ? DataSchema->field_names() : std::vector<std::string>(), Reverse, VersionColumnNames));
}

void TMergePartialStream::RemoveControlPoint() {
Y_ABORT_UNLESS(ControlPoints == 1);
Y_ABORT_UNLESS(ControlPointEnriched());
Expand Down Expand Up @@ -252,11 +198,4 @@ std::vector<std::shared_ptr<arrow::RecordBatch>> TMergePartialStream::DrainAllPa
return result;
}

// Produces a diagnostic JSON object describing this iterator:
//   "is_cp" - whether the iterator is a control point;
//   "key"   - the debug dump of the key position.
NJson::TJsonValue TMergePartialStream::TBatchIterator::DebugJson() const {
    NJson::TJsonValue json;
    const bool controlPoint = IsControlPoint();
    json["is_cp"] = controlPoint;
    json["key"] = KeyColumns.DebugJson();
    return json;
}

}
Loading

0 comments on commit 19f5b3b

Please sign in to comment.