Skip to content

Commit

Permalink
Skip decoding parquet pages for long seek operations
Browse files Browse the repository at this point in the history
When a large enough number of rows are skipped due to `seek` operation,
it is possible to skip decompressing and decoding parquet pages entirely.
  • Loading branch information
raunaqmorarka committed Nov 18, 2022
1 parent cf86f6e commit 6877f67
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,19 @@ public DictionaryPage readDictionaryPage()
throw new RuntimeException("Error reading dictionary page", e);
}
}

public DataPage getNextPage()
{
return compressedPages.getFirst();
}

public boolean hasNext()
{
return !compressedPages.isEmpty();
}

public void skipNextPage()
{
compressedPages.removeFirst();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,17 @@ public void resetForNewPage(OptionalLong firstRowIndex)
verify(pageFirstRowIndex <= rangeEnd);
}

/**
* Returns whether the current page with the provided value count
* is fully contained within the current row range.
*/
@Override
public boolean isPageFullyConsumed(int pageValueCount)
{
return pageFirstRowIndex >= currentRange.start()
&& (pageFirstRowIndex + pageValueCount) <= currentRange.end() + 1;
}

private void nextRange()
{
currentRange = rowRangeIterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,11 @@ private void seek()
int remainingInBatch = readOffset;
while (remainingInBatch > 0) {
if (remainingPageValueCount == 0) {
if (!readNextPage()) {
remainingInBatch = seekToNextPage(remainingInBatch);
if (remainingInBatch == 0) {
break;
}
if (remainingPageValueCount == 0) {
throwEndOfBatchException(remainingInBatch);
}
}
Expand Down Expand Up @@ -237,11 +241,36 @@ private boolean skipToRowRangesStart()

private boolean readNextPage()
{
DataPage page = pageReader.readPage();
if (page == null) {
if (!pageReader.hasNext()) {
return false;
}
DataPage page = readPage();
rowRanges.resetForNewPage(page.getFirstRowIndex());
return true;
}

// When a large enough number of rows are skipped due to `seek` operation,
// it is possible to skip decompressing and decoding parquet pages entirely.
private int seekToNextPage(int remainingInBatch)
{
while (remainingInBatch > 0 && pageReader.hasNext()) {
DataPage page = pageReader.getNextPage();
rowRanges.resetForNewPage(page.getFirstRowIndex());
if (remainingInBatch < page.getValueCount() || !rowRanges.isPageFullyConsumed(page.getValueCount())) {
readPage();
return remainingInBatch;
}
remainingInBatch -= page.getValueCount();
remainingPageValueCount = 0;
pageReader.skipNextPage();
}
return remainingInBatch;
}

private DataPage readPage()
{
DataPage page = pageReader.readPage();
requireNonNull(page, "page is null");
if (page instanceof DataPageV1) {
readFlatPageV1((DataPageV1) page);
}
Expand All @@ -250,8 +279,7 @@ else if (page instanceof DataPageV2) {
}

remainingPageValueCount = page.getValueCount();
rowRanges.resetForNewPage(page.getFirstRowIndex());
return true;
return page;
}

private void readFlatPageV1(DataPageV1 page)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ public interface RowRangesIterator

void resetForNewPage(OptionalLong firstRowIndex);

boolean isPageFullyConsumed(int pageValueCount);

class AllRowRangesIterator
implements RowRangesIterator
{
Expand All @@ -53,6 +55,12 @@ public long skipToRangeStart()

@Override
public void resetForNewPage(OptionalLong firstRowIndex) {}

@Override
public boolean isPageFullyConsumed(int pageValueCount)
{
return true;
}
}

static RowRangesIterator createRowRangesIterator(Optional<FilteredRowRanges> filteredRowRanges)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public void testReadFilteredPage(
}

readCount += batchSize;
batchSize = Math.min(batchSize * 2, rowCount - readCount);
batchSize = Math.min(Math.min(batchSize * 2, 512), rowCount - readCount);
}
assertThat(rowRangesIterator.hasNext()).isFalse();
assertThat(valuesRead).isEqualTo(expectedValues);
Expand All @@ -156,7 +156,13 @@ public Object[][] testRowRangesProvider()
ImmutableList.of(range(0, 127), range(128, 4095)),
ImmutableList.of(range(0, 767), range(768, 4095)),
ImmutableList.of(range(0, 255), range(256, 511), range(512, 767), range(768, 4095)),
ImmutableList.of(range(0, 99), range(100, 199), range(200, 399), range(400, 599), range(600, 799), range(800, 999), range(1000, 4095)))
// Parquet pages with small size to simulate cases of FlatColumnReader#seek skipping over parquet pages
IntStream.rangeClosed(0, 4095 / 150).boxed()
.map(i -> {
long start = i * 150;
return range(start, Math.min(start + 149, 4095));
})
.collect(toImmutableList()))
.collect(toDataProvider());
Object[][] rangesWithNoPageSkipped = cartesianProduct(columnReaders, batchSkippers, rowRanges, pageRowRanges);
Object[][] rangesWithPagesSkipped = cartesianProduct(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ public void testNullRowRanges()
assertThat(ranges.skipToRangeStart()).isEqualTo(0);
assertThat(ranges.advanceRange(100)).isEqualTo(100);
assertThat(ranges.seekForward(100)).isEqualTo(100);
assertThat(ranges.isPageFullyConsumed(100)).isTrue();
}

@Test
Expand Down Expand Up @@ -77,6 +78,25 @@ public void testSkipToRangeStart()
.isInstanceOf(VerifyException.class);
}

@Test
public void testIsPageFullyConsumed()
{
RowRangesIterator ranges = createRowRangesIterator(range(20, 30), range(50, 99));
ranges.resetForNewPage(OptionalLong.of(0));
assertThat(ranges.isPageFullyConsumed(5)).isFalse();
assertThat(ranges.isPageFullyConsumed(31)).isFalse();

ranges = createRowRangesIterator(range(20, 30), range(50, 99));
ranges.resetForNewPage(OptionalLong.of(20));
assertThat(ranges.isPageFullyConsumed(11)).isTrue();
assertThat(ranges.isPageFullyConsumed(12)).isFalse();

ranges = createRowRangesIterator(range(20, 30), range(50, 99));
ranges.resetForNewPage(OptionalLong.of(25));
assertThat(ranges.isPageFullyConsumed(6)).isTrue();
assertThat(ranges.isPageFullyConsumed(7)).isFalse();
}

@Test
public void testAdvanceRange()
{
Expand Down

0 comments on commit 6877f67

Please sign in to comment.