Skip to content

Commit

Permalink
Use column stats to switch to non-null reads in FlatColumnReader
Browse files Browse the repository at this point in the history
Parquet schema may specify a column definition as OPTIONAL even though
there are no nulls in the actual data. Row-group column statistics
can be used to identify such cases and switch to faster non-nullable read
paths in FlatColumnReader.
  • Loading branch information
raunaqmorarka committed Nov 18, 2022
1 parent 6877f67 commit 22c4144
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public final class PageReader
{
private final CompressionCodecName codec;
private final long valueCount;
private final boolean hasNoNulls;
private final LinkedList<DataPage> compressedPages;
private final DictionaryPage compressedDictionaryPage;

Expand All @@ -38,19 +39,26 @@ public final class PageReader
/**
 * Creates a reader over the pages of a single column chunk.
 *
 * @param codec compression codec of the chunk's pages
 * @param compressedPages data pages of the chunk, still in compressed form
 * @param compressedDictionaryPage dictionary page, or null when the chunk has none
 * @param valueCount total number of values across all data pages of the chunk
 * @param hasNoNulls whether row-group column statistics guarantee that the chunk
 *        contains no nulls (enables faster non-nullable read paths downstream)
 */
public PageReader(CompressionCodecName codec,
        LinkedList<DataPage> compressedPages,
        DictionaryPage compressedDictionaryPage,
        long valueCount,
        boolean hasNoNulls)
{
    this.codec = codec;
    this.compressedPages = compressedPages;
    this.compressedDictionaryPage = compressedDictionaryPage;
    this.valueCount = valueCount;
    this.hasNoNulls = hasNoNulls;
}

/**
 * Returns the total number of values across all data pages of this column chunk,
 * as supplied at construction.
 */
public long getTotalValueCount()
{
    return valueCount;
}

/**
 * Whether row-group column statistics guarantee that this column chunk contains no nulls.
 * Readers may use this to switch to a faster non-nullable read path even when the
 * Parquet schema declares the column as OPTIONAL.
 */
public boolean hasNoNulls()
{
    return hasNoNulls;
}

public DataPage readPage()
{
if (compressedPages.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import io.trino.parquet.DictionaryPage;
import io.trino.parquet.ParquetCorruptionException;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.column.statistics.Statistics;
import org.apache.parquet.format.DataPageHeader;
import org.apache.parquet.format.DataPageHeaderV2;
import org.apache.parquet.format.DictionaryPageHeader;
Expand Down Expand Up @@ -112,7 +113,12 @@ public PageReader readAllPages()
break;
}
}
return new PageReader(descriptor.getColumnChunkMetaData().getCodec(), pages, dictionaryPage, valueCount);
// Parquet schema may specify a column definition as OPTIONAL even though there are no nulls in the actual data.
// Row-group column statistics can be used to identify such cases and switch to faster non-nullable read
// paths in FlatColumnReader.
Statistics<?> columnStatistics = descriptor.getColumnChunkMetaData().getStatistics();
boolean hasNoNulls = columnStatistics != null && columnStatistics.getNumNulls() == 0;
return new PageReader(descriptor.getColumnChunkMetaData().getCodec(), pages, dictionaryPage, valueCount, hasNoNulls);
}

private boolean hasMorePages(long valuesCountReadSoFar, int dataPageCountReadSoFar)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ private void seek()

