Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Back out "Fix Iceberg read when positional delete files are unaligned" #10431

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 9 additions & 37 deletions velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,7 @@ IcebergSplitReader::IcebergSplitReader(
executor,
scanSpec),
baseReadOffset_(0),
splitOffset_(0),
deleteBitmap_(nullptr),
deleteBitmapBitOffset_(0) {}
splitOffset_(0) {}

void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
Expand Down Expand Up @@ -99,55 +97,29 @@ uint64_t IcebergSplitReader::next(uint64_t size, VectorPtr& output) {
mutation.randomSkip = baseReaderOpts_.randomSkip().get();
mutation.deletedRows = nullptr;

if (deleteBitmap_ && deleteBitmapBitOffset_ > 0) {
// There are unconsumed bits from last batch
if (deleteBitmapBitOffset_ < deleteBitmap_->size() * 8) {
bits::copyBits(
deleteBitmap_->as<uint64_t>(),
deleteBitmapBitOffset_,
deleteBitmap_->asMutable<uint64_t>(),
0,
deleteBitmap_->size() * 8 - deleteBitmapBitOffset_);

uint64_t newBitMapSizeInBytes =
deleteBitmap_->size() - deleteBitmapBitOffset_ / 8;
if (deleteBitmapBitOffset_ % 8 != 0) {
newBitMapSizeInBytes--;
}
deleteBitmap_->setSize(newBitMapSizeInBytes);
} else {
// All bits were consumed, reset to 0 for all bits
std::memset(
(void*)(deleteBitmap_->asMutable<int8_t>()),
0L,
deleteBitmap_->size());
}
}

if (!positionalDeleteFileReaders_.empty()) {
auto numBytes = bits::nbytes(size);
dwio::common::ensureCapacity<int8_t>(
deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool(), true, true);
deleteBitmap_, numBytes, connectorQueryCtx_->memoryPool());
std::memset((void*)deleteBitmap_->as<int8_t>(), 0L, numBytes);

for (auto iter = positionalDeleteFileReaders_.begin();
iter != positionalDeleteFileReaders_.end();) {
(*iter)->readDeletePositions(baseReadOffset_, size, deleteBitmap_);

if ((*iter)->noMoreData()) {
(*iter)->readDeletePositions(
baseReadOffset_, size, deleteBitmap_->asMutable<int8_t>());
if ((*iter)->endOfFile()) {
iter = positionalDeleteFileReaders_.erase(iter);
} else {
++iter;
}
}
}

mutation.deletedRows = deleteBitmap_ && deleteBitmap_->size() > 0
? deleteBitmap_->as<uint64_t>()
: nullptr;
deleteBitmap_->setSize(numBytes);
mutation.deletedRows = deleteBitmap_->as<uint64_t>();
}

auto rowsScanned = baseRowReader_->next(size, output, &mutation);
baseReadOffset_ += rowsScanned;
deleteBitmapBitOffset_ = rowsScanned;

