Skip to content

Commit

Permalink
Fix addressing sparsed (#9382)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Sep 17, 2024
1 parent 0361036 commit 3a4de79
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ bool TSparsedMerger::TPlainChunkCursor::AddIndexTo(const ui32 index, TWriter& wr

bool TSparsedMerger::TSparsedChunkCursor::AddIndexTo(const ui32 index, TWriter& writer) {
AFL_VERIFY(ChunkStartGlobalPosition <= index);
AFL_VERIFY(index == NextGlobalPosition);
AFL_VERIFY(index == NextGlobalPosition)("index", index)("next", NextGlobalPosition);
writer.AddRealData(Chunk->GetColValue(), NextLocalPosition);
return true;
}
Expand Down Expand Up @@ -163,7 +163,9 @@ void TSparsedMerger::TCursor::InitArrays(const ui32 position) {
PlainCursor = make_shared<TPlainChunkCursor>(CurrentOwnedArray->GetArray(), &*CurrentOwnedArray);
SparsedCursor = nullptr;
}
AFL_VERIFY(CurrentOwnedArray->GetAddress().GetGlobalStartPosition() <= position);
FinishGlobalPosition = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + CurrentOwnedArray->GetArray()->GetRecordsCount();
AFL_VERIFY(position < FinishGlobalPosition);
}

} // namespace NKikimr::NOlap::NCompaction
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@ class TSparsedMerger: public IColumnMerger {

void InitArrays(const ui32 position) {
AFL_VERIFY(!ChunkAddress || ChunkFinishPosition <= position);
ChunkAddress = CurrentChunkedArray->GetChunk(ChunkAddress, position);
AFL_VERIFY(CurrentOwnedArray->GetAddress().GetGlobalStartPosition() <= position)("pos", position)(
"global", CurrentOwnedArray->GetAddress().GetGlobalStartPosition());
ChunkAddress = CurrentChunkedArray->GetChunk(ChunkAddress, position - CurrentOwnedArray->GetAddress().GetGlobalStartPosition());
AFL_VERIFY(ChunkAddress);
ChunkStartPosition = CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + ChunkAddress->GetAddress().GetGlobalStartPosition();
ChunkFinishPosition =
CurrentOwnedArray->GetAddress().GetGlobalStartPosition() + ChunkAddress->GetAddress().GetGlobalFinishPosition();
AFL_VERIFY(position < ChunkFinishPosition)("finish", ChunkFinishPosition)("pos", position);
AFL_VERIFY(ChunkStartPosition <= position)("start", ChunkStartPosition)("pos", position);
}

public:
Expand All @@ -76,14 +80,15 @@ class TSparsedMerger: public IColumnMerger {
}
bool AddIndexTo(const ui32 index, TWriter& writer);
std::optional<ui32> MoveToSignificant(const ui32 currentGlobalPosition, const TColumnMergeContext& context) {
AFL_VERIFY(ChunkStartPosition <= currentGlobalPosition);
AFL_VERIFY(ChunkStartPosition <= currentGlobalPosition)("start", ChunkStartPosition)("pos", currentGlobalPosition)(
"global_start", CurrentOwnedArray->GetAddress().GetGlobalStartPosition());
ui32 currentIndex = currentGlobalPosition;
while (true) {
if (CurrentOwnedArray->GetAddress().GetGlobalFinishPosition() <= currentIndex) {
return {};
}
if (ChunkFinishPosition <= currentIndex) {
InitArrays(currentGlobalPosition);
InitArrays(currentIndex);
continue;
}
for (; currentIndex < ChunkFinishPosition; ++currentIndex) {
Expand Down Expand Up @@ -190,6 +195,7 @@ class TSparsedMerger: public IColumnMerger {
if (FinishGlobalPosition == Array->GetRecordsCount()) {
return FinishGlobalPosition;
} else {
currentPosition = FinishGlobalPosition;
InitArrays(FinishGlobalPosition);
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/library/formats/arrow/accessor/abstract/accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ std::shared_ptr<arrow::ChunkedArray> IChunkedArray::Slice(const ui32 offset, con

NKikimr::NArrow::NAccessor::IChunkedArray::TFullDataAddress IChunkedArray::GetChunk(
const std::optional<TAddressChain>& chunkCurrent, const ui64 position) const {
AFL_VERIFY(position < GetRecordsCount());
AFL_VERIFY(position < GetRecordsCount())("pos", position)("records", GetRecordsCount())("current", chunkCurrent ? chunkCurrent->DebugString() : Default<TString>());
std::optional<TCommonChunkAddress> address;

if (IsDataOwner()) {
Expand Down
9 changes: 7 additions & 2 deletions ydb/library/formats/arrow/accessor/abstract/accessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,13 @@ class IChunkedArray {
}

TString DebugString() const {
return TStringBuilder() << "start=" << GlobalStartPosition << ";finish=" << GlobalFinishPosition
<< ";addresses_count=" << Addresses.size() << ";";
TStringBuilder sb;
sb << "start=" << GlobalStartPosition << ";finish=" << GlobalFinishPosition
<< ";addresses_count=" << Addresses.size() << ";";
for (auto&& i : Addresses) {
sb << "addresses=" << i.DebugString() << ";";
}
return sb;
}
};

Expand Down

0 comments on commit 3a4de79

Please sign in to comment.