int chunkSize = Math.min(remainingPageValueCount, remainingInBatch);
int nonNullCount;
if (field.isRequired()) {
if (isNonNull()) {
nonNullCount = chunkSize;
}
else {
Expand Down Expand Up @@ -227,7 +227,7 @@ private boolean skipToRowRangesStart()
}
if (skipCount > 0) {
int nonNullsCount;
if (field.isRequired()) {
if (isNonNull()) {
nonNullsCount = skipCount;
}
else {
Expand Down Expand Up @@ -287,7 +287,7 @@ private void readFlatPageV1(DataPageV1 page)
Slice buffer = page.getSlice();
ParquetEncoding definitionEncoding = page.getDefinitionLevelEncoding();

checkArgument(field.isRequired() || definitionEncoding == RLE, "Invalid definition level encoding: " + definitionEncoding);
checkArgument(isNonNull() || definitionEncoding == RLE, "Invalid definition level encoding: " + definitionEncoding);
int alreadyRead = 0;
if (definitionEncoding == RLE) {
// Definition levels are skipped from file when the max definition level is 0 as the bit-width required to store them is 0.
Expand Down Expand Up @@ -318,6 +318,11 @@ private void readFlatPageV2(DataPageV2 page)
valueDecoder.init(new SimpleSliceInputStream(page.getSlice()));
}

/**
 * Returns true when this column is guaranteed to yield no nulls: either the Parquet
 * schema marks the field REQUIRED, or row-group column statistics report zero nulls
 * for an otherwise OPTIONAL column (see {@code PageReader#hasNoNulls()}).
 */
protected boolean isNonNull()
{
    return field.isRequired() || pageReader.hasNoNulls();
}

@Override
public boolean hasPageReader()
{
Expand Down Expand Up @@ -358,7 +363,7 @@ public ColumnChunk readPrimitive()
{
ColumnChunk columnChunk;
seek();
if (field.isRequired()) {
if (isNonNull()) {
columnChunk = readNoNull();
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public int read()
throws IOException
{
ColumnReader columnReader = ColumnReaderFactory.create(field, UTC, true);
columnReader.setPageReader(new PageReader(UNCOMPRESSED, new LinkedList<>(dataPages), null, MAX_VALUES), Optional.empty());
columnReader.setPageReader(new PageReader(UNCOMPRESSED, new LinkedList<>(dataPages), null, MAX_VALUES, false), Optional.empty());
int rowsRead = 0;
while (rowsRead < MAX_VALUES) {
int remaining = MAX_VALUES - rowsRead;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ private static PageReader getPageReader(List<TestingPage> testingPages, boolean
UNCOMPRESSED,
inputPages,
dictionaryPage,
pagesRowCount(testingPages));
pagesRowCount(testingPages),
false);
}

private static List<DataPage> createDataPages(List<TestingPage> testingPage, ValuesWriter encoder)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,25 @@ public <T> void testSingleValueDictionaryNullableWithNoNulls(DataPageVersion ver
format.assertBlock(values, actual);
}

/**
 * Verifies that an OPTIONAL, dictionary-encoded column with no actual nulls is read
 * correctly when row-group column statistics (hasNoNulls=true) steer the reader onto
 * the non-nullable read path.
 */
@Test(dataProvider = "dictionaryReadersWithPageVersions", dataProviderClass = TestingColumnReader.class)
public <T> void testSingleValueDictionaryNullableWithNoNullsUsingColumnStats(DataPageVersion version, ColumnReaderFormat<T> format)
        throws IOException
{
    // Create a reader for a column that is OPTIONAL in the schema (required == false)
    PrimitiveField field = createField(format, false);
    ColumnReader reader = createColumnReader(field);
    // Write dictionary-encoded data containing no nulls
    DictionaryValuesWriter dictionaryWriter = format.getDictionaryWriter();
    T[] values = format.write(dictionaryWriter, new Integer[] {1, 1});
    DataPage page = createNullableDataPage(version, RLE_DICTIONARY, dictionaryWriter, false, false);
    DictionaryPage dictionaryPage = getDictionaryPage(dictionaryWriter);
    // Read and assert. hasNoNulls=true simulates row-group statistics reporting zero
    // nulls — without it this test would be identical to
    // testSingleValueDictionaryNullableWithNoNulls and would not exercise the new path
    // (the original passed false here, which looks like a copy-paste slip).
    reader.setPageReader(getPageReaderMock(new LinkedList<>(List.of(page)), dictionaryPage, true), Optional.empty());
    reader.prepareNextRead(2);
    Block actual = reader.readPrimitive().getBlock();
    format.assertBlock(values, actual);
}

@Test(dataProvider = "dictionaryReadersWithPageVersions", dataProviderClass = TestingColumnReader.class)
public <T> void testSingleValueDictionaryNullableWithOnlyNulls(DataPageVersion version, ColumnReaderFormat<T> format)
throws IOException
Expand Down Expand Up @@ -336,6 +355,25 @@ public <T> void testReadNullableWithNoNulls(DataPageVersion version, ColumnReade
format.assertBlock(values1, actual1);
}

/**
 * Verifies that an OPTIONAL, plain-encoded column with no actual nulls is read via the
 * non-nullable path when row-group column statistics (hasNoNulls=true) say the chunk
 * has no nulls, and that the resulting block reports mayHaveNull() == false.
 */
@Test(dataProvider = "readersWithPageVersions", dataProviderClass = TestingColumnReader.class)
public <T> void testReadNullableWithNoNullsUsingColumnStats(DataPageVersion version, ColumnReaderFormat<T> format)
        throws IOException
{
    // Create a reader for a column that is OPTIONAL in the schema (required == false)
    PrimitiveField field = createField(format, false);
    ColumnReader reader = createColumnReader(field);
    // Write plain-encoded data containing no nulls
    ValuesWriter writer = format.getPlainWriter();
    T[] values1 = format.write(writer, new Integer[] {1, 2});
    DataPage page1 = createNullableDataPage(version, PLAIN, writer, false, false);
    // Read and assert; hasNoNulls=true simulates row-group statistics reporting zero nulls
    reader.setPageReader(getPageReaderMock(new LinkedList<>(List.of(page1)), null, true), Optional.empty());
    // NOTE(review): the original comment claimed a "deliberate mismatch between
    // Trino/Parquet page sizes", but the batch size (2) equals the page's value
    // count here — confirm intent or drop the comment
    Block actual1 = readBlock(reader, 2);
    // The non-null read path must produce a block that reports no nulls
    assertThat(actual1.mayHaveNull()).isFalse();
    format.assertBlock(values1, actual1);
}

@Test(dataProvider = "dictionaryReadersWithPageVersions", dataProviderClass = TestingColumnReader.class)
public <T> void testMixedDictionaryAndOrdinary(DataPageVersion version, ColumnReaderFormat<T> format)
throws IOException
Expand Down Expand Up @@ -518,7 +556,7 @@ private static PageReader getSimplePageReaderMock(ParquetEncoding encoding)
encoding,
encoding,
PLAIN));
return new PageReader(UNCOMPRESSED, pages, null, 1);
return new PageReader(UNCOMPRESSED, pages, null, 1, false);
}

private static PageReader getNullOnlyPageReaderMock()
Expand All @@ -536,18 +574,24 @@ private static PageReader getNullOnlyPageReaderMock()
RLE,
RLE,
PLAIN));
return new PageReader(UNCOMPRESSED, pages, null, 1);
return new PageReader(UNCOMPRESSED, pages, null, 1, false);
}

// Convenience overload: column statistics are assumed to NOT guarantee absence of nulls
private static PageReader getPageReaderMock(LinkedList<DataPage> dataPages, @Nullable DictionaryPage dictionaryPage)
{
    return getPageReaderMock(dataPages, dictionaryPage, false);
}

private static PageReader getPageReaderMock(LinkedList<DataPage> dataPages, @Nullable DictionaryPage dictionaryPage, boolean hasNoNulls)
{
return new PageReader(
UNCOMPRESSED,
dataPages,
dictionaryPage,
dataPages.stream()
.mapToInt(DataPage::getValueCount)
.sum());
.sum(),
hasNoNulls);
}

private static ColumnReader createColumnReader(PrimitiveField field)
Expand Down

0 comments on commit 22c4144

Please sign in to comment.