return rowsScanned;
}
Expand Down
5 changes: 2 additions & 3 deletions velox/connectors/hive/iceberg/IcebergSplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,12 @@ class IcebergSplitReader : public SplitReader {
// The read offset to the beginning of the split in number of rows for the
// current batch for the base data file
uint64_t baseReadOffset_;

// The file position for the first row in the split
uint64_t splitOffset_;

std::list<std::unique_ptr<PositionalDeleteFileReader>>
positionalDeleteFileReaders_;
BufferPtr deleteBitmap_;
// The offset in bits of the deleteBitmap_ starting from where the bits shall
// be consumed
uint64_t deleteBitmapBitOffset_;
};
} // namespace facebook::velox::connector::hive::iceberg
62 changes: 21 additions & 41 deletions velox/connectors/hive/iceberg/PositionalDeleteFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,12 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
deleteRowReader_(nullptr),
deletePositionsOutput_(nullptr),
deletePositionsOffset_(0),
totalNumRowsScanned_(0) {
endOfFile_(false) {
VELOX_CHECK(deleteFile_.content == FileContent::kPositionalDeletes);
VELOX_CHECK(deleteFile_.recordCount);

if (deleteFile_.recordCount == 0) {
return;
}

// TODO: check if the lowerbounds and upperbounds in deleteFile overlap with
// this batch. If not, no need to proceed.
Expand Down Expand Up @@ -134,23 +137,21 @@ PositionalDeleteFileReader::PositionalDeleteFileReader(
void PositionalDeleteFileReader::readDeletePositions(
uint64_t baseReadOffset,
uint64_t size,
BufferPtr deleteBitmapBuffer) {
int8_t* deleteBitmap) {
// We are going to read to the row number up to the end of the batch. For the
// same base file, the deleted rows are in ascending order in the same delete
// file. rowNumberUpperBound is the upperbound for the row number in this
// batch, excluding boundaries
// file
int64_t rowNumberUpperBound = splitOffset_ + baseReadOffset + size;

// Finish unused delete positions from last batch. Note that at this point we
// don't know how many rows the base row reader would scan yet.
// Finish unused delete positions from last batch
if (deletePositionsOutput_ &&
deletePositionsOffset_ < deletePositionsOutput_->size()) {
updateDeleteBitmap(
std::dynamic_pointer_cast<RowVector>(deletePositionsOutput_)
->childAt(0),
baseReadOffset,
rowNumberUpperBound,
deleteBitmapBuffer);
deleteBitmap);

if (readFinishedForBatch(rowNumberUpperBound)) {
return;
Expand All @@ -165,15 +166,14 @@ void PositionalDeleteFileReader::readDeletePositions(
// and update the delete bitmap

auto outputType = posColumn_->type;

RowTypePtr outputRowType = ROW({posColumn_->name}, {posColumn_->type});
if (!deletePositionsOutput_) {
deletePositionsOutput_ = BaseVector::create(outputRowType, 0, pool_);
}

do {
while (!readFinishedForBatch(rowNumberUpperBound)) {
auto rowsScanned = deleteRowReader_->next(size, deletePositionsOutput_);
totalNumRowsScanned_ += rowsScanned;

if (rowsScanned > 0) {
VELOX_CHECK(
!deletePositionsOutput_->mayHaveNulls(),
Expand All @@ -184,57 +184,42 @@ void PositionalDeleteFileReader::readDeletePositions(
deletePositionsOutput_->loadedVector();
deletePositionsOffset_ = 0;

// Convert the row numbers to set bits, up to rowNumberUpperBound.
// Beyond that the buffer of deleteBitMap is not available.
updateDeleteBitmap(
std::dynamic_pointer_cast<RowVector>(deletePositionsOutput_)
->childAt(0),
baseReadOffset,
rowNumberUpperBound,
deleteBitmapBuffer);
deleteBitmap);
}
} else {
// Reaching the end of the file
endOfFile_ = true;
deleteSplit_.reset();
break;
return;
}
} while (!readFinishedForBatch(rowNumberUpperBound));
}
}

bool PositionalDeleteFileReader::noMoreData() {
return totalNumRowsScanned_ >= deleteFile_.recordCount &&
deletePositionsOutput_ &&
deletePositionsOffset_ >= deletePositionsOutput_->size();
bool PositionalDeleteFileReader::endOfFile() {
return endOfFile_;
}

void PositionalDeleteFileReader::updateDeleteBitmap(
VectorPtr deletePositionsVector,
uint64_t baseReadOffset,
int64_t rowNumberUpperBound,
BufferPtr deleteBitmapBuffer) {
auto deleteBitmap = deleteBitmapBuffer->asMutable<uint8_t>();

int8_t* deleteBitmap) {
// Convert the positions in file into positions relative to the start of the
// split.
const int64_t* deletePositions =
deletePositionsVector->as<FlatVector<int64_t>>()->rawValues();
int64_t offset = baseReadOffset + splitOffset_;

while (deletePositionsOffset_ < deletePositionsVector->size() &&
deletePositions[deletePositionsOffset_] < rowNumberUpperBound) {
bits::setBit(
deleteBitmap, deletePositions[deletePositionsOffset_] - offset);
deletePositionsOffset_++;
}

// There might be multiple delete files for a single base file. The size of
// the deleteBitmapBuffer should be the largest position among all delte files
deleteBitmapBuffer->setSize(std::max(
(uint64_t)deleteBitmapBuffer->size(),
deletePositionsOffset_ == 0
? 0
: bits::nbytes(
deletePositions[deletePositionsOffset_ - 1] + 1 - offset)));
}

bool PositionalDeleteFileReader::readFinishedForBatch(
Expand All @@ -246,14 +231,9 @@ bool PositionalDeleteFileReader::readFinishedForBatch(
const int64_t* deletePositions =
deletePositionsVector->as<FlatVector<int64_t>>()->rawValues();

// We've read enough of the delete positions from this delete file when 1) it
// reaches the end of the file, or 2) the last read delete position is greater
// than the largest base file row number that is going to be read in this
// batch
if (totalNumRowsScanned_ >= deleteFile_.recordCount ||
(deletePositionsVector->size() != 0 &&
(deletePositionsOffset_ < deletePositionsVector->size() &&
deletePositions[deletePositionsOffset_] >= rowNumberUpperBound))) {
if (deletePositionsOutput_->size() != 0 &&
deletePositionsOffset_ < deletePositionsVector->size() &&
deletePositions[deletePositionsOffset_] >= rowNumberUpperBound) {
return true;
}
return false;
Expand Down
16 changes: 4 additions & 12 deletions velox/connectors/hive/iceberg/PositionalDeleteFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ class PositionalDeleteFileReader {
void readDeletePositions(
uint64_t baseReadOffset,
uint64_t size,
BufferPtr deleteBitmap);
int8_t* deleteBitmap);

bool noMoreData();
bool endOfFile();

private:
void updateDeleteBitmap(
VectorPtr deletePositionsVector,
uint64_t baseReadOffset,
int64_t rowNumberUpperBound,
BufferPtr deleteBitmapBuffer);
int8_t* deleteBitmap);

bool readFinishedForBatch(int64_t rowNumberUpperBound);

Expand All @@ -77,17 +77,9 @@ class PositionalDeleteFileReader {

std::shared_ptr<HiveConnectorSplit> deleteSplit_;
std::unique_ptr<dwio::common::RowReader> deleteRowReader_;
// The vector to hold the delete positions read from the positional delete
// file. These positions are relative to the start of the whole base data
// file.
VectorPtr deletePositionsOutput_;
// The index of deletePositionsOutput_ that indicates up to where the delete
// positions have been converted into the bitmap
uint64_t deletePositionsOffset_;
// Total number of rows read from this positional delete file reader,
// including the rows filtered out from filters on both filePathColumn_ and
// posColumn_.
uint64_t totalNumRowsScanned_;
bool endOfFile_;
};

} // namespace facebook::velox::connector::hive::iceberg
Loading
Loading