diff --git a/FORK.md b/FORK.md
index 6184e3bda9..7b8002db9a 100644
--- a/FORK.md
+++ b/FORK.md
@@ -1,5 +1,8 @@
 # Differences to mainline
 
-This repo exists mostly to make releases of parquet-mr more often. The only difference to upstream is solution for [PARQUET-686](https://issues.apache.org/jira/browse/PARQUET-686).
+This repo exists mostly to make releases of parquet-mr more often. The only differences to upstream are as follows:
+
+1. A solution for [PARQUET-686](https://issues.apache.org/jira/browse/PARQUET-686).
+2. A temporary revert of [PARQUET-1414](https://issues.apache.org/jira/browse/PARQUET-1414), because it causes Spark to write unreadable empty Parquet pages.
 
 The change that we made, which upstream made only in statistics v2, is to make binary comparison unsigned and to declare all statistics prior to that change corrupted. This lets us take advantage of binary statistics more quickly and removes the burden on the user to know whether they should account for signed binary comparison in their values.
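A note on the statistics paragraph above: Java's `byte` is signed, so a naive `byte[]` comparison orders `0xFF` before `0x00`, while Parquet min/max statistics need lexicographic order over unsigned bytes. The following is an illustrative sketch of that unsigned ordering, not code from this patch:

```java
public final class UnsignedByteOrder {
  // Compares byte arrays lexicographically, masking each byte with 0xFF so it
  // is treated as unsigned (0..255) rather than as Java's signed byte (-128..127).
  static int compareUnsigned(byte[] a, byte[] b) {
    int len = Math.min(a.length, b.length);
    for (int i = 0; i < len; i++) {
      int cmp = Integer.compare(a[i] & 0xFF, b[i] & 0xFF);
      if (cmp != 0) {
        return cmp;
      }
    }
    // Equal shared prefix: the shorter array sorts first.
    return Integer.compare(a.length, b.length);
  }
}
```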
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index 41e482cfdd..b173239332 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -48,7 +48,6 @@ public class ParquetProperties {
   public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
   public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
   public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
-  public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
 
   public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();
 
@@ -86,11 +85,10 @@ public static WriterVersion fromString(String name) {
   private final ByteBufferAllocator allocator;
   private final ValuesWriterFactory valuesWriterFactory;
   private final int columnIndexTruncateLength;
-  private final int pageRowCountLimit;
 
   private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict,
                             int minRowCountForPageSizeCheck, int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
-                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
+                            ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
     this.pageSizeThreshold = pageSize;
     this.initialSlabSize = CapacityByteArrayOutputStream
       .initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
@@ -104,7 +102,6 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
 
     this.valuesWriterFactory = writerFactory;
     this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
-    this.pageRowCountLimit = pageRowCountLimit;
   }
 
   public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
@@ -197,10 +194,6 @@ public boolean estimateNextSizeCheck() {
     return estimateNextSizeCheck;
   }
 
-  public int getPageRowCountLimit() {
-    return pageRowCountLimit;
-  }
-
   public static Builder builder() {
     return new Builder();
   }
@@ -220,22 +213,18 @@ public static class Builder {
     private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
     private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
     private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
-    private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
 
     private Builder() {
     }
 
     private Builder(ParquetProperties toCopy) {
-      this.pageSize = toCopy.pageSizeThreshold;
       this.enableDict = toCopy.enableDictionary;
       this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
       this.writerVersion = toCopy.writerVersion;
       this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
       this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
       this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck;
-      this.valuesWriterFactory = toCopy.valuesWriterFactory;
       this.allocator = toCopy.allocator;
-      this.pageRowCountLimit = toCopy.pageRowCountLimit;
     }
 
     /**
@@ -324,17 +313,11 @@ public Builder withColumnIndexTruncateLength(int length) {
       return this;
     }
 
-    public Builder withPageRowCountLimit(int rowCount) {
-      Preconditions.checkArgument(rowCount > 0, "Invalid row count limit for pages: " + rowCount);
-      pageRowCountLimit = rowCount;
-      return this;
-    }
-
     public ParquetProperties build() {
       ParquetProperties properties = new ParquetProperties(writerVersion, pageSize, dictPageSize, enableDict,
           minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
-          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
+          estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
       // we pass a constructed but uninitialized factory to ParquetProperties above as currently
       // creation of ValuesWriters is invoked from within ParquetProperties. In the future
       // we'd like to decouple that and won't need to pass an object to properties and then pass the
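With `withPageRowCountLimit` and its default removed, page sizing on this fork is driven by the byte threshold and the adaptive row-count check alone. A sketch of the remaining builder surface (the values are illustrative, not defaults):

```java
import org.apache.parquet.column.ParquetProperties;

public class PropertiesExample {
  public static void main(String[] args) {
    ParquetProperties props = ParquetProperties.builder()
        .withPageSize(1024 * 1024)              // flush a page once ~1 MiB is buffered
        .withDictionaryEncoding(true)
        .withMinRowCountForPageSizeCheck(100)   // lower bound for the adaptive size check
        .withMaxRowCountForPageSizeCheck(10000) // upper bound for the adaptive size check
        .build();
    System.out.println("Page size threshold: " + props.getPageSizeThreshold());
  }
}
```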
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
index ac9aaca26a..5cd7d876e4 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreBase.java
@@ -67,7 +67,7 @@ private interface ColumnWriterProvider {
 
     this.columns = new TreeMap<>();
 
-    this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());
+    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
 
     columnWriterProvider = new ColumnWriterProvider() {
       @Override
@@ -95,7 +95,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
     }
     this.columns = unmodifiableMap(mcolumns);
 
-    this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());
+    this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
 
     columnWriterProvider = new ColumnWriterProvider() {
       @Override
@@ -190,17 +190,13 @@ public void endRecord() {
   private void sizeCheck() {
     long minRecordToWait = Long.MAX_VALUE;
-    int pageRowCountLimit = props.getPageRowCountLimit();
-    long rowCountForNextRowCountCheck = rowCount + pageRowCountLimit;
     for (ColumnWriterBase writer : columns.values()) {
       long usedMem = writer.getCurrentPageBufferedSize();
       long rows = rowCount - writer.getRowsWrittenSoFar();
       long remainingMem = props.getPageSizeThreshold() - usedMem;
-      if (remainingMem <= thresholdTolerance || rows >= pageRowCountLimit) {
+      if (remainingMem <= thresholdTolerance) {
         writer.writePage();
         remainingMem = props.getPageSizeThreshold();
-      } else {
-        rowCountForNextRowCountCheck = min(rowCountForNextRowCountCheck, writer.getRowsWrittenSoFar() + pageRowCountLimit);
       }
       long rowsToFillPage = usedMem == 0 ?
@@ -223,10 +219,5 @@ private void sizeCheck() {
     } else {
       rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
     }
-
-    // Do the check earlier if required to keep the row count limit
-    if (rowCountForNextRowCountCheck < rowCountForNextSizeCheck) {
-      rowCountForNextSizeCheck = rowCountForNextRowCountCheck;
-    }
   }
 }
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index f89d0cbf7a..e5db38c945 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -18,28 +18,19 @@
  */
 package org.apache.parquet.column.mem;
 
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
-import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnReader;
-import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.column.impl.ColumnReadStoreImpl;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
-import org.apache.parquet.column.impl.ColumnWriteStoreV2;
-import org.apache.parquet.column.page.DataPage;
-import org.apache.parquet.column.page.PageReader;
 import org.apache.parquet.column.page.mem.MemPageStore;
 import org.apache.parquet.example.DummyRecordConverter;
 import org.apache.parquet.io.api.Binary;
 import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.MessageTypeParser;
-import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
-import org.apache.parquet.schema.Types;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -175,68 +166,6 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
     }
   }
 
-  @Test
-  public void testPageSize() {
-    MessageType schema = Types.buildMessage()
-        .requiredList().requiredElement(BINARY).named("binary_col")
-        .requiredList().requiredElement(INT32).named("int32_col")
-        .named("msg");
-    System.out.println(schema);
-    MemPageStore memPageStore = new MemPageStore(123);
-
-    // Using V2 pages so we have rowCount info
-    ColumnWriteStore writeStore = new ColumnWriteStoreV2(schema, memPageStore, ParquetProperties.builder()
-        .withPageSize(1024) // Less than 10 records for binary_col
-        .withMinRowCountForPageSizeCheck(1) // Enforce having precise page sizing
-        .withPageRowCountLimit(10)
-        .withDictionaryEncoding(false) // Enforce having large binary_col pages
-        .build());
-    ColumnDescriptor binaryCol = schema.getColumnDescription(new String[] { "binary_col", "list", "element" });
-    ColumnWriter binaryColWriter = writeStore.getColumnWriter(binaryCol);
-    ColumnDescriptor int32Col = schema.getColumnDescription(new String[] { "int32_col", "list", "element" });
-    ColumnWriter int32ColWriter = writeStore.getColumnWriter(int32Col);
-    // Writing 123 records
-    for (int i = 0; i < 123; ++i) {
-      // Writing 10 values per record
-      for (int j = 0; j < 10; ++j) {
-        binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), j == 0 ? 0 : 2, 2);
-        int32ColWriter.write(42, j == 0 ? 0 : 2, 2);
-      }
-      writeStore.endRecord();
-    }
-    writeStore.flush();
-
-    // Check that all the binary_col pages are <= 1024 bytes
-    {
-      PageReader binaryColPageReader = memPageStore.getPageReader(binaryCol);
-      assertEquals(1230, binaryColPageReader.getTotalValueCount());
-      int pageCnt = 0;
-      int valueCnt = 0;
-      while (valueCnt < binaryColPageReader.getTotalValueCount()) {
-        DataPage page = binaryColPageReader.readPage();
-        ++pageCnt;
-        valueCnt += page.getValueCount();
-        LOG.info("binary_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
-        assertTrue("Compressed size should be less than 1024", page.getCompressedSize() <= 1024);
-      }
-    }
-
-    // Check that all the int32_col pages contain <= 10 rows
-    {
-      PageReader int32ColPageReader = memPageStore.getPageReader(int32Col);
-      assertEquals(1230, int32ColPageReader.getTotalValueCount());
-      int pageCnt = 0;
-      int valueCnt = 0;
-      while (valueCnt < int32ColPageReader.getTotalValueCount()) {
-        DataPage page = int32ColPageReader.readPage();
-        ++pageCnt;
-        valueCnt += page.getValueCount();
-        LOG.info("int32_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
-        assertTrue("Row count should be less than 10", page.getIndexRowCount().get() <= 10);
-      }
-    }
-  }
-
   private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
     return new ColumnWriteStoreV1(memPageStore,
         ParquetProperties.builder()
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
index 04cbd15c0b..0789bf50d4 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetOutputFormat.java
@@ -144,7 +144,6 @@ public static enum JobSummaryLevel {
   public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
   public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
   public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
-  public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";
 
   public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
     String level = conf.get(JOB_SUMMARY_LEVEL);
@@ -326,18 +325,6 @@ private static int getColumnIndexTruncateLength(Configuration conf) {
     return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
   }
 
-  public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
-    setPageRowCountLimit(getConfiguration(jobContext), rowCount);
-  }
-
-  public static void setPageRowCountLimit(Configuration conf, int rowCount) {
-    conf.setInt(PAGE_ROW_COUNT_LIMIT, rowCount);
-  }
-
-  private static int getPageRowCountLimit(Configuration conf) {
-    return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT);
-  }
-
   private WriteSupport<T> writeSupport;
   private ParquetOutputCommitter committer;
 
@@ -393,7 +380,6 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
         .withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
         .withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
         .withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
-        .withPageRowCountLimit(getPageRowCountLimit(conf))
         .build();
 
     long blockSize = getLongBlockSize(conf);
@@ -412,7 +398,6 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
       LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
       LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
       LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
-      LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
     }
 
     WriteContext init = writeSupport.init(conf);
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
index 1ed5e32ca7..5b0e4f82d1 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetWriter.java
@@ -425,17 +425,6 @@ public SELF withPageSize(int pageSize) {
       return self();
     }
 
-    /**
-     * Sets the Parquet format page row count limit used by the constructed writer.
-     *
-     * @param rowCount limit for the number of rows stored in a page
-     * @return this builder for method chaining
-     */
-    public SELF withPageRowCountLimit(int rowCount) {
-      encodingPropsBuilder.withPageRowCountLimit(rowCount);
-      return self();
-    }
-
     /**
      * Set the Parquet format dictionary page size used by the constructed
     * writer.
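Callers that adopted the removed API must migrate when building against this fork. A minimal sketch, assuming parquet-mr's bundled `ExampleParquetWriter` for `Group` records (the commented-out call is what no longer compiles; `20_000` mirrors the removed `DEFAULT_PAGE_ROW_COUNT_LIMIT`):

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.example.ExampleParquetWriter;
import org.apache.parquet.schema.MessageType;

public class WriterMigration {
  // Sketch only: shows the builder surface that remains on this fork.
  static ParquetWriter<Group> open(Path path, MessageType schema) throws IOException {
    return ExampleParquetWriter.builder(path)
        .withType(schema)
        .withPageSize(1024 * 1024)        // byte-based page sizing still applies
        // .withPageRowCountLimit(20_000) // removed by this fork; do not call
        .build();
  }
}
```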