Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PARQUET-1414: Limit page size based on maximum row count #531

Merged
merged 4 commits into from
Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ParquetProperties {
public static final int DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK = 100;
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;

public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();

Expand Down Expand Up @@ -85,10 +86,11 @@ public static WriterVersion fromString(String name) {
private final ByteBufferAllocator allocator;
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
private final int pageRowCountLimit;

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength) {
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
Expand All @@ -102,6 +104,7 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag

this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
this.pageRowCountLimit = pageRowCountLimit;
}

public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
Expand Down Expand Up @@ -194,6 +197,10 @@ public boolean estimateNextSizeCheck() {
return estimateNextSizeCheck;
}

/**
 * Returns the maximum number of rows allowed in a single data page. The write
 * path closes a page once this many rows are buffered, even if the page byte
 * size threshold has not been reached yet.
 *
 * @return the per-page row count limit
 */
public int getPageRowCountLimit() {
  return pageRowCountLimit;
}

/**
 * Creates a builder initialized with the default property values.
 *
 * @return a fresh {@code Builder} for assembling a {@code ParquetProperties}
 */
public static Builder builder() {
  return new Builder();
}
Expand All @@ -213,18 +220,22 @@ public static class Builder {
private ByteBufferAllocator allocator = new HeapByteBufferAllocator();
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;

// Instances are created only through the builder() static factory (defaults)
// or through the copy constructor below; direct instantiation is disallowed.
private Builder() {
}

/**
 * Copy constructor: seeds every builder setting from an existing
 * {@code ParquetProperties} instance so callers can tweak a few values
 * without restating the rest.
 */
private Builder(ParquetProperties toCopy) {
  // Copy each tunable, grouped roughly by concern: format version, page
  // sizing, dictionary settings, size-check heuristics, then plumbing.
  this.writerVersion = toCopy.writerVersion;
  this.pageSize = toCopy.pageSizeThreshold;
  this.pageRowCountLimit = toCopy.pageRowCountLimit;
  this.enableDict = toCopy.enableDictionary;
  this.dictPageSize = toCopy.dictionaryPageSizeThreshold;
  this.minRowCountForPageSizeCheck = toCopy.minRowCountForPageSizeCheck;
  this.maxRowCountForPageSizeCheck = toCopy.maxRowCountForPageSizeCheck;
  this.estimateNextSizeCheck = toCopy.estimateNextSizeCheck;
  this.allocator = toCopy.allocator;
  this.valuesWriterFactory = toCopy.valuesWriterFactory;
}

/**
Expand Down Expand Up @@ -313,11 +324,17 @@ public Builder withColumnIndexTruncateLength(int length) {
return this;
}

/**
 * Sets the maximum number of rows a single data page may contain.
 *
 * @param rowCount the per-page row count limit; must be positive
 * @return this builder for chaining
 * @throws IllegalArgumentException if {@code rowCount} is not positive
 */
public Builder withPageRowCountLimit(int rowCount) {
  // Guard clause equivalent to Preconditions.checkArgument: reject
  // non-positive limits up front with the same exception and message.
  if (rowCount <= 0) {
    throw new IllegalArgumentException("Invalid row count limit for pages: " + rowCount);
  }
  this.pageRowCountLimit = rowCount;
  return this;
}

public ParquetProperties build() {
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength);
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ private interface ColumnWriterProvider {

this.columns = new TreeMap<>();

this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());

columnWriterProvider = new ColumnWriterProvider() {
@Override
Expand Down Expand Up @@ -95,7 +95,7 @@ public ColumnWriter getColumnWriter(ColumnDescriptor path) {
}
this.columns = unmodifiableMap(mcolumns);

this.rowCountForNextSizeCheck = props.getMinRowCountForPageSizeCheck();
this.rowCountForNextSizeCheck = min(props.getMinRowCountForPageSizeCheck(), props.getPageRowCountLimit());

columnWriterProvider = new ColumnWriterProvider() {
@Override
Expand Down Expand Up @@ -190,13 +190,18 @@ public void endRecord() {

private void sizeCheck() {
long minRecordToWait = Long.MAX_VALUE;
long maxUnwrittenRows = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic using maxUnwrittenRows is confusing to me. What does it mean for a row to be "unwritten"?

I think this should calculate the next size check row count more directly.

int pageRowCountLimit = props.getPageRowCountLimit();
for (ColumnWriterBase writer : columns.values()) {
long usedMem = writer.getCurrentPageBufferedSize();
long rows = rowCount - writer.getRowsWrittenSoFar();
long remainingMem = props.getPageSizeThreshold() - usedMem;
if (remainingMem <= thresholdTolerance) {
long actualPageRowCount = rowCount - writer.getRowsWrittenSoFar();
if (remainingMem <= thresholdTolerance || actualPageRowCount >= pageRowCountLimit) {
writer.writePage();
remainingMem = props.getPageSizeThreshold();
} else if (actualPageRowCount > maxUnwrittenRows) {
maxUnwrittenRows = actualPageRowCount;
}
long rowsToFillPage =
usedMem == 0 ?
Expand All @@ -219,5 +224,11 @@ private void sizeCheck() {
} else {
rowCountForNextSizeCheck = rowCount + props.getMinRowCountForPageSizeCheck();
}

// Do the check earlier if required to keep the row count limit
long rowCountForNextRowCountCheck = rowCount + pageRowCountLimit - maxUnwrittenRows;
if (rowCountForNextRowCountCheck < rowCountForNextSizeCheck) {
rowCountForNextSizeCheck = rowCountForNextRowCountCheck;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,28 @@
*/
package org.apache.parquet.column.mem;

import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.ColumnReader;
import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ColumnWriter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.impl.ColumnReadStoreImpl;
import org.apache.parquet.column.impl.ColumnWriteStoreV1;
import org.apache.parquet.column.impl.ColumnWriteStoreV2;
import org.apache.parquet.column.page.DataPage;
import org.apache.parquet.column.page.PageReader;
import org.apache.parquet.column.page.mem.MemPageStore;
import org.apache.parquet.example.DummyRecordConverter;
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
import org.apache.parquet.schema.Types;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -166,6 +175,68 @@ public void testMemColumnSeveralPagesRepeated() throws Exception {
}
}

@Test
public void testPageSize() {
  // Schema with a wide BINARY list column (its pages should be bounded by the
  // byte-size threshold) and a narrow INT32 list column (its pages should be
  // bounded by the row count limit instead).
  MessageType schema = Types.buildMessage()
      .requiredList().requiredElement(BINARY).named("binary_col")
      .requiredList().requiredElement(INT32).named("int32_col")
      .named("msg");
  // Use the class logger instead of printing to stdout from a test.
  LOG.info("Testing page row count limits with schema: {}", schema);
  MemPageStore memPageStore = new MemPageStore(123);

  // Using V2 pages so we have rowCount info
  ColumnWriteStore writeStore = new ColumnWriteStoreV2(schema, memPageStore, ParquetProperties.builder()
      .withPageSize(1024) // Less than 10 records for binary_col
      .withMinRowCountForPageSizeCheck(1) // Enforce having precise page sizing
      .withPageRowCountLimit(10)
      .withDictionaryEncoding(false) // Enforce having large binary_col pages
      .build());
  ColumnDescriptor binaryCol = schema.getColumnDescription(new String[] { "binary_col", "list", "element" });
  ColumnWriter binaryColWriter = writeStore.getColumnWriter(binaryCol);
  ColumnDescriptor int32Col = schema.getColumnDescription(new String[] { "int32_col", "list", "element" });
  ColumnWriter int32ColWriter = writeStore.getColumnWriter(int32Col);
  // Writing 123 records
  for (int i = 0; i < 123; ++i) {
    // Writing 10 values per record
    for (int j = 0; j < 10; ++j) {
      binaryColWriter.write(Binary.fromString("aaaaaaaaaaaa"), j == 0 ? 0 : 2, 2);
      int32ColWriter.write(42, j == 0 ? 0 : 2, 2);
    }
    writeStore.endRecord();
  }
  writeStore.flush();

  // Check that all the binary_col pages are <= 1024 bytes
  {
    PageReader binaryColPageReader = memPageStore.getPageReader(binaryCol);
    assertEquals(1230, binaryColPageReader.getTotalValueCount());
    int pageCnt = 0;
    int valueCnt = 0;
    while (valueCnt < binaryColPageReader.getTotalValueCount()) {
      DataPage page = binaryColPageReader.readPage();
      ++pageCnt;
      valueCnt += page.getValueCount();
      LOG.info("binary_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
      // Message matches the <= check (the old text said "less than").
      assertTrue("Compressed size should be less than or equal to 1024", page.getCompressedSize() <= 1024);
    }
  }

  // Check that all the int32_col pages contain <= 10 rows
  {
    PageReader int32ColPageReader = memPageStore.getPageReader(int32Col);
    assertEquals(1230, int32ColPageReader.getTotalValueCount());
    int pageCnt = 0;
    int valueCnt = 0;
    while (valueCnt < int32ColPageReader.getTotalValueCount()) {
      DataPage page = int32ColPageReader.readPage();
      ++pageCnt;
      valueCnt += page.getValueCount();
      LOG.info("int32_col page-{}: {} bytes, {} rows", pageCnt, page.getCompressedSize(), page.getIndexRowCount().get());
      // Message matches the <= check (the old text said "less than").
      assertTrue("Row count should be less than or equal to 10", page.getIndexRowCount().get() <= 10);
    }
  }
}

private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
return new ColumnWriteStoreV1(memPageStore,
ParquetProperties.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ public static enum JobSummaryLevel {
public static final String MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK = "parquet.page.size.row.check.max";
public static final String ESTIMATE_PAGE_SIZE_CHECK = "parquet.page.size.check.estimate";
public static final String COLUMN_INDEX_TRUNCATE_LENGTH = "parquet.columnindex.truncate.length";
public static final String PAGE_ROW_COUNT_LIMIT = "parquet.page.row.count.limit";

public static JobSummaryLevel getJobSummaryLevel(Configuration conf) {
String level = conf.get(JOB_SUMMARY_LEVEL);
Expand Down Expand Up @@ -325,6 +326,18 @@ private static int getColumnIndexTruncateLength(Configuration conf) {
return conf.getInt(COLUMN_INDEX_TRUNCATE_LENGTH, ParquetProperties.DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH);
}

/**
 * Sets the per-page row count limit on the given job's configuration.
 * Convenience overload that delegates to
 * {@code setPageRowCountLimit(Configuration, int)}.
 *
 * @param jobContext the job whose configuration is updated
 * @param rowCount   the maximum number of rows per data page
 */
public static void setPageRowCountLimit(JobContext jobContext, int rowCount) {
  setPageRowCountLimit(getConfiguration(jobContext), rowCount);
}

/**
 * Sets the per-page row count limit ({@code parquet.page.row.count.limit})
 * in the given configuration.
 *
 * @param conf     the configuration to update
 * @param rowCount the maximum number of rows per data page
 */
public static void setPageRowCountLimit(Configuration conf, int rowCount) {
  conf.setInt(PAGE_ROW_COUNT_LIMIT, rowCount);
}

/**
 * Reads the per-page row count limit from the configuration, falling back
 * to {@code ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT} when unset.
 *
 * @param conf the configuration to read from
 * @return the configured (or default) row count limit for pages
 */
private static int getPageRowCountLimit(Configuration conf) {
  return conf.getInt(PAGE_ROW_COUNT_LIMIT, ParquetProperties.DEFAULT_PAGE_ROW_COUNT_LIMIT);
}

private WriteSupport<T> writeSupport;
private ParquetOutputCommitter committer;

Expand Down Expand Up @@ -380,6 +393,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
.withMinRowCountForPageSizeCheck(getMinRowCountForPageSizeCheck(conf))
.withMaxRowCountForPageSizeCheck(getMaxRowCountForPageSizeCheck(conf))
.withColumnIndexTruncateLength(getColumnIndexTruncateLength(conf))
.withPageRowCountLimit(getPageRowCountLimit(conf))
.build();

long blockSize = getLongBlockSize(conf);
Expand All @@ -398,6 +412,7 @@ public RecordWriter<Void, T> getRecordWriter(Configuration conf, Path file, Comp
LOG.info("Min row count for page size check is: {}", props.getMinRowCountForPageSizeCheck());
LOG.info("Max row count for page size check is: {}", props.getMaxRowCountForPageSizeCheck());
LOG.info("Truncate length for column indexes is: {}", props.getColumnIndexTruncateLength());
LOG.info("Page row count limit to {}", props.getPageRowCountLimit());
}

WriteContext init = writeSupport.init(conf);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,6 +425,17 @@ public SELF withPageSize(int pageSize) {
return self();
}

/**
 * Sets the Parquet format page row count limit used by the constructed writer:
 * a data page is closed once it holds this many rows, even if its byte size is
 * still below the page size threshold. Delegates to the encoding properties
 * builder; the default limit is {@code DEFAULT_PAGE_ROW_COUNT_LIMIT} (20,000).
 *
 * @param rowCount limit for the number of rows stored in a page; must be positive
 * @return this builder for method chaining
 */
public SELF withPageRowCountLimit(int rowCount) {
  encodingPropsBuilder.withPageRowCountLimit(rowCount);
  return self();
}

/**
* Set the Parquet format dictionary page size used by the constructed
* writer.
Expand Down