From 6877f67ef67580949844208e2bb8dcf666b330fd Mon Sep 17 00:00:00 2001 From: Raunaq Morarka Date: Sat, 1 Oct 2022 23:20:57 +0530 Subject: [PATCH] Skip decoding parquet pages for long seek operations When a large enough number of rows are skipped due to `seek` operation, it is possible to skip decompressing and decoding parquet pages entirely. --- .../io/trino/parquet/reader/PageReader.java | 15 ++++++++ .../flat/FilteredRowRangesIterator.java | 11 ++++++ .../parquet/reader/flat/FlatColumnReader.java | 38 ++++++++++++++++--- .../reader/flat/RowRangesIterator.java | 8 ++++ .../parquet/reader/TestColumnReader.java | 10 ++++- .../reader/flat/TestRowRangesIterator.java | 20 ++++++++++ 6 files changed, 95 insertions(+), 7 deletions(-) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java index ac463c047c56..b8928d4f91e7 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/PageReader.java @@ -109,4 +109,19 @@ public DictionaryPage readDictionaryPage() throw new RuntimeException("Error reading dictionary page", e); } } + + public DataPage getNextPage() + { + return compressedPages.getFirst(); + } + + public boolean hasNext() + { + return !compressedPages.isEmpty(); + } + + public void skipNextPage() + { + compressedPages.removeFirst(); + } } diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FilteredRowRangesIterator.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FilteredRowRangesIterator.java index 17ebef7879e8..c95713760f58 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FilteredRowRangesIterator.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FilteredRowRangesIterator.java @@ -196,6 +196,17 @@ public void resetForNewPage(OptionalLong firstRowIndex) verify(pageFirstRowIndex <= rangeEnd); } + /** + * Returns whether the current page with the provided value count + * is fully contained within the current row range. + */ + @Override + public boolean isPageFullyConsumed(int pageValueCount) + { + return pageFirstRowIndex >= currentRange.start() + && (pageFirstRowIndex + pageValueCount) <= currentRange.end() + 1; + } + private void nextRange() { currentRange = rowRangeIterator.next(); diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java index 06bc73fc4a47..60b8b61922a4 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/FlatColumnReader.java @@ -104,7 +104,11 @@ private void seek() int remainingInBatch = readOffset; while (remainingInBatch > 0) { if (remainingPageValueCount == 0) { - if (!readNextPage()) { + remainingInBatch = seekToNextPage(remainingInBatch); + if (remainingInBatch == 0) { + break; + } + if (remainingPageValueCount == 0) { throwEndOfBatchException(remainingInBatch); } } @@ -237,11 +241,36 @@ private boolean skipToRowRangesStart() private boolean readNextPage() { - DataPage page = pageReader.readPage(); - if (page == null) { + if (!pageReader.hasNext()) { return false; } + DataPage page = readPage(); + rowRanges.resetForNewPage(page.getFirstRowIndex()); + return true; + } + + // When a large enough number of rows are skipped due to `seek` operation, + // it is possible to skip decompressing and decoding parquet pages entirely. + private int seekToNextPage(int remainingInBatch) + { + while (remainingInBatch > 0 && pageReader.hasNext()) { + DataPage page = pageReader.getNextPage(); + rowRanges.resetForNewPage(page.getFirstRowIndex()); + if (remainingInBatch < page.getValueCount() || !rowRanges.isPageFullyConsumed(page.getValueCount())) { + readPage(); + return remainingInBatch; + } + remainingInBatch -= page.getValueCount(); + remainingPageValueCount = 0; + pageReader.skipNextPage(); + } + return remainingInBatch; + } + private DataPage readPage() + { + DataPage page = pageReader.readPage(); + requireNonNull(page, "page is null"); if (page instanceof DataPageV1) { readFlatPageV1((DataPageV1) page); } @@ -250,8 +279,7 @@ else if (page instanceof DataPageV2) { } remainingPageValueCount = page.getValueCount(); - rowRanges.resetForNewPage(page.getFirstRowIndex()); - return true; + return page; } private void readFlatPageV1(DataPageV1 page) diff --git a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/RowRangesIterator.java b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/RowRangesIterator.java index 64d6c37b2c75..3acb174604e9 100644 --- a/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/RowRangesIterator.java +++ b/lib/trino-parquet/src/main/java/io/trino/parquet/reader/flat/RowRangesIterator.java @@ -30,6 +30,8 @@ public interface RowRangesIterator void resetForNewPage(OptionalLong firstRowIndex); + boolean isPageFullyConsumed(int pageValueCount); + class AllRowRangesIterator implements RowRangesIterator { @@ -53,6 +55,12 @@ public long skipToRangeStart() @Override public void resetForNewPage(OptionalLong firstRowIndex) {} + + @Override + public boolean isPageFullyConsumed(int pageValueCount) + { + return true; + } } static RowRangesIterator createRowRangesIterator(Optional filteredRowRanges) diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java index 6ad822d1728c..77fc4a084435 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/TestColumnReader.java @@ -130,7 +130,7 @@ public void testReadFilteredPage( } readCount += batchSize; - batchSize = Math.min(batchSize * 2, rowCount - readCount); + batchSize = Math.min(Math.min(batchSize * 2, 512), rowCount - readCount); } assertThat(rowRangesIterator.hasNext()).isFalse(); assertThat(valuesRead).isEqualTo(expectedValues); @@ -156,7 +156,13 @@ public Object[][] testRowRangesProvider() ImmutableList.of(range(0, 127), range(128, 4095)), ImmutableList.of(range(0, 767), range(768, 4095)), ImmutableList.of(range(0, 255), range(256, 511), range(512, 767), range(768, 4095)), - ImmutableList.of(range(0, 99), range(100, 199), range(200, 399), range(400, 599), range(600, 799), range(800, 999), range(1000, 4095))) + // Parquet pages with small size to simulate cases of FlatColumnReader#seek skipping over parquet pages + IntStream.rangeClosed(0, 4095 / 150).boxed() + .map(i -> { + long start = i * 150; + return range(start, Math.min(start + 149, 4095)); + }) + .collect(toImmutableList())) .collect(toDataProvider()); Object[][] rangesWithNoPageSkipped = cartesianProduct(columnReaders, batchSkippers, rowRanges, pageRowRanges); Object[][] rangesWithPagesSkipped = cartesianProduct( diff --git a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestRowRangesIterator.java b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestRowRangesIterator.java index 145a979cd8c6..dad66e3692a7 100644 --- a/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestRowRangesIterator.java +++ b/lib/trino-parquet/src/test/java/io/trino/parquet/reader/flat/TestRowRangesIterator.java @@ -36,6 +36,7 @@ public void testNullRowRanges() assertThat(ranges.skipToRangeStart()).isEqualTo(0); assertThat(ranges.advanceRange(100)).isEqualTo(100); assertThat(ranges.seekForward(100)).isEqualTo(100); + assertThat(ranges.isPageFullyConsumed(100)).isTrue(); } @Test @@ -77,6 +78,25 @@ public void testSkipToRangeStart() .isInstanceOf(VerifyException.class); } + @Test + public void testIsPageFullyConsumed() + { + RowRangesIterator ranges = createRowRangesIterator(range(20, 30), range(50, 99)); + ranges.resetForNewPage(OptionalLong.of(0)); + assertThat(ranges.isPageFullyConsumed(5)).isFalse(); + assertThat(ranges.isPageFullyConsumed(31)).isFalse(); + + ranges = createRowRangesIterator(range(20, 30), range(50, 99)); + ranges.resetForNewPage(OptionalLong.of(20)); + assertThat(ranges.isPageFullyConsumed(11)).isTrue(); + assertThat(ranges.isPageFullyConsumed(12)).isFalse(); + + ranges = createRowRangesIterator(range(20, 30), range(50, 99)); + ranges.resetForNewPage(OptionalLong.of(25)); + assertThat(ranges.isPageFullyConsumed(6)).isTrue(); + assertThat(ranges.isPageFullyConsumed(7)).isFalse(); + } + @Test public void testAdvanceRange() {