From 866a8780077cd220992299945391797c1fd730ab Mon Sep 17 00:00:00 2001 From: huaxingao Date: Wed, 14 Aug 2024 14:57:17 -0700 Subject: [PATCH 1/8] Spark partial limit push down --- .palantir/revapi.yml | 32 ++- .../vectorized/VectorizedArrowReader.java | 26 +- .../parquet/VectorizedColumnIterator.java | 8 +- .../org/apache/iceberg/parquet/Parquet.java | 19 +- .../apache/iceberg/parquet/ParquetReader.java | 14 +- .../parquet/VectorizedParquetReader.java | 23 +- .../apache/iceberg/spark/SparkReadConf.java | 8 + .../iceberg/spark/SparkSQLProperties.java | 4 + .../iceberg/spark/source/BaseBatchReader.java | 12 +- .../iceberg/spark/source/BaseRowReader.java | 13 +- .../iceberg/spark/source/BatchDataReader.java | 10 +- .../spark/source/ChangelogRowReader.java | 3 +- .../spark/source/EqualityDeleteRowReader.java | 2 +- .../source/PositionDeletesRowReader.java | 2 +- .../iceberg/spark/source/RowDataReader.java | 10 +- .../iceberg/spark/source/SparkBatch.java | 11 +- .../spark/source/SparkBatchQueryScan.java | 5 +- .../spark/source/SparkChangelogScan.java | 3 +- .../source/SparkColumnarReaderFactory.java | 6 +- .../spark/source/SparkCopyOnWriteScan.java | 2 +- .../spark/source/SparkMicroBatchStream.java | 2 +- .../source/SparkPartitioningAwareScan.java | 5 +- .../spark/source/SparkRowReaderFactory.java | 7 +- .../iceberg/spark/source/SparkScan.java | 18 +- .../spark/source/SparkScanBuilder.java | 46 ++- .../iceberg/spark/source/SparkStagedScan.java | 2 +- .../spark/source/TestLimitPushDown.java | 271 ++++++++++++++++++ 27 files changed, 499 insertions(+), 65 deletions(-) create mode 100644 spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index e58ce70ded7a..bb64b1923d56 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1025,14 +1025,11 @@ acceptedBreaks: new: "class org.apache.iceberg.types.Types.NestedField" justification: "new Constructor added" org.apache.iceberg:iceberg-core: - - code: "java.method.visibilityReduced" - old: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" - new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" - justification: "Deprecations for 1.6.0 release" - code: "java.element.noLongerDeprecated" old: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" - justification: "Constructor became private as part of deprecations cleanup for 1.6.0 release" + justification: "Constructor became private as part of deprecations cleanup for\ + \ 1.6.0 release" - code: "java.element.noLongerDeprecated" old: "method void org.apache.iceberg.rest.auth.OAuth2Util.AuthSession::(java.util.Map, java.lang.String, java.lang.String, java.lang.String,\ @@ -1056,6 +1053,10 @@ acceptedBreaks: - code: "java.method.removed" old: "method org.apache.iceberg.DataFiles.Builder org.apache.iceberg.DataFiles.Builder::withEqualityFieldIds(java.util.List)" justification: "Deprecations for 1.6.0 release" + - code: "java.method.visibilityReduced" + old: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" + new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" + justification: "Deprecations for 1.6.0 release" "1.6.0": org.apache.iceberg:iceberg-common: - code: "java.method.removed" @@ -1131,6 +1132,27 @@ acceptedBreaks: new: "method org.apache.iceberg.BaseMetastoreOperations.CommitStatus 
org.apache.iceberg.BaseMetastoreTableOperations::checkCommitStatus(java.lang.String,\ \ org.apache.iceberg.TableMetadata)" justification: "Removing deprecated code" + org.apache.iceberg:iceberg-parquet: + - code: "java.method.numberOfParametersChanged" + old: "method void org.apache.iceberg.parquet.ParquetReader::(org.apache.iceberg.io.InputFile,\ + \ org.apache.iceberg.Schema, org.apache.parquet.ParquetReadOptions, java.util.function.Function>, org.apache.iceberg.mapping.NameMapping,\ + \ org.apache.iceberg.expressions.Expression, boolean, boolean)" + new: "method void org.apache.iceberg.parquet.ParquetReader::(org.apache.iceberg.io.InputFile,\ + \ org.apache.iceberg.Schema, org.apache.parquet.ParquetReadOptions, java.util.function.Function>, org.apache.iceberg.mapping.NameMapping,\ + \ org.apache.iceberg.expressions.Expression, boolean, boolean, int)" + justification: "{limit push down}" + - code: "java.method.numberOfParametersChanged" + old: "method void org.apache.iceberg.parquet.VectorizedParquetReader::(org.apache.iceberg.io.InputFile,\ + \ org.apache.iceberg.Schema, org.apache.parquet.ParquetReadOptions, java.util.function.Function>, org.apache.iceberg.mapping.NameMapping,\ + \ org.apache.iceberg.expressions.Expression, boolean, boolean, int)" + new: "method void org.apache.iceberg.parquet.VectorizedParquetReader::(org.apache.iceberg.io.InputFile,\ + \ org.apache.iceberg.Schema, org.apache.parquet.ParquetReadOptions, java.util.function.Function>, org.apache.iceberg.mapping.NameMapping,\ + \ org.apache.iceberg.expressions.Expression, boolean, boolean, int, int)" + justification: "{limit push down}" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: - code: "java.class.defaultSerializationChanged" diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java index 27ee25124f16..68877f340759 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/VectorizedArrowReader.java @@ -147,54 +147,60 @@ public VectorHolder read(VectorHolder reuse, int numValsToRead) { } if (vectorizedColumnIterator.hasNext()) { if (dictEncoded) { - vectorizedColumnIterator.dictionaryBatchReader().nextBatch(vec, -1, nullabilityHolder); + vectorizedColumnIterator + .dictionaryBatchReader() + .nextBatch(numValsToRead, vec, -1, nullabilityHolder); } else { switch (readType) { case VARBINARY: case VARCHAR: vectorizedColumnIterator .varWidthTypeBatchReader() - .nextBatch(vec, -1, nullabilityHolder); + .nextBatch(numValsToRead, vec, -1, nullabilityHolder); break; case BOOLEAN: - vectorizedColumnIterator.booleanBatchReader().nextBatch(vec, -1, nullabilityHolder); + vectorizedColumnIterator + .booleanBatchReader() + .nextBatch(numValsToRead, vec, -1, nullabilityHolder); break; case INT: case INT_BACKED_DECIMAL: vectorizedColumnIterator .integerBatchReader() - .nextBatch(vec, typeWidth, nullabilityHolder); + .nextBatch(numValsToRead, vec, typeWidth, nullabilityHolder); break; case LONG: case LONG_BACKED_DECIMAL: - vectorizedColumnIterator.longBatchReader().nextBatch(vec, typeWidth, nullabilityHolder); + vectorizedColumnIterator + .longBatchReader() + .nextBatch(numValsToRead, vec, typeWidth, nullabilityHolder); break; case FLOAT: vectorizedColumnIterator .floatBatchReader() - .nextBatch(vec, typeWidth, nullabilityHolder); + .nextBatch(numValsToRead, vec, typeWidth, nullabilityHolder); break; case 
DOUBLE: vectorizedColumnIterator .doubleBatchReader() - .nextBatch(vec, typeWidth, nullabilityHolder); + .nextBatch(numValsToRead, vec, typeWidth, nullabilityHolder); break; case TIMESTAMP_MILLIS: vectorizedColumnIterator .timestampMillisBatchReader() - .nextBatch(vec, typeWidth, nullabilityHolder); + .nextBatch(numValsToRead, vec, typeWidth, nullabilityHolder); break; case TIMESTAMP_INT96: vectorizedColumnIterator .timestampInt96BatchReader() - .nextBatch(vec, typeWidth, nullabilityHolder); + .nextBatch(numValsToRead, vec, typeWidth, nullabilityHolder); break; case UUID: case FIXED_WIDTH_BINARY: case FIXED_LENGTH_DECIMAL: vectorizedColumnIterator .fixedSizeBinaryBatchReader() - .nextBatch(vec, typeWidth, nullabilityHolder); + .nextBatch(numValsToRead, vec, typeWidth, nullabilityHolder); break; } } diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java index 822ca8973f54..e4b6569a983b 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java @@ -69,12 +69,14 @@ public boolean producesDictionaryEncodedVector() { } public abstract class BatchReader { - public void nextBatch(FieldVector fieldVector, int typeWidth, NullabilityHolder holder) { + public void nextBatch( + int numValsToRead, FieldVector fieldVector, int typeWidth, NullabilityHolder holder) { int rowsReadSoFar = 0; - while (rowsReadSoFar < batchSize && hasNext()) { + while (rowsReadSoFar < batchSize && hasNext() && rowsReadSoFar < numValsToRead) { advance(); + int expectedBatchSize = Math.min(batchSize - rowsReadSoFar, numValsToRead - rowsReadSoFar); int rowsInThisBatch = - nextBatchOf(fieldVector, batchSize - rowsReadSoFar, rowsReadSoFar, typeWidth, holder); + nextBatchOf(fieldVector, expectedBatchSize, rowsReadSoFar, typeWidth, holder); rowsReadSoFar += rowsInThisBatch; triplesRead += rowsInThisBatch; fieldVector.setValueCount(rowsReadSoFar); diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 3421c0b86d3a..0e69fab83699 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1048,6 +1048,7 @@ public static class ReadBuilder { private NameMapping nameMapping = null; private ByteBuffer fileEncryptionKey = null; private ByteBuffer fileAADPrefix = null; + private int pushedLimit = -1; private ReadBuilder(InputFile file) { this.file = file; @@ -1151,6 +1152,11 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) { return this; } + public ReadBuilder pushedlimit(int limit) { + this.pushedLimit = limit; + return this; + } + @SuppressWarnings({"unchecked", "checkstyle:CyclomaticComplexity"}) public CloseableIterable build() { FileDecryptionProperties fileDecryptionProperties = null; @@ -1212,10 +1218,19 @@ public CloseableIterable build() { filter, reuseContainers, caseSensitive, - maxRecordsPerBatch); + maxRecordsPerBatch, + pushedLimit); } else { return new org.apache.iceberg.parquet.ParquetReader<>( - file, schema, options, readerFunc, mapping, filter, reuseContainers, caseSensitive); + file, + schema, + options, + readerFunc, + mapping, + filter, + reuseContainers, + caseSensitive, + pushedLimit); } } diff --git 
a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index c1d8b0ccbbad..dc9d6414b2a2 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -43,6 +43,7 @@ public class ParquetReader extends CloseableGroup implements CloseableIterabl private final boolean reuseContainers; private final boolean caseSensitive; private final NameMapping nameMapping; + private final int pushedLimit; public ParquetReader( InputFile input, @@ -52,7 +53,8 @@ public ParquetReader( NameMapping nameMapping, Expression filter, boolean reuseContainers, - boolean caseSensitive) { + boolean caseSensitive, + int pushedLimit) { this.input = input; this.expectedSchema = expectedSchema; this.options = options; @@ -62,6 +64,7 @@ public ParquetReader( this.reuseContainers = reuseContainers; this.caseSensitive = caseSensitive; this.nameMapping = nameMapping; + this.pushedLimit = pushedLimit; } private ReadConf conf = null; @@ -89,6 +92,7 @@ private ReadConf init() { @Override public CloseableIterator iterator() { FileIterator iter = new FileIterator<>(init()); + iter.pushedLimit = pushedLimit; addCloseable(iter); return iter; } @@ -105,6 +109,7 @@ private static class FileIterator implements CloseableIterator { private long nextRowGroupStart = 0; private long valuesRead = 0; private T last = null; + private int pushedLimit = -1; FileIterator(ReadConf conf) { this.reader = conf.reader(); @@ -117,7 +122,12 @@ private static class FileIterator implements CloseableIterator { @Override public boolean hasNext() { - return valuesRead < totalValues; + long numToRead = totalValues; + if (pushedLimit > 0 && pushedLimit < numToRead) { + numToRead = pushedLimit; + } + + return valuesRead < numToRead; } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index 35d94f328d60..8ef1f2c53920 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -49,6 +49,7 @@ public class VectorizedParquetReader extends CloseableGroup implements Closea private final boolean caseSensitive; private final int batchSize; private final NameMapping nameMapping; + private final int pushedLimit; public VectorizedParquetReader( InputFile input, @@ -59,7 +60,8 @@ public VectorizedParquetReader( Expression filter, boolean reuseContainers, boolean caseSensitive, - int maxRecordsPerBatch) { + int maxRecordsPerBatch, + int pushedLimit) { this.input = input; this.expectedSchema = expectedSchema; this.options = options; @@ -70,6 +72,7 @@ public VectorizedParquetReader( this.caseSensitive = caseSensitive; this.batchSize = maxRecordsPerBatch; this.nameMapping = nameMapping; + this.pushedLimit = pushedLimit; } private ReadConf conf = null; @@ -97,6 +100,7 @@ private ReadConf init() { @Override public CloseableIterator iterator() { FileIterator iter = new FileIterator<>(init()); + iter.pushedLimit = pushedLimit; addCloseable(iter); return iter; } @@ -114,6 +118,7 @@ private static class FileIterator implements CloseableIterator { private long valuesRead = 0; private T last = null; private final long[] rowGroupsStartRowPos; + private int pushedLimit = -1; FileIterator(ReadConf conf) { this.reader = conf.reader(); @@ -129,7 +134,12 @@ private 
static class FileIterator implements CloseableIterator { @Override public boolean hasNext() { - return valuesRead < totalValues; + long numToRead = totalValues; + if (pushedLimit > 0 && pushedLimit < numToRead) { + numToRead = pushedLimit; + } + + return valuesRead < numToRead; } @Override @@ -141,8 +151,15 @@ public T next() { advance(); } + long remainingValues = nextRowGroupStart - valuesRead; + int remainingLimit = (int) (pushedLimit - valuesRead); // batchSize is an integer, so casting to integer is safe - int numValuesToRead = (int) Math.min(nextRowGroupStart - valuesRead, batchSize); + int numValuesToRead = + (int) + Math.min( + remainingValues, + (remainingLimit > 0 ? Math.min(batchSize, remainingLimit) : batchSize)); + if (reuseContainers) { this.last = model.read(last, numValuesToRead); } else { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java index 67e9d78ada4d..c3fc3debe96b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkReadConf.java @@ -277,6 +277,14 @@ public boolean aggregatePushDownEnabled() { .parse(); } + public boolean limitPushDownEnabled() { + return confParser + .booleanConf() + .sessionConf(SparkSQLProperties.LIMIT_PUSH_DOWN_ENABLED) + .defaultValue(SparkSQLProperties.LIMIT_PUSH_DOWN_ENABLED_DEFAULT) + .parse(); + } + public boolean adaptiveSplitSizeEnabled() { return confParser .booleanConf() diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java index 77ae796ffb76..a823e2a4da52 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java @@ -45,6 +45,10 @@ private SparkSQLProperties() {} "spark.sql.iceberg.aggregate-push-down.enabled"; public static final boolean AGGREGATE_PUSH_DOWN_ENABLED_DEFAULT = true; + // Controls whether to push down limit to Iceberg + public static final String LIMIT_PUSH_DOWN_ENABLED = "spark.sql.iceberg.limit-push-down.enabled"; + public static final boolean LIMIT_PUSH_DOWN_ENABLED_DEFAULT = true; + // Controls write distribution mode public static final String DISTRIBUTION_MODE = "spark.sql.iceberg.distribution-mode"; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index c05b694a60dc..b0d6e1499ca9 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -39,6 +39,7 @@ abstract class BaseBatchReader extends BaseReader { private final int batchSize; + private final int pushedLimit; BaseBatchReader( Table table, @@ -46,9 +47,11 @@ abstract class BaseBatchReader extends BaseReader newBatchIterable( @@ -61,7 +64,8 @@ protected CloseableIterable newBatchIterable( SparkDeleteFilter deleteFilter) { switch (format) { case PARQUET: - return newParquetIterable(inputFile, start, length, residual, idToConstant, deleteFilter); + return newParquetIterable( + inputFile, start, length, residual, idToConstant, deleteFilter, pushedLimit); case ORC: return newOrcIterable(inputFile, start, length, residual, idToConstant); @@ -78,7 +82,8 @@ 
private CloseableIterable newParquetIterable( long length, Expression residual, Map idToConstant, - SparkDeleteFilter deleteFilter) { + SparkDeleteFilter deleteFilter, + int limit) { // get required schema if there are deletes Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema(); @@ -97,6 +102,7 @@ private CloseableIterable newParquetIterable( // read performance as every batch read doesn't have to pay the cost of allocating memory. .reuseContainers() .withNameMapping(nameMapping()) + .pushedlimit(limit) .build(); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index 927084caea1c..a293a7bdbde8 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -39,13 +39,17 @@ import org.apache.spark.sql.catalyst.InternalRow; abstract class BaseRowReader extends BaseReader { + private final int pushedLimit; + BaseRowReader( Table table, ScanTaskGroup taskGroup, Schema tableSchema, Schema expectedSchema, - boolean caseSensitive) { + boolean caseSensitive, + int pushedLimit) { super(table, taskGroup, tableSchema, expectedSchema, caseSensitive); + this.pushedLimit = pushedLimit; } protected CloseableIterable newIterable( @@ -58,7 +62,8 @@ protected CloseableIterable newIterable( Map idToConstant) { switch (format) { case PARQUET: - return newParquetIterable(file, start, length, residual, projection, idToConstant); + return newParquetIterable( + file, start, length, residual, projection, idToConstant, pushedLimit); case AVRO: return newAvroIterable(file, start, length, projection, idToConstant); @@ -88,7 +93,8 @@ private CloseableIterable newParquetIterable( long length, Expression residual, Schema readSchema, - Map idToConstant) { + Map idToConstant, + int limit) { return Parquet.read(file) .reuseContainers() .split(start, length) @@ -98,6 +104,7 @@ private CloseableIterable newParquetIterable( .filter(residual) .caseSensitive(caseSensitive()) .withNameMapping(nameMapping()) + .pushedlimit(limit) .build(); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 389ad1d5a2d9..16ec43ff3d13 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -45,14 +45,15 @@ class BatchDataReader extends BaseBatchReader private final long numSplits; - BatchDataReader(SparkInputPartition partition, int batchSize) { + BatchDataReader(SparkInputPartition partition, int batchSize, int pushedLimit) { this( partition.table(), partition.taskGroup(), SnapshotUtil.schemaFor(partition.table(), partition.branch()), partition.expectedSchema(), partition.isCaseSensitive(), - batchSize); + batchSize, + pushedLimit); } BatchDataReader( @@ -61,8 +62,9 @@ class BatchDataReader extends BaseBatchReader Schema tableSchema, Schema expectedSchema, boolean caseSensitive, - int size) { - super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, size); + int size, + int pushedLimit) { + super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, size, pushedLimit); numSplits = taskGroup.tasks().size(); LOG.debug("Reading {} file split(s) for table {}", numSplits, 
table.name()); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java index 572f955884a3..c85b5c082aa0 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java @@ -68,7 +68,8 @@ class ChangelogRowReader extends BaseRowReader taskGroup, tableSchema, ChangelogUtil.dropChangelogMetadata(expectedSchema), - caseSensitive); + caseSensitive, + -1); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java index f5b98a5a43bd..634b297c6c88 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java @@ -35,7 +35,7 @@ public EqualityDeleteRowReader( Schema tableSchema, Schema expectedSchema, boolean caseSensitive) { - super(table, task, tableSchema, expectedSchema, caseSensitive); + super(table, task, tableSchema, expectedSchema, caseSensitive, -1); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java index 4b847474153c..9e611f2b1eb8 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java @@ -61,7 +61,7 @@ class PositionDeletesRowReader extends BaseRowReader Schema expectedSchema, boolean caseSensitive) { - super(table, taskGroup, tableSchema, expectedSchema, caseSensitive); + super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, -1); int numSplits = taskGroup.tasks().size(); LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name()); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 9356f62f3593..6d8bf65b3053 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -45,13 +45,14 @@ class RowDataReader extends BaseRowReader implements PartitionRead private final long numSplits; - RowDataReader(SparkInputPartition partition) { + RowDataReader(SparkInputPartition partition, int pushedLimit) { this( partition.table(), partition.taskGroup(), SnapshotUtil.schemaFor(partition.table(), partition.branch()), partition.expectedSchema(), - partition.isCaseSensitive()); + partition.isCaseSensitive(), + pushedLimit); } RowDataReader( @@ -59,9 +60,10 @@ class RowDataReader extends BaseRowReader implements PartitionRead ScanTaskGroup taskGroup, Schema tableSchema, Schema expectedSchema, - boolean caseSensitive) { + boolean caseSensitive, + int pushedLimit) { - super(table, taskGroup, tableSchema, expectedSchema, caseSensitive); + super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, pushedLimit); numSplits = taskGroup.tasks().size(); LOG.debug("Reading {} file split(s) for table {}", numSplits, table.name()); diff --git 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index fd6783f3e1f7..6b464a6ba0da 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -50,6 +50,7 @@ class SparkBatch implements Batch { private final boolean localityEnabled; private final boolean executorCacheLocalityEnabled; private final int scanHashCode; + private final int pushedLimit; SparkBatch( JavaSparkContext sparkContext, @@ -58,7 +59,8 @@ class SparkBatch implements Batch { Types.StructType groupingKeyType, List> taskGroups, Schema expectedSchema, - int scanHashCode) { + int scanHashCode, + int pushedLimit) { this.sparkContext = sparkContext; this.table = table; this.branch = readConf.branch(); @@ -70,6 +72,7 @@ class SparkBatch implements Batch { this.localityEnabled = readConf.localityEnabled(); this.executorCacheLocalityEnabled = readConf.executorCacheLocalityEnabled(); this.scanHashCode = scanHashCode; + this.pushedLimit = pushedLimit; } @Override @@ -115,14 +118,14 @@ private String[][] computePreferredLocations() { public PartitionReaderFactory createReaderFactory() { if (useParquetBatchReads()) { int batchSize = readConf.parquetBatchSize(); - return new SparkColumnarReaderFactory(batchSize); + return new SparkColumnarReaderFactory(batchSize, pushedLimit); } else if (useOrcBatchReads()) { int batchSize = readConf.orcBatchSize(); - return new SparkColumnarReaderFactory(batchSize); + return new SparkColumnarReaderFactory(batchSize, pushedLimit); } else { - return new SparkRowReaderFactory(); + return new SparkRowReaderFactory(pushedLimit); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java index 66cda5b82955..073b5596f24b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java @@ -76,8 +76,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan SparkReadConf readConf, Schema expectedSchema, List filters, - Supplier scanReportSupplier) { - super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier); + Supplier scanReportSupplier, + int pushedLimit) { + super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier, pushedLimit); this.snapshotId = readConf.snapshotId(); this.startSnapshotId = readConf.startSnapshotId(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java index 71b53d70262f..0089ef0f13c9 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java @@ -109,7 +109,8 @@ public Batch toBatch() { EMPTY_GROUPING_KEY_TYPE, taskGroups(), expectedSchema, - hashCode()); + hashCode(), + -1); } private List> taskGroups() { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java index 655e20a50e11..c5b38575902a 100644 --- 
a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java @@ -28,10 +28,12 @@ class SparkColumnarReaderFactory implements PartitionReaderFactory { private final int batchSize; + private final int pushedLimit; - SparkColumnarReaderFactory(int batchSize) { + SparkColumnarReaderFactory(int batchSize, int pushedLimit) { Preconditions.checkArgument(batchSize > 1, "Batch size must be > 1"); this.batchSize = batchSize; + this.pushedLimit = pushedLimit; } @Override @@ -49,7 +51,7 @@ public PartitionReader createColumnarReader(InputPartition inputP SparkInputPartition partition = (SparkInputPartition) inputPartition; if (partition.allTasksOfType(FileScanTask.class)) { - return new BatchDataReader(partition, batchSize); + return new BatchDataReader(partition, batchSize, pushedLimit); } else { throw new UnsupportedOperationException( diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index 5a3963bc614c..2ccc84e51c2b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -73,7 +73,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan Schema expectedSchema, List filters, Supplier scanReportSupplier) { - super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier); + super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier, -1); this.snapshot = snapshot; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index 49180e07c465..f1624dd64c89 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -177,7 +177,7 @@ private String[][] computePreferredLocations(List taskGroups) @Override public PartitionReaderFactory createReaderFactory() { - return new SparkRowReaderFactory(); + return new SparkRowReaderFactory(-1); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java index 141dd4dcba0e..6b7a8d1693de 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java @@ -77,8 +77,9 @@ abstract class SparkPartitioningAwareScan extends S SparkReadConf readConf, Schema expectedSchema, List filters, - Supplier scanReportSupplier) { - super(spark, table, readConf, expectedSchema, filters, scanReportSupplier); + Supplier scanReportSupplier, + int pushedLimit) { + super(spark, table, readConf, expectedSchema, filters, scanReportSupplier, pushedLimit); this.scan = scan; this.preserveDataGrouping = readConf.preserveDataGrouping(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java index 23699aeb167c..be005c59ad8d 100644 
--- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java @@ -29,8 +29,11 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; class SparkRowReaderFactory implements PartitionReaderFactory { + private final int pushedLimit; - SparkRowReaderFactory() {} + SparkRowReaderFactory(int pushedLimit) { + this.pushedLimit = pushedLimit; + } @Override public PartitionReader createReader(InputPartition inputPartition) { @@ -42,7 +45,7 @@ public PartitionReader createReader(InputPartition inputPartition) SparkInputPartition partition = (SparkInputPartition) inputPartition; if (partition.allTasksOfType(FileScanTask.class)) { - return new RowDataReader(partition); + return new RowDataReader(partition, pushedLimit); } else if (partition.allTasksOfType(ChangelogScanTask.class)) { return new ChangelogRowReader(partition); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index 8b88cf49c692..b60503c3b259 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -107,6 +107,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { private final List filterExpressions; private final String branch; private final Supplier scanReportSupplier; + private final int pushdownLimit; // lazy variables private StructType readSchema; @@ -117,7 +118,8 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { SparkReadConf readConf, Schema expectedSchema, List filters, - Supplier scanReportSupplier) { + Supplier scanReportSupplier, + int pushdownLimit) { Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch()); SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, expectedSchema); @@ -130,6 +132,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { this.filterExpressions = filters != null ? 
filters : Collections.emptyList(); this.branch = readConf.branch(); this.scanReportSupplier = scanReportSupplier; + this.pushdownLimit = pushdownLimit; } protected Table table() { @@ -152,6 +155,10 @@ protected List filterExpressions() { return filterExpressions; } + protected int pushdownLimit() { + return pushdownLimit; + } + protected Types.StructType groupingKeyType() { return Types.StructType.of(); } @@ -161,7 +168,14 @@ protected Types.StructType groupingKeyType() { @Override public Batch toBatch() { return new SparkBatch( - sparkContext, table, readConf, groupingKeyType(), taskGroups(), expectedSchema, hashCode()); + sparkContext, + table, + readConf, + groupingKeyType(), + taskGroups(), + expectedSchema, + hashCode(), + pushdownLimit); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index 9dc214a755d3..a76f1fc91739 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -71,6 +71,7 @@ import org.apache.spark.sql.connector.read.ScanBuilder; import org.apache.spark.sql.connector.read.Statistics; import org.apache.spark.sql.connector.read.SupportsPushDownAggregates; +import org.apache.spark.sql.connector.read.SupportsPushDownLimit; import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns; import org.apache.spark.sql.connector.read.SupportsPushDownV2Filters; import org.apache.spark.sql.connector.read.SupportsReportStatistics; @@ -85,7 +86,8 @@ public class SparkScanBuilder SupportsPushDownAggregates, SupportsPushDownV2Filters, SupportsPushDownRequiredColumns, - SupportsReportStatistics { + SupportsReportStatistics, + SupportsPushDownLimit { private static final Logger LOG = LoggerFactory.getLogger(SparkScanBuilder.class); private static final Predicate[] NO_PREDICATES = new Predicate[0]; @@ -103,6 +105,7 @@ public class SparkScanBuilder private boolean caseSensitive; private List filterExpressions = null; private Predicate[] pushedPredicates = NO_PREDICATES; + private int pushedLimit = -1; SparkScanBuilder( SparkSession spark, @@ -324,6 +327,16 @@ private boolean metricsModeSupportsAggregatePushDown(List> return true; } + @Override + public boolean pushLimit(int limit) { + if (readConf.limitPushDownEnabled()) { + pushedLimit = limit; + return true; + } + + return true; + } + @Override public void pruneColumns(StructType requestedSchema) { StructType requestedProjection = @@ -407,14 +420,35 @@ public Scan build() { private Scan buildBatchScan() { Schema expectedSchema = schemaWithMetadataColumns(); + org.apache.iceberg.Scan scan = + buildIcebergBatchScan(false /* not include Column Stats */, expectedSchema); + if (pushedLimit > 0 && hasDeletes(scan)) { + LOG.info("Skipping limit pushdown: detected row level deletes"); + pushedLimit = -1; + } return new SparkBatchQueryScan( spark, table, - buildIcebergBatchScan(false /* not include Column Stats */, expectedSchema), + scan, readConf, expectedSchema, filterExpressions, - metricsReporter::scanReport); + metricsReporter::scanReport, + pushedLimit); + } + + private boolean hasDeletes(org.apache.iceberg.Scan scan) { + try (CloseableIterable fileScanTasks = scan.planFiles()) { + for (FileScanTask task : fileScanTasks) { + if (!task.deletes().isEmpty()) { + return true; + } + } + } catch (IOException e) { + throw new RuntimeException("Failed to 
plan files for scan", e); + } + + return false; } private org.apache.iceberg.Scan buildIcebergBatchScan(boolean withStats, Schema expectedSchema) { @@ -648,7 +682,8 @@ public Scan buildMergeOnReadScan() { readConf, schemaWithMetadataColumns(), filterExpressions, - metricsReporter::scanReport); + metricsReporter::scanReport, + pushedLimit); } // remember the current snapshot ID for commit validation @@ -678,7 +713,8 @@ public Scan buildMergeOnReadScan() { adjustedReadConf, expectedSchema, filterExpressions, - metricsReporter::scanReport); + metricsReporter::scanReport, + pushedLimit); } public Scan buildCopyOnWriteScan() { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java index fd299ade7fdc..eb6461f21494 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java @@ -41,7 +41,7 @@ class SparkStagedScan extends SparkScan { private List> taskGroups = null; // lazy cache of tasks SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, SparkReadConf readConf) { - super(spark, table, readConf, expectedSchema, ImmutableList.of(), null); + super(spark, table, readConf, expectedSchema, ImmutableList.of(), null, -1); this.taskSetId = readConf.scanTaskSetId(); this.splitSize = readConf.splitSize(); this.splitLookback = readConf.splitLookback(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java new file mode 100644 index 000000000000..feaf5ec0d3e7 --- /dev/null +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark.source; + +import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.METASTOREURIS; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT; +import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT; +import static org.assertj.core.api.Assertions.assertThat; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Stream; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.iceberg.BaseTable; +import org.apache.iceberg.CatalogUtil; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DataFiles; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.Files; +import org.apache.iceberg.Parameter; +import org.apache.iceberg.ParameterizedTestExtension; +import org.apache.iceberg.Parameters; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.TableMetadata; +import org.apache.iceberg.TableOperations; +import org.apache.iceberg.TableProperties; +import org.apache.iceberg.TestHelpers.Row; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericAppenderFactory; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.exceptions.AlreadyExistsException; +import org.apache.iceberg.hive.HiveCatalog; +import org.apache.iceberg.hive.TestHiveMetastore; +import org.apache.iceberg.io.FileAppender; +import org.apache.iceberg.io.OutputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkValueConverter; +import org.apache.iceberg.types.Types; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2ScanRelation; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.TestTemplate; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.io.TempDir; +import scala.collection.JavaConverters; + +@ExtendWith(ParameterizedTestExtension.class) +public class TestLimitPushDown { + + protected String tableName = null; + protected Table table = null; + protected List records = null; + protected DataFile dataFile = null; + + private static TestHiveMetastore metastore = null; + protected static SparkSession spark = null; + protected static HiveCatalog catalog = null; + + // Schema passed to create tables + public static final Schema SCHEMA = + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "data", Types.StringType.get())); + + @TempDir private Path temp; + + @Parameter private boolean vectorized; + + @Parameters(name = "vectorized = {0}") + public static Collection parameters() { + return Arrays.asList(false, true); + } + + @BeforeEach + public void writeTestDataFile() throws IOException { + this.tableName = "test"; + createTable(tableName, SCHEMA); + this.records = Lists.newArrayList(); + 
+ GenericRecord record = GenericRecord.create(table.schema()); + + records.add(record.copy("id", 29, "data", "a")); + records.add(record.copy("id", 43, "data", "b")); + records.add(record.copy("id", 61, "data", "c")); + records.add(record.copy("id", 89, "data", "d")); + records.add(record.copy("id", 100, "data", "e")); + records.add(record.copy("id", 121, "data", "f")); + records.add(record.copy("id", 122, "data", "g")); + + this.dataFile = + writeDataFile( + Files.localOutput(File.createTempFile("junit", null, temp.toFile())), + Row.of(0), + records); + + table.newAppend().appendFile(dataFile).commit(); + } + + @AfterEach + public void cleanup() throws IOException { + dropTable("test"); + } + + @BeforeAll + public static void startMetastoreAndSpark() { + metastore = new TestHiveMetastore(); + metastore.start(); + HiveConf hiveConf = metastore.hiveConf(); + + spark = + SparkSession.builder() + .master("local[2]") + .config("spark.hadoop." + METASTOREURIS.varname, hiveConf.get(METASTOREURIS.varname)) + .enableHiveSupport() + .getOrCreate(); + + catalog = + (HiveCatalog) + CatalogUtil.loadCatalog( + HiveCatalog.class.getName(), "hive", ImmutableMap.of(), hiveConf); + + try { + catalog.createNamespace(Namespace.of("default")); + } catch (AlreadyExistsException ignored) { + // the default namespace already exists. ignore the create error + } + } + + @AfterAll + public static void stopMetastoreAndSpark() throws Exception { + catalog = null; + metastore.stop(); + metastore = null; + spark.stop(); + spark = null; + } + + protected void createTable(String name, Schema schema) { + table = catalog.createTable(TableIdentifier.of("default", name), schema); + TableOperations ops = ((BaseTable) table).operations(); + TableMetadata meta = ops.current(); + ops.commit(meta, meta.upgradeToFormatVersion(2)); + + if (vectorized) { + table + .updateProperties() + .set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "true") + .set(TableProperties.PARQUET_BATCH_SIZE, "4") + .commit(); + } else { + table.updateProperties().set(TableProperties.PARQUET_VECTORIZATION_ENABLED, "false").commit(); + } + } + + protected void dropTable(String name) { + catalog.dropTable(TableIdentifier.of("default", name)); + } + + private DataFile writeDataFile(OutputFile out, StructLike partition, List rows) + throws IOException { + FileFormat format = defaultFormat(table.properties()); + GenericAppenderFactory factory = new GenericAppenderFactory(table.schema(), table.spec()); + + FileAppender writer = factory.newAppender(out, format); + try (Closeable toClose = writer) { + writer.addAll(rows); + } + + return DataFiles.builder(table.spec()) + .withFormat(format) + .withPath(out.location()) + .withPartition(partition) + .withFileSizeInBytes(writer.length()) + .withSplitOffsets(writer.splitOffsets()) + .withMetrics(writer.metrics()) + .build(); + } + + private FileFormat defaultFormat(Map properties) { + String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT); + return FileFormat.fromString(formatString); + } + + @TestTemplate + public void testReadWithLimit() { + Dataset df = + spark + .read() + .format("iceberg") + .load(TableIdentifier.of("default", tableName).toString()) + .selectExpr("*"); + + testLimit(df, 2, new Object[][] {{29, "a"}, {43, "b"}}); + // testLimit(df, 4, new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}}); + // testLimit(df, 6, new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}, {100, "e"}, + // {121, "f"}}); + } + + private void testLimit( + Dataset df, int 
limit, Object[][] expectedValues) { + Dataset limitedDf = df.limit(limit); + LogicalPlan optimizedPlan = limitedDf.queryExecution().optimizedPlan(); + int pushedLimit = collectPushDownLimit(optimizedPlan); + assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(limit); + + for (int i = 0; i < expectedValues.length; i++) { + Record record = SparkValueConverter.convert(table.schema(), limitedDf.collectAsList().get(i)); + assertThat(record.get(0)) + .as("Table should contain expected rows") + .isEqualTo(expectedValues[i][0]); + assertThat(record.get(1)) + .as("Table should contain expected rows") + .isEqualTo(expectedValues[i][1]); + } + } + + private int collectPushDownLimit(LogicalPlan logicalPlan) { + Optional limit = + JavaConverters.asJavaCollection(logicalPlan.collectLeaves()).stream() + .flatMap( + plan -> { + if (!(plan instanceof DataSourceV2ScanRelation)) { + return Stream.empty(); + } + + DataSourceV2ScanRelation scanRelation = (DataSourceV2ScanRelation) plan; + if (!(scanRelation.scan() instanceof SparkBatchQueryScan)) { + return Stream.empty(); + } + + SparkBatchQueryScan batchQueryScan = (SparkBatchQueryScan) scanRelation.scan(); + return Stream.of(batchQueryScan.pushdownLimit()); + }) + .findFirst(); + + return limit.orElse(0); + } +} From 39f663e77b4dbb284724abe28313eb2bf389b869 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Wed, 14 Aug 2024 15:12:22 -0700 Subject: [PATCH 2/8] fix test --- .../org/apache/iceberg/spark/source/TestLimitPushDown.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java index feaf5ec0d3e7..3a6f9a7208ae 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java @@ -224,9 +224,9 @@ public void testReadWithLimit() { .selectExpr("*"); testLimit(df, 2, new Object[][] {{29, "a"}, {43, "b"}}); - // testLimit(df, 4, new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}}); - // testLimit(df, 6, new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}, {100, "e"}, - // {121, "f"}}); + testLimit(df, 4, new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}}); + testLimit( + df, 6, new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}, {100, "e"}, {121, "f"}}); } private void testLimit( From f179a793232c2b6395d3f52373fd1a19ce07e1ff Mon Sep 17 00:00:00 2001 From: huaxingao Date: Fri, 16 Aug 2024 08:06:55 -0700 Subject: [PATCH 3/8] address comments --- .palantir/revapi.yml | 11 +++++------ .../parquet/VectorizedColumnIterator.java | 8 +++++++- .../org/apache/iceberg/parquet/ParquetReader.java | 9 +++------ .../iceberg/parquet/VectorizedParquetReader.java | 9 +++------ .../iceberg/spark/source/BaseBatchReader.java | 2 +- .../apache/iceberg/spark/source/BaseRowReader.java | 2 +- .../org/apache/iceberg/spark/source/SparkBatch.java | 2 +- .../spark/source/SparkColumnarReaderFactory.java | 2 +- .../iceberg/spark/source/SparkRowReaderFactory.java | 2 +- .../iceberg/spark/source/SparkScanBuilder.java | 6 ++++-- .../iceberg/spark/source/TestLimitPushDown.java | 13 ++++++++++++- 11 files changed, 39 insertions(+), 27 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index bb64b1923d56..ca873f1cb520 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1025,11 +1025,14 @@ 
acceptedBreaks: new: "class org.apache.iceberg.types.Types.NestedField" justification: "new Constructor added" org.apache.iceberg:iceberg-core: + - code: "java.method.visibilityReduced" + old: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" + new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" + justification: "Deprecations for 1.6.0 release" - code: "java.element.noLongerDeprecated" old: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" - justification: "Constructor became private as part of deprecations cleanup for\ - \ 1.6.0 release" + justification: "Constructor became private as part of deprecations cleanup for 1.6.0 release" - code: "java.element.noLongerDeprecated" old: "method void org.apache.iceberg.rest.auth.OAuth2Util.AuthSession::(java.util.Map, java.lang.String, java.lang.String, java.lang.String,\ @@ -1053,10 +1056,6 @@ acceptedBreaks: - code: "java.method.removed" old: "method org.apache.iceberg.DataFiles.Builder org.apache.iceberg.DataFiles.Builder::withEqualityFieldIds(java.util.List)" justification: "Deprecations for 1.6.0 release" - - code: "java.method.visibilityReduced" - old: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" - new: "method void org.apache.iceberg.encryption.PlaintextEncryptionManager::()" - justification: "Deprecations for 1.6.0 release" "1.6.0": org.apache.iceberg:iceberg-common: - code: "java.method.removed" diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java index e4b6569a983b..a98a9848b4e9 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java @@ -74,7 +74,13 @@ public void nextBatch( int rowsReadSoFar = 0; while (rowsReadSoFar < batchSize && hasNext() && rowsReadSoFar < numValsToRead) { advance(); - int expectedBatchSize = Math.min(batchSize - rowsReadSoFar, numValsToRead - rowsReadSoFar); + int expectedBatchSize; + if (numValsToRead < 0) { + expectedBatchSize = batchSize - rowsReadSoFar; + } else { + expectedBatchSize = Math.min(batchSize - rowsReadSoFar, numValsToRead - rowsReadSoFar); + } + int rowsInThisBatch = nextBatchOf(fieldVector, expectedBatchSize, rowsReadSoFar, typeWidth, holder); rowsReadSoFar += rowsInThisBatch; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index dc9d6414b2a2..2842337120df 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -43,7 +43,7 @@ public class ParquetReader extends CloseableGroup implements CloseableIterabl private final boolean reuseContainers; private final boolean caseSensitive; private final NameMapping nameMapping; - private final int pushedLimit; + private int pushedLimit = -1; public ParquetReader( InputFile input, @@ -122,11 +122,8 @@ private static class FileIterator implements CloseableIterator { @Override public boolean hasNext() { - long numToRead = totalValues; - if (pushedLimit > 0 && pushedLimit < numToRead) { - numToRead = pushedLimit; - } - + long numToRead = + (pushedLimit == -1 || pushedLimit >= 
totalValues) ? totalValues : pushedLimit; return valuesRead < numToRead; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index 8ef1f2c53920..fb011191835f 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -49,7 +49,7 @@ public class VectorizedParquetReader extends CloseableGroup implements Closea private final boolean caseSensitive; private final int batchSize; private final NameMapping nameMapping; - private final int pushedLimit; + private int pushedLimit = -1; public VectorizedParquetReader( InputFile input, @@ -134,11 +134,8 @@ private static class FileIterator implements CloseableIterator { @Override public boolean hasNext() { - long numToRead = totalValues; - if (pushedLimit > 0 && pushedLimit < numToRead) { - numToRead = pushedLimit; - } - + long numToRead = + (pushedLimit == -1 || pushedLimit >= totalValues) ? totalValues : pushedLimit; return valuesRead < numToRead; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index b0d6e1499ca9..0fb6b31083e4 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -39,7 +39,7 @@ abstract class BaseBatchReader extends BaseReader { private final int batchSize; - private final int pushedLimit; + private int pushedLimit = -1; BaseBatchReader( Table table, diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index a293a7bdbde8..fe8cdf7dc2cd 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -39,7 +39,7 @@ import org.apache.spark.sql.catalyst.InternalRow; abstract class BaseRowReader extends BaseReader { - private final int pushedLimit; + private int pushedLimit = -1; BaseRowReader( Table table, diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 6b464a6ba0da..5935d225fbac 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -50,7 +50,7 @@ class SparkBatch implements Batch { private final boolean localityEnabled; private final boolean executorCacheLocalityEnabled; private final int scanHashCode; - private final int pushedLimit; + private int pushedLimit = -1; SparkBatch( JavaSparkContext sparkContext, diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java index c5b38575902a..7da15f30bed5 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java @@ -28,7 +28,7 @@ class SparkColumnarReaderFactory implements PartitionReaderFactory { private 
final int batchSize; - private final int pushedLimit; + private int pushedLimit = -1; SparkColumnarReaderFactory(int batchSize, int pushedLimit) { Preconditions.checkArgument(batchSize > 1, "Batch size must be > 1"); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java index be005c59ad8d..a913679d988c 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java @@ -29,7 +29,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; class SparkRowReaderFactory implements PartitionReaderFactory { - private final int pushedLimit; + private int pushedLimit = -1; SparkRowReaderFactory(int pushedLimit) { this.pushedLimit = pushedLimit; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index a76f1fc91739..ba746139cbeb 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -329,12 +329,14 @@ private boolean metricsModeSupportsAggregatePushDown(List> @Override public boolean pushLimit(int limit) { - if (readConf.limitPushDownEnabled()) { + // If the limit is 0, Spark converts it to an empty table scan, + // and this section will not be reached. + if (readConf.limitPushDownEnabled() && limit > 0) { pushedLimit = limit; return true; } - return true; + return false; } @Override diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java index 3a6f9a7208ae..a3f19eb3f4d3 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java @@ -223,6 +223,7 @@ public void testReadWithLimit() { .load(TableIdentifier.of("default", tableName).toString()) .selectExpr("*"); + testLimit(df, 0, new Object[][] {}); testLimit(df, 2, new Object[][] {{29, "a"}, {43, "b"}}); testLimit(df, 4, new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}}); testLimit( @@ -236,8 +237,18 @@ private void testLimit( int pushedLimit = collectPushDownLimit(optimizedPlan); assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(limit); + if (limit == 0) { + assertThat(limitedDf.isEmpty()).as("Dataset should be empty when limit is 0").isTrue(); + return; + } + + List collectedRows = limitedDf.collectAsList(); + assertThat(collectedRows.size()) + .as("Number of collected rows should match expected values length") + .isEqualTo(expectedValues.length); + for (int i = 0; i < expectedValues.length; i++) { - Record record = SparkValueConverter.convert(table.schema(), limitedDf.collectAsList().get(i)); + Record record = SparkValueConverter.convert(table.schema(), collectedRows.get(i)); assertThat(record.get(0)) .as("Table should contain expected rows") .isEqualTo(expectedValues[i][0]); From 72d48ba45a206759645523529e461bd61eb9cb22 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Fri, 16 Aug 2024 09:33:52 -0700 Subject: [PATCH 4/8] address comments --- .../org/apache/iceberg/parquet/Parquet.java | 4 +++ 
.../apache/iceberg/parquet/ParquetReader.java | 8 +++--- .../parquet/VectorizedParquetReader.java | 25 +++++++++++-------- 3 files changed, 24 insertions(+), 13 deletions(-) diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 0e69fab83699..7ab11ad2ccbd 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1153,6 +1153,10 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) { } public ReadBuilder pushedlimit(int limit) { + if (limit <= 0) { + throw new IllegalArgumentException("Pushed limit must be > 0"); + } + this.pushedLimit = limit; return this; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index 2842337120df..2a473a6f31d9 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -122,9 +122,11 @@ private static class FileIterator implements CloseableIterator { @Override public boolean hasNext() { - long numToRead = - (pushedLimit == -1 || pushedLimit >= totalValues) ? totalValues : pushedLimit; - return valuesRead < numToRead; + if (pushedLimit > 0) { + return valuesRead < Math.min(totalValues, pushedLimit); + } else { + return valuesRead < totalValues; + } } @Override diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index fb011191835f..1fcbb6892fb2 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -134,9 +134,11 @@ private static class FileIterator implements CloseableIterator { @Override public boolean hasNext() { - long numToRead = - (pushedLimit == -1 || pushedLimit >= totalValues) ? totalValues : pushedLimit; - return valuesRead < numToRead; + if (pushedLimit > 0) { + return valuesRead < Math.min(totalValues, pushedLimit); + } else { + return valuesRead < totalValues; + } } @Override @@ -149,13 +151,16 @@ public T next() { } long remainingValues = nextRowGroupStart - valuesRead; - int remainingLimit = (int) (pushedLimit - valuesRead); - // batchSize is an integer, so casting to integer is safe - int numValuesToRead = - (int) - Math.min( - remainingValues, - (remainingLimit > 0 ? 
Math.min(batchSize, remainingLimit) : batchSize)); + long remainingLimit = pushedLimit - valuesRead; + + int numValuesToRead; + if (remainingLimit > 0) { + // batchSize is an integer, so casting to integer is safe + numValuesToRead = (int) Math.min(remainingValues, Math.min(batchSize, remainingLimit)); + } else { + // batchSize is an integer, so casting to integer is safe + numValuesToRead = (int) Math.min(remainingValues, batchSize); + } if (reuseContainers) { this.last = model.read(last, numValuesToRead); From 55a9833b643d4e06394ec7d255351d34adc7527b Mon Sep 17 00:00:00 2001 From: huaxingao Date: Fri, 16 Aug 2024 11:08:41 -0700 Subject: [PATCH 5/8] address comments and fix test failure --- .../parquet/VectorizedColumnIterator.java | 2 +- .../iceberg/spark/source/BaseBatchReader.java | 40 +++++++++++-------- .../iceberg/spark/source/BaseRowReader.java | 26 +++++++----- 3 files changed, 39 insertions(+), 29 deletions(-) diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java index a98a9848b4e9..ec60810d3436 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java @@ -76,7 +76,7 @@ public void nextBatch( advance(); int expectedBatchSize; if (numValsToRead < 0) { - expectedBatchSize = batchSize - rowsReadSoFar; + throw new IllegalStateException("numValsToRead has invalid value"); } else { expectedBatchSize = Math.min(batchSize - rowsReadSoFar, numValsToRead - rowsReadSoFar); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index 0fb6b31083e4..b7bc262cb21b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -87,23 +87,29 @@ private CloseableIterable newParquetIterable( // get required schema if there are deletes Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema(); - return Parquet.read(inputFile) - .project(requiredSchema) - .split(start, length) - .createBatchedReaderFunc( - fileSchema -> - VectorizedSparkParquetReaders.buildReader( - requiredSchema, fileSchema, idToConstant, deleteFilter)) - .recordsPerBatch(batchSize) - .filter(residual) - .caseSensitive(caseSensitive()) - // Spark eagerly consumes the batches. So the underlying memory allocated could be reused - // without worrying about subsequent reads clobbering over each other. This improves - // read performance as every batch read doesn't have to pay the cost of allocating memory. - .reuseContainers() - .withNameMapping(nameMapping()) - .pushedlimit(limit) - .build(); + Parquet.ReadBuilder readerBuilder = + Parquet.read(inputFile) + .project(requiredSchema) + .split(start, length) + .createBatchedReaderFunc( + fileSchema -> + VectorizedSparkParquetReaders.buildReader( + requiredSchema, fileSchema, idToConstant, deleteFilter)) + .recordsPerBatch(batchSize) + .filter(residual) + .caseSensitive(caseSensitive()) + // Spark eagerly consumes the batches. So the underlying memory allocated could be + // reused + // without worrying about subsequent reads clobbering over each other. 
This improves + // read performance as every batch read doesn't have to pay the cost of allocating + // memory. + .reuseContainers() + .withNameMapping(nameMapping()); + if (limit > 0) { + readerBuilder = readerBuilder.pushedlimit(limit); + } + + return readerBuilder.build(); } private CloseableIterable newOrcIterable( diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index fe8cdf7dc2cd..304887c7f133 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -95,17 +95,21 @@ private CloseableIterable newParquetIterable( Schema readSchema, Map idToConstant, int limit) { - return Parquet.read(file) - .reuseContainers() - .split(start, length) - .project(readSchema) - .createReaderFunc( - fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) - .filter(residual) - .caseSensitive(caseSensitive()) - .withNameMapping(nameMapping()) - .pushedlimit(limit) - .build(); + Parquet.ReadBuilder readerBuilder = + Parquet.read(file) + .reuseContainers() + .split(start, length) + .project(readSchema) + .createReaderFunc( + fileSchema -> SparkParquetReaders.buildReader(readSchema, fileSchema, idToConstant)) + .filter(residual) + .caseSensitive(caseSensitive()) + .withNameMapping(nameMapping()); + if (limit > 0) { + readerBuilder = readerBuilder.pushedlimit(limit); + } + + return readerBuilder.build(); } private CloseableIterable newOrcIterable( From c79648826bedd95edd0ebd0d2cfab66843e7942d Mon Sep 17 00:00:00 2001 From: huaxingao Date: Sun, 18 Aug 2024 15:33:06 -0700 Subject: [PATCH 6/8] add more tests --- .../spark/source/TestLimitPushDown.java | 83 ++++++++++++++++--- 1 file changed, 70 insertions(+), 13 deletions(-) diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java index a3f19eb3f4d3..00af430b8adf 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java @@ -62,6 +62,7 @@ import org.apache.iceberg.io.OutputFile; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.spark.SparkSQLProperties; import org.apache.iceberg.spark.SparkValueConverter; import org.apache.iceberg.types.Types; import org.apache.spark.sql.Dataset; @@ -130,7 +131,7 @@ public void writeTestDataFile() throws IOException { } @AfterEach - public void cleanup() throws IOException { + public void cleanup() { dropTable("test"); } @@ -215,7 +216,7 @@ private FileFormat defaultFormat(Map properties) { } @TestTemplate - public void testReadWithLimit() { + public void testLimitPushedDown() { Dataset df = spark .read() @@ -223,23 +224,79 @@ public void testReadWithLimit() { .load(TableIdentifier.of("default", tableName).toString()) .selectExpr("*"); - testLimit(df, 0, new Object[][] {}); - testLimit(df, 2, new Object[][] {{29, "a"}, {43, "b"}}); - testLimit(df, 4, new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}}); + testLimit(df, 2, new Object[][] {{29, "a"}, {43, "b"}}, true); + testLimit(df, 4, new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}}, true); testLimit( - 
df, 6, new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}, {100, "e"}, {121, "f"}}); + df, + 6, + new Object[][] {{29, "a"}, {43, "b"}, {61, "c"}, {89, "d"}, {100, "e"}, {121, "f"}}, + true); + } + + @TestTemplate + public void testDisableLimitPushDown() { + spark.conf().set(SparkSQLProperties.LIMIT_PUSH_DOWN_ENABLED, "false"); + Dataset df = + spark + .read() + .format("iceberg") + .load(TableIdentifier.of("default", tableName).toString()) + .selectExpr("*"); + + testLimit(df, 2, new Object[][] {{29, "a"}, {43, "b"}}, false); + } + + @TestTemplate + public void testLimitZeroNotPushedDown() { + Dataset df = + spark + .read() + .format("iceberg") + .load(TableIdentifier.of("default", tableName).toString()) + .selectExpr("*"); + + // Spark converts limit 0 to an empty table scan + testLimit(df, 0, new Object[][] {}, false); + } + + @TestTemplate + public void testLimitWithDataFilterNotPushedDown() { + Dataset df = + spark + .read() + .format("iceberg") + .load(TableIdentifier.of("default", tableName).toString()) + .selectExpr("*") + .filter("id > 30"); + + testLimit(df, 2, new Object[][] {{43, "b"}, {61, "c"}}, false); + } + + @TestTemplate + public void testLimitWithSortNotPushedDown() { + Dataset df = + spark + .read() + .format("iceberg") + .load(TableIdentifier.of("default", tableName).toString()) + .selectExpr("*") + .sort("id"); + + testLimit(df, 2, new Object[][] {{29, "a"}, {43, "b"}}, false); } private void testLimit( - Dataset df, int limit, Object[][] expectedValues) { + Dataset df, + int limit, + Object[][] expectedValues, + boolean limitPushedDown) { Dataset limitedDf = df.limit(limit); LogicalPlan optimizedPlan = limitedDf.queryExecution().optimizedPlan(); int pushedLimit = collectPushDownLimit(optimizedPlan); - assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(limit); - - if (limit == 0) { - assertThat(limitedDf.isEmpty()).as("Dataset should be empty when limit is 0").isTrue(); - return; + if (limitPushedDown) { + assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(limit); + } else { + assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(-1); } List collectedRows = limitedDf.collectAsList(); @@ -277,6 +334,6 @@ private int collectPushDownLimit(LogicalPlan logicalPlan) { }) .findFirst(); - return limit.orElse(0); + return limit.orElse(-1); } } From e35665b6e300b6a7a2d71e6fba7bfce4f8c2912f Mon Sep 17 00:00:00 2001 From: huaxingao Date: Fri, 27 Sep 2024 11:02:44 -0700 Subject: [PATCH 7/8] address comments --- .palantir/revapi.yml | 4 ++-- .../parquet/VectorizedColumnIterator.java | 4 +++- .../org/apache/iceberg/parquet/Parquet.java | 9 +++---- .../apache/iceberg/parquet/ParquetReader.java | 8 +++---- .../parquet/VectorizedParquetReader.java | 14 +++++------ .../iceberg/spark/source/BaseBatchReader.java | 8 +++---- .../iceberg/spark/source/BaseRowReader.java | 19 +++++++++++---- .../iceberg/spark/source/BatchDataReader.java | 4 ++-- .../spark/source/ChangelogRowReader.java | 3 +-- .../spark/source/EqualityDeleteRowReader.java | 2 +- .../source/PositionDeletesRowReader.java | 2 +- .../iceberg/spark/source/RowDataReader.java | 17 +++++++++++-- .../iceberg/spark/source/SparkBatch.java | 17 ++++++++++--- .../spark/source/SparkBatchQueryScan.java | 2 +- .../spark/source/SparkChangelogScan.java | 3 +-- .../source/SparkColumnarReaderFactory.java | 4 ++-- .../spark/source/SparkCopyOnWriteScan.java | 2 +- .../spark/source/SparkMicroBatchStream.java | 2 +- .../source/SparkPartitioningAwareScan.java | 13 
+++++++++- .../spark/source/SparkRowReaderFactory.java | 6 +++-- .../iceberg/spark/source/SparkScan.java | 24 +++++++++++++------ .../spark/source/SparkScanBuilder.java | 6 ++--- .../iceberg/spark/source/SparkStagedScan.java | 2 +- .../spark/source/TestLimitPushDown.java | 10 ++++---- 24 files changed, 119 insertions(+), 66 deletions(-) diff --git a/.palantir/revapi.yml b/.palantir/revapi.yml index ca873f1cb520..7d853e7a650d 100644 --- a/.palantir/revapi.yml +++ b/.palantir/revapi.yml @@ -1140,7 +1140,7 @@ acceptedBreaks: new: "method void org.apache.iceberg.parquet.ParquetReader::(org.apache.iceberg.io.InputFile,\ \ org.apache.iceberg.Schema, org.apache.parquet.ParquetReadOptions, java.util.function.Function>, org.apache.iceberg.mapping.NameMapping,\ - \ org.apache.iceberg.expressions.Expression, boolean, boolean, int)" + \ org.apache.iceberg.expressions.Expression, boolean, boolean, java.lang.Integer)" justification: "{limit push down}" - code: "java.method.numberOfParametersChanged" old: "method void org.apache.iceberg.parquet.VectorizedParquetReader::(org.apache.iceberg.io.InputFile,\ @@ -1150,7 +1150,7 @@ acceptedBreaks: new: "method void org.apache.iceberg.parquet.VectorizedParquetReader::(org.apache.iceberg.io.InputFile,\ \ org.apache.iceberg.Schema, org.apache.parquet.ParquetReadOptions, java.util.function.Function>, org.apache.iceberg.mapping.NameMapping,\ - \ org.apache.iceberg.expressions.Expression, boolean, boolean, int, int)" + \ org.apache.iceberg.expressions.Expression, boolean, boolean, int, java.lang.Integer)" justification: "{limit push down}" apache-iceberg-0.14.0: org.apache.iceberg:iceberg-api: diff --git a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java index ec60810d3436..ed034c42b87a 100644 --- a/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java +++ b/arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java @@ -76,7 +76,9 @@ public void nextBatch( advance(); int expectedBatchSize; if (numValsToRead < 0) { - throw new IllegalStateException("numValsToRead has invalid value"); + throw new IllegalStateException( + String.format( + "Cannot read a negative number of values. 
numValsToRead = %d", numValsToRead)); } else { expectedBatchSize = Math.min(batchSize - rowsReadSoFar, numValsToRead - rowsReadSoFar); } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java index 7ab11ad2ccbd..051aee3ee058 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/Parquet.java @@ -1048,7 +1048,7 @@ public static class ReadBuilder { private NameMapping nameMapping = null; private ByteBuffer fileEncryptionKey = null; private ByteBuffer fileAADPrefix = null; - private int pushedLimit = -1; + private Integer pushedLimit; private ReadBuilder(InputFile file) { this.file = file; @@ -1152,11 +1152,8 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) { return this; } - public ReadBuilder pushedlimit(int limit) { - if (limit <= 0) { - throw new IllegalArgumentException("Pushed limit must be > 0"); - } - + public ReadBuilder pushedlimit(Integer limit) { + Preconditions.checkArgument(limit >= 0); this.pushedLimit = limit; return this; } diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java index 2a473a6f31d9..849a5ce072ea 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java @@ -43,7 +43,7 @@ public class ParquetReader extends CloseableGroup implements CloseableIterabl private final boolean reuseContainers; private final boolean caseSensitive; private final NameMapping nameMapping; - private int pushedLimit = -1; + private Integer pushedLimit; public ParquetReader( InputFile input, @@ -54,7 +54,7 @@ public ParquetReader( Expression filter, boolean reuseContainers, boolean caseSensitive, - int pushedLimit) { + Integer pushedLimit) { this.input = input; this.expectedSchema = expectedSchema; this.options = options; @@ -109,7 +109,7 @@ private static class FileIterator implements CloseableIterator { private long nextRowGroupStart = 0; private long valuesRead = 0; private T last = null; - private int pushedLimit = -1; + private Integer pushedLimit; FileIterator(ReadConf conf) { this.reader = conf.reader(); @@ -122,7 +122,7 @@ private static class FileIterator implements CloseableIterator { @Override public boolean hasNext() { - if (pushedLimit > 0) { + if (pushedLimit != null && pushedLimit > 0) { return valuesRead < Math.min(totalValues, pushedLimit); } else { return valuesRead < totalValues; diff --git a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java index 1fcbb6892fb2..7c0b2d4c20c5 100644 --- a/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java +++ b/parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java @@ -49,7 +49,7 @@ public class VectorizedParquetReader extends CloseableGroup implements Closea private final boolean caseSensitive; private final int batchSize; private final NameMapping nameMapping; - private int pushedLimit = -1; + private Integer pushedLimit; public VectorizedParquetReader( InputFile input, @@ -61,7 +61,7 @@ public VectorizedParquetReader( boolean reuseContainers, boolean caseSensitive, int maxRecordsPerBatch, - int pushedLimit) { + Integer pushedLimit) { this.input = input; this.expectedSchema = expectedSchema; this.options = options; @@ -118,7 +118,7 
@@ private static class FileIterator implements CloseableIterator { private long valuesRead = 0; private T last = null; private final long[] rowGroupsStartRowPos; - private int pushedLimit = -1; + private Integer pushedLimit; FileIterator(ReadConf conf) { this.reader = conf.reader(); @@ -134,7 +134,7 @@ private static class FileIterator implements CloseableIterator { @Override public boolean hasNext() { - if (pushedLimit > 0) { + if (pushedLimit != null && pushedLimit > 0) { return valuesRead < Math.min(totalValues, pushedLimit); } else { return valuesRead < totalValues; @@ -151,12 +151,12 @@ public T next() { } long remainingValues = nextRowGroupStart - valuesRead; - long remainingLimit = pushedLimit - valuesRead; int numValuesToRead; - if (remainingLimit > 0) { + if (pushedLimit != null && pushedLimit - valuesRead > 0) { // batchSize is an integer, so casting to integer is safe - numValuesToRead = (int) Math.min(remainingValues, Math.min(batchSize, remainingLimit)); + numValuesToRead = + (int) Math.min(remainingValues, Math.min(batchSize, pushedLimit - valuesRead)); } else { // batchSize is an integer, so casting to integer is safe numValuesToRead = (int) Math.min(remainingValues, batchSize); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index b7bc262cb21b..d6b0a893ff72 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -39,7 +39,7 @@ abstract class BaseBatchReader extends BaseReader { private final int batchSize; - private int pushedLimit = -1; + private Integer pushedLimit; BaseBatchReader( Table table, @@ -48,7 +48,7 @@ abstract class BaseBatchReader extends BaseReader newParquetIterable( Expression residual, Map idToConstant, SparkDeleteFilter deleteFilter, - int limit) { + Integer limit) { // get required schema if there are deletes Schema requiredSchema = deleteFilter != null ? deleteFilter.requiredSchema() : expectedSchema(); @@ -105,7 +105,7 @@ private CloseableIterable newParquetIterable( // memory. 
.reuseContainers() .withNameMapping(nameMapping()); - if (limit > 0) { + if (limit != null && limit > 0) { readerBuilder = readerBuilder.pushedlimit(limit); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java index 304887c7f133..29a874f325fc 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseRowReader.java @@ -39,16 +39,25 @@ import org.apache.spark.sql.catalyst.InternalRow; abstract class BaseRowReader extends BaseReader { - private int pushedLimit = -1; + private Integer pushedLimit; BaseRowReader( Table table, ScanTaskGroup taskGroup, Schema tableSchema, Schema expectedSchema, - boolean caseSensitive, - int pushedLimit) { + boolean caseSensitive) { super(table, taskGroup, tableSchema, expectedSchema, caseSensitive); + } + + BaseRowReader( + Table table, + ScanTaskGroup taskGroup, + Schema tableSchema, + Schema expectedSchema, + boolean caseSensitive, + Integer pushedLimit) { + this(table, taskGroup, tableSchema, expectedSchema, caseSensitive); this.pushedLimit = pushedLimit; } @@ -94,7 +103,7 @@ private CloseableIterable newParquetIterable( Expression residual, Schema readSchema, Map idToConstant, - int limit) { + Integer limit) { Parquet.ReadBuilder readerBuilder = Parquet.read(file) .reuseContainers() @@ -105,7 +114,7 @@ private CloseableIterable newParquetIterable( .filter(residual) .caseSensitive(caseSensitive()) .withNameMapping(nameMapping()); - if (limit > 0) { + if (limit != null && limit > 0) { readerBuilder = readerBuilder.pushedlimit(limit); } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 16ec43ff3d13..61ab8039da6e 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -45,7 +45,7 @@ class BatchDataReader extends BaseBatchReader private final long numSplits; - BatchDataReader(SparkInputPartition partition, int batchSize, int pushedLimit) { + BatchDataReader(SparkInputPartition partition, int batchSize, Integer pushedLimit) { this( partition.table(), partition.taskGroup(), @@ -63,7 +63,7 @@ class BatchDataReader extends BaseBatchReader Schema expectedSchema, boolean caseSensitive, int size, - int pushedLimit) { + Integer pushedLimit) { super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, size, pushedLimit); numSplits = taskGroup.tasks().size(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java index c85b5c082aa0..572f955884a3 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/ChangelogRowReader.java @@ -68,8 +68,7 @@ class ChangelogRowReader extends BaseRowReader taskGroup, tableSchema, ChangelogUtil.dropChangelogMetadata(expectedSchema), - caseSensitive, - -1); + caseSensitive); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java index 
634b297c6c88..f5b98a5a43bd 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/EqualityDeleteRowReader.java @@ -35,7 +35,7 @@ public EqualityDeleteRowReader( Schema tableSchema, Schema expectedSchema, boolean caseSensitive) { - super(table, task, tableSchema, expectedSchema, caseSensitive, -1); + super(table, task, tableSchema, expectedSchema, caseSensitive); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java index 9e611f2b1eb8..4b847474153c 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/PositionDeletesRowReader.java @@ -61,7 +61,7 @@ class PositionDeletesRowReader extends BaseRowReader Schema expectedSchema, boolean caseSensitive) { - super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, -1); + super(table, taskGroup, tableSchema, expectedSchema, caseSensitive); int numSplits = taskGroup.tasks().size(); LOG.debug("Reading {} position delete file split(s) for table {}", numSplits, table.name()); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java index 6d8bf65b3053..34eafe774e87 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/RowDataReader.java @@ -45,7 +45,7 @@ class RowDataReader extends BaseRowReader implements PartitionRead private final long numSplits; - RowDataReader(SparkInputPartition partition, int pushedLimit) { + RowDataReader(SparkInputPartition partition, Integer pushedLimit) { this( partition.table(), partition.taskGroup(), @@ -55,13 +55,26 @@ class RowDataReader extends BaseRowReader implements PartitionRead pushedLimit); } + RowDataReader( + Table table, + ScanTaskGroup taskGroup, + Schema tableSchema, + Schema expectedSchema, + boolean caseSensitive) { + + super(table, taskGroup, tableSchema, expectedSchema, caseSensitive); + + numSplits = taskGroup.tasks().size(); + LOG.debug("Reading {} file split(s) for table {}", numSplits, table.name()); + } + RowDataReader( Table table, ScanTaskGroup taskGroup, Schema tableSchema, Schema expectedSchema, boolean caseSensitive, - int pushedLimit) { + Integer pushedLimit) { super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, pushedLimit); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index 5935d225fbac..debd61e4ebac 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -50,7 +50,7 @@ class SparkBatch implements Batch { private final boolean localityEnabled; private final boolean executorCacheLocalityEnabled; private final int scanHashCode; - private int pushedLimit = -1; + private Integer pushedLimit; SparkBatch( JavaSparkContext sparkContext, @@ -59,8 +59,7 @@ class SparkBatch implements Batch { Types.StructType groupingKeyType, List> taskGroups, Schema expectedSchema, - int scanHashCode, - int pushedLimit) { + int 
scanHashCode) { this.sparkContext = sparkContext; this.table = table; this.branch = readConf.branch(); @@ -72,6 +71,18 @@ class SparkBatch implements Batch { this.localityEnabled = readConf.localityEnabled(); this.executorCacheLocalityEnabled = readConf.executorCacheLocalityEnabled(); this.scanHashCode = scanHashCode; + } + + SparkBatch( + JavaSparkContext sparkContext, + Table table, + SparkReadConf readConf, + Types.StructType groupingKeyType, + List> taskGroups, + Schema expectedSchema, + int scanHashCode, + Integer pushedLimit) { + this(sparkContext, table, readConf, groupingKeyType, taskGroups, expectedSchema, scanHashCode); this.pushedLimit = pushedLimit; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java index 073b5596f24b..61a503d63957 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java @@ -77,7 +77,7 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan Schema expectedSchema, List filters, Supplier scanReportSupplier, - int pushedLimit) { + Integer pushedLimit) { super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier, pushedLimit); this.snapshotId = readConf.snapshotId(); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java index 0089ef0f13c9..71b53d70262f 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkChangelogScan.java @@ -109,8 +109,7 @@ public Batch toBatch() { EMPTY_GROUPING_KEY_TYPE, taskGroups(), expectedSchema, - hashCode(), - -1); + hashCode()); } private List> taskGroups() { diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java index 7da15f30bed5..762a844db6ac 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java @@ -28,9 +28,9 @@ class SparkColumnarReaderFactory implements PartitionReaderFactory { private final int batchSize; - private int pushedLimit = -1; + private Integer pushedLimit; - SparkColumnarReaderFactory(int batchSize, int pushedLimit) { + SparkColumnarReaderFactory(int batchSize, Integer pushedLimit) { Preconditions.checkArgument(batchSize > 1, "Batch size must be > 1"); this.batchSize = batchSize; this.pushedLimit = pushedLimit; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index 2ccc84e51c2b..5a3963bc614c 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -73,7 +73,7 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan Schema expectedSchema, List filters, Supplier scanReportSupplier) { - super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier, -1); + 
super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier); this.snapshot = snapshot; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java index f1624dd64c89..49180e07c465 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java @@ -177,7 +177,7 @@ private String[][] computePreferredLocations(List taskGroups) @Override public PartitionReaderFactory createReaderFactory() { - return new SparkRowReaderFactory(-1); + return new SparkRowReaderFactory(); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java index 6b7a8d1693de..70949a358e1a 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java @@ -70,6 +70,17 @@ abstract class SparkPartitioningAwareScan extends S private StructType groupingKeyType = null; // lazy cache of the grouping key type private Transform[] groupingKeyTransforms = null; // lazy cache of grouping key transforms + SparkPartitioningAwareScan( + SparkSession spark, + Table table, + Scan> scan, + SparkReadConf readConf, + Schema expectedSchema, + List filters, + Supplier scanReportSupplier) { + this(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier, null); + } + SparkPartitioningAwareScan( SparkSession spark, Table table, @@ -78,7 +89,7 @@ abstract class SparkPartitioningAwareScan extends S Schema expectedSchema, List filters, Supplier scanReportSupplier, - int pushedLimit) { + Integer pushedLimit) { super(spark, table, readConf, expectedSchema, filters, scanReportSupplier, pushedLimit); this.scan = scan; diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java index a913679d988c..e90b44485f87 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkRowReaderFactory.java @@ -29,9 +29,11 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; class SparkRowReaderFactory implements PartitionReaderFactory { - private int pushedLimit = -1; + private Integer pushedLimit; - SparkRowReaderFactory(int pushedLimit) { + SparkRowReaderFactory() {} + + SparkRowReaderFactory(Integer pushedLimit) { this.pushedLimit = pushedLimit; } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index b60503c3b259..2ad6496dc7d2 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -107,7 +107,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { private final List filterExpressions; private final String branch; private final Supplier scanReportSupplier; - private final int pushdownLimit; + private Integer pushedLimit; // lazy variables private StructType 
readSchema; @@ -118,8 +118,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { SparkReadConf readConf, Schema expectedSchema, List filters, - Supplier scanReportSupplier, - int pushdownLimit) { + Supplier scanReportSupplier) { Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch()); SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, expectedSchema); @@ -132,7 +131,18 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { this.filterExpressions = filters != null ? filters : Collections.emptyList(); this.branch = readConf.branch(); this.scanReportSupplier = scanReportSupplier; - this.pushdownLimit = pushdownLimit; + } + + SparkScan( + SparkSession spark, + Table table, + SparkReadConf readConf, + Schema expectedSchema, + List filters, + Supplier scanReportSupplier, + Integer pushedLimit) { + this(spark, table, readConf, expectedSchema, filters, scanReportSupplier); + this.pushedLimit = pushedLimit; } protected Table table() { @@ -155,8 +165,8 @@ protected List filterExpressions() { return filterExpressions; } - protected int pushdownLimit() { - return pushdownLimit; + protected Integer pushedLimit() { + return pushedLimit; } protected Types.StructType groupingKeyType() { @@ -175,7 +185,7 @@ public Batch toBatch() { taskGroups(), expectedSchema, hashCode(), - pushdownLimit); + pushedLimit); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index ba746139cbeb..646d7aaeae52 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -105,7 +105,7 @@ public class SparkScanBuilder private boolean caseSensitive; private List filterExpressions = null; private Predicate[] pushedPredicates = NO_PREDICATES; - private int pushedLimit = -1; + private Integer pushedLimit; SparkScanBuilder( SparkSession spark, @@ -424,9 +424,9 @@ private Scan buildBatchScan() { Schema expectedSchema = schemaWithMetadataColumns(); org.apache.iceberg.Scan scan = buildIcebergBatchScan(false /* not include Column Stats */, expectedSchema); - if (pushedLimit > 0 && hasDeletes(scan)) { + if (pushedLimit != null && pushedLimit > 0 && hasDeletes(scan)) { LOG.info("Skipping limit pushdown: detected row level deletes"); - pushedLimit = -1; + pushedLimit = null; } return new SparkBatchQueryScan( spark, diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java index eb6461f21494..fd299ade7fdc 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java @@ -41,7 +41,7 @@ class SparkStagedScan extends SparkScan { private List> taskGroups = null; // lazy cache of tasks SparkStagedScan(SparkSession spark, Table table, Schema expectedSchema, SparkReadConf readConf) { - super(spark, table, readConf, expectedSchema, ImmutableList.of(), null, -1); + super(spark, table, readConf, expectedSchema, ImmutableList.of(), null); this.taskSetId = readConf.scanTaskSetId(); this.splitSize = readConf.splitSize(); this.splitLookback = readConf.splitLookback(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java index 00af430b8adf..c7983c8f364b 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/source/TestLimitPushDown.java @@ -292,11 +292,11 @@ private void testLimit( boolean limitPushedDown) { Dataset limitedDf = df.limit(limit); LogicalPlan optimizedPlan = limitedDf.queryExecution().optimizedPlan(); - int pushedLimit = collectPushDownLimit(optimizedPlan); + Integer pushedLimit = collectPushDownLimit(optimizedPlan); if (limitPushedDown) { assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(limit); } else { - assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(-1); + assertThat(pushedLimit).as("Pushed down limit should be " + limit).isEqualTo(null); } List collectedRows = limitedDf.collectAsList(); @@ -315,7 +315,7 @@ private void testLimit( } } - private int collectPushDownLimit(LogicalPlan logicalPlan) { + private Integer collectPushDownLimit(LogicalPlan logicalPlan) { Optional limit = JavaConverters.asJavaCollection(logicalPlan.collectLeaves()).stream() .flatMap( @@ -330,10 +330,10 @@ private int collectPushDownLimit(LogicalPlan logicalPlan) { } SparkBatchQueryScan batchQueryScan = (SparkBatchQueryScan) scanRelation.scan(); - return Stream.of(batchQueryScan.pushdownLimit()); + return Stream.ofNullable(batchQueryScan.pushedLimit()); }) .findFirst(); - return limit.orElse(-1); + return limit.orElse(null); } } From cea85f4f266c2278ee9c08b6184ddd1aa062a1d0 Mon Sep 17 00:00:00 2001 From: huaxingao Date: Fri, 27 Sep 2024 17:44:05 -0700 Subject: [PATCH 8/8] Add ParquetBatchReadConf and OrcBatchReadConf --- spark/v3.5/build.gradle | 2 ++ .../iceberg/spark/OrcBatchReadConf.java | 27 ++++++++++++++++ .../iceberg/spark/ParquetBatchReadConf.java | 31 +++++++++++++++++++ .../iceberg/spark/source/BaseBatchReader.java | 20 ++++++------ .../iceberg/spark/source/BatchDataReader.java | 17 ++++++---- .../iceberg/spark/source/SparkBatch.java | 25 ++++++++++++--- .../source/SparkColumnarReaderFactory.java | 20 +++++++----- 7 files changed, 116 insertions(+), 26 deletions(-) create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java create mode 100644 spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java diff --git a/spark/v3.5/build.gradle b/spark/v3.5/build.gradle index 2ba5d493c6cd..f16a7d7ab620 100644 --- a/spark/v3.5/build.gradle +++ b/spark/v3.5/build.gradle @@ -52,6 +52,8 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") { dependencies { implementation project(path: ':iceberg-bundled-guava', configuration: 'shadow') api project(':iceberg-api') + annotationProcessor libs.immutables.value + compileOnly libs.immutables.value implementation project(':iceberg-common') implementation project(':iceberg-core') implementation project(':iceberg-data') diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java new file mode 100644 index 000000000000..d3b339d60e3f --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/OrcBatchReadConf.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark; + +import java.io.Serializable; +import org.immutables.value.Value; + +@Value.Immutable +public interface OrcBatchReadConf extends Serializable { + int batchSize(); +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java new file mode 100644 index 000000000000..c28f0c48e2bf --- /dev/null +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/ParquetBatchReadConf.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.spark; + +import java.io.Serializable; +import javax.annotation.Nullable; +import org.immutables.value.Value; + +@Value.Immutable +public interface ParquetBatchReadConf extends Serializable { + int batchSize(); + + @Nullable + Integer limit(); +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java index d6b0a893ff72..28c797a0d038 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BaseBatchReader.java @@ -32,14 +32,16 @@ import org.apache.iceberg.orc.ORC; import org.apache.iceberg.parquet.Parquet; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.spark.OrcBatchReadConf; +import org.apache.iceberg.spark.ParquetBatchReadConf; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkOrcReaders; import org.apache.iceberg.spark.data.vectorized.VectorizedSparkParquetReaders; import org.apache.iceberg.types.TypeUtil; import org.apache.spark.sql.vectorized.ColumnarBatch; abstract class BaseBatchReader extends BaseReader { - private final int batchSize; - private Integer pushedLimit; + private final ParquetBatchReadConf parquetConf; + private final OrcBatchReadConf orcConf; BaseBatchReader( Table table, @@ -47,11 +49,11 @@ abstract class BaseBatchReader extends BaseReader newBatchIterable( @@ -65,7 +67,7 @@ protected CloseableIterable newBatchIterable( switch (format) { case PARQUET: return newParquetIterable( - inputFile, start, length, residual, idToConstant, deleteFilter, pushedLimit); + inputFile, start, length, residual, idToConstant, deleteFilter, parquetConf.limit()); case ORC: return newOrcIterable(inputFile, start, length, residual, idToConstant); @@ -95,7 +97,7 @@ private CloseableIterable newParquetIterable( fileSchema -> VectorizedSparkParquetReaders.buildReader( requiredSchema, fileSchema, idToConstant, deleteFilter)) - .recordsPerBatch(batchSize) + .recordsPerBatch(parquetConf.batchSize()) .filter(residual) .caseSensitive(caseSensitive()) // Spark eagerly consumes the batches. 
So the underlying memory allocated could be @@ -131,7 +133,7 @@ private CloseableIterable newOrcIterable( .createBatchedReaderFunc( fileSchema -> VectorizedSparkOrcReaders.buildReader(expectedSchema(), fileSchema, idToConstant)) - .recordsPerBatch(batchSize) + .recordsPerBatch(orcConf.batchSize()) .filter(residual) .caseSensitive(caseSensitive()) .withNameMapping(nameMapping()) diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java index 61ab8039da6e..983e272d75b9 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/BatchDataReader.java @@ -28,6 +28,8 @@ import org.apache.iceberg.io.CloseableIterator; import org.apache.iceberg.io.InputFile; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.OrcBatchReadConf; +import org.apache.iceberg.spark.ParquetBatchReadConf; import org.apache.iceberg.spark.source.metrics.TaskNumDeletes; import org.apache.iceberg.spark.source.metrics.TaskNumSplits; import org.apache.iceberg.util.SnapshotUtil; @@ -45,15 +47,18 @@ class BatchDataReader extends BaseBatchReader private final long numSplits; - BatchDataReader(SparkInputPartition partition, int batchSize, Integer pushedLimit) { + BatchDataReader( + SparkInputPartition partition, + ParquetBatchReadConf parquetBatchReadConf, + OrcBatchReadConf orcBatchReadConf) { this( partition.table(), partition.taskGroup(), SnapshotUtil.schemaFor(partition.table(), partition.branch()), partition.expectedSchema(), partition.isCaseSensitive(), - batchSize, - pushedLimit); + parquetBatchReadConf, + orcBatchReadConf); } BatchDataReader( @@ -62,9 +67,9 @@ class BatchDataReader extends BaseBatchReader Schema tableSchema, Schema expectedSchema, boolean caseSensitive, - int size, - Integer pushedLimit) { - super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, size, pushedLimit); + ParquetBatchReadConf parquetConf, + OrcBatchReadConf orcConf) { + super(table, taskGroup, tableSchema, expectedSchema, caseSensitive, parquetConf, orcConf); numSplits = taskGroup.tasks().size(); LOG.debug("Reading {} file split(s) for table {}", numSplits, table.name()); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java index debd61e4ebac..82e8fe87c33e 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatch.java @@ -28,6 +28,10 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.SchemaParser; import org.apache.iceberg.Table; +import org.apache.iceberg.spark.ImmutableOrcBatchReadConf; +import org.apache.iceberg.spark.ImmutableParquetBatchReadConf; +import org.apache.iceberg.spark.OrcBatchReadConf; +import org.apache.iceberg.spark.ParquetBatchReadConf; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkUtil; import org.apache.iceberg.types.Types; @@ -128,18 +132,31 @@ private String[][] computePreferredLocations() { @Override public PartitionReaderFactory createReaderFactory() { if (useParquetBatchReads()) { - int batchSize = readConf.parquetBatchSize(); - return new SparkColumnarReaderFactory(batchSize, pushedLimit); + return new 
SparkColumnarReaderFactory(parquetBatchReadConf()); } else if (useOrcBatchReads()) { - int batchSize = readConf.orcBatchSize(); - return new SparkColumnarReaderFactory(batchSize, pushedLimit); + return new SparkColumnarReaderFactory(orcBatchReadConf()); } else { return new SparkRowReaderFactory(pushedLimit); } } + private ParquetBatchReadConf parquetBatchReadConf() { + ImmutableParquetBatchReadConf.Builder builder = + ImmutableParquetBatchReadConf.builder().batchSize(readConf.parquetBatchSize()); + + if (pushedLimit != null && pushedLimit >= 1) { + builder.limit(pushedLimit); + } + + return builder.build(); + } + + private OrcBatchReadConf orcBatchReadConf() { + return ImmutableOrcBatchReadConf.builder().batchSize(readConf.orcBatchSize()).build(); + } + // conditions for using Parquet batch reads: // - Parquet vectorization is enabled // - only primitives or metadata columns are projected diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java index 762a844db6ac..917204d4aed9 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkColumnarReaderFactory.java @@ -20,6 +20,8 @@ import org.apache.iceberg.FileScanTask; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.spark.OrcBatchReadConf; +import org.apache.iceberg.spark.ParquetBatchReadConf; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.connector.read.InputPartition; import org.apache.spark.sql.connector.read.PartitionReader; @@ -27,13 +29,17 @@ import org.apache.spark.sql.vectorized.ColumnarBatch; class SparkColumnarReaderFactory implements PartitionReaderFactory { - private final int batchSize; - private Integer pushedLimit; + private final ParquetBatchReadConf parquetConf; + private final OrcBatchReadConf orcConf; - SparkColumnarReaderFactory(int batchSize, Integer pushedLimit) { - Preconditions.checkArgument(batchSize > 1, "Batch size must be > 1"); - this.batchSize = batchSize; - this.pushedLimit = pushedLimit; + SparkColumnarReaderFactory(ParquetBatchReadConf conf) { + this.parquetConf = conf; + this.orcConf = null; + } + + SparkColumnarReaderFactory(OrcBatchReadConf conf) { + this.orcConf = conf; + this.parquetConf = null; + } } @Override @@ -51,7 +57,7 @@ public PartitionReader createColumnarReader(InputPartition inputP SparkInputPartition partition = (SparkInputPartition) inputPartition; if (partition.allTasksOfType(FileScanTask.class)) { - return new BatchDataReader(partition, batchSize, pushedLimit); + return new BatchDataReader(partition, parquetConf, orcConf); } else { throw new UnsupportedOperationException(
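
The reader-side changes in this series all follow one pattern: count how many values have been handed out, stop iterating once the pushed limit is reached, and cap each vectorized batch by the remaining budget. Below is a minimal, self-contained sketch of that pattern, assuming a null limit means "nothing pushed"; LimitBoundedIterator and its fields are illustrative stand-ins, not the actual ParquetReader/VectorizedParquetReader internals.

    import java.util.Iterator;

    // Sketch of the limit-aware iteration implemented by the patched FileIterator classes.
    class LimitBoundedIterator<T> implements Iterator<T> {
      private final Iterator<T> delegate;
      private final long totalValues;    // total row count of the file, as in Parquet metadata
      private final Integer pushedLimit; // null when no limit was pushed down
      private long valuesRead = 0;

      LimitBoundedIterator(Iterator<T> delegate, long totalValues, Integer pushedLimit) {
        this.delegate = delegate;
        this.totalValues = totalValues;
        this.pushedLimit = pushedLimit;
      }

      @Override
      public boolean hasNext() {
        // Stop early once min(totalValues, pushedLimit) values were produced.
        if (pushedLimit != null && pushedLimit > 0) {
          return valuesRead < Math.min(totalValues, pushedLimit);
        }
        return valuesRead < totalValues;
      }

      @Override
      public T next() {
        valuesRead++;
        return delegate.next();
      }

      // Vectorized flavor: a batch never exceeds the remaining limit budget.
      int nextBatchSize(long remainingInRowGroup, int batchSize) {
        if (pushedLimit != null && pushedLimit - valuesRead > 0) {
          return (int) Math.min(remainingInRowGroup, Math.min(batchSize, pushedLimit - valuesRead));
        }
        return (int) Math.min(remainingInRowGroup, batchSize);
      }
    }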
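On the Spark side, the planner only records a pushed limit when the source reports success, which is why the builder fix returns false whenever push down is disabled. A hedged sketch of that contract using Spark's SupportsPushDownLimit interface follows; the class and field names are illustrative, not the actual SparkScanBuilder.

    import org.apache.spark.sql.connector.read.Scan;
    import org.apache.spark.sql.connector.read.SupportsPushDownLimit;

    // Illustrative scan builder showing the pushLimit() contract the patch follows.
    class ExampleLimitScanBuilder implements SupportsPushDownLimit {
      private final boolean limitPushDownEnabled; // would come from read options / SQL conf
      private Integer pushedLimit;                // stays null when nothing is pushed

      ExampleLimitScanBuilder(boolean limitPushDownEnabled) {
        this.limitPushDownEnabled = limitPushDownEnabled;
      }

      @Override
      public boolean pushLimit(int limit) {
        // LIMIT 0 never reaches here: Spark rewrites it to an empty relation.
        if (limitPushDownEnabled && limit > 0) {
          this.pushedLimit = limit;
          return true; // the limit is recorded on the scan
        }
        return false; // Spark keeps applying the limit entirely on its side
      }

      Integer pushedLimit() {
        return pushedLimit;
      }

      @Override
      public Scan build() {
        // Building the actual scan is outside the scope of this sketch.
        throw new UnsupportedOperationException("sketch only");
      }
    }

Because isPartiallyPushed() defaults to true, Spark still applies its own LIMIT on top, so the push down only reduces how much each task reads. The series also drops the pushed limit when the scan contains row-level deletes: deletes filter rows after the file is read, so counting raw file rows against the limit could return too few rows.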
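From the user side the feature is exercised the same way TestLimitPushDown does: a plain limit() on an Iceberg DataFrame read, toggled through the session property added in this series. A small usage sketch, assuming a running session and a placeholder Parquet-backed table named default.test_table:

    import org.apache.iceberg.spark.SparkSQLProperties;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class LimitPushDownExample {
      public static void main(String[] args) {
        SparkSession spark =
            SparkSession.builder().appName("iceberg-limit-pushdown").getOrCreate();

        // The property constant comes from the SparkSQLProperties change in this series;
        // "false" would disable the push down and fall back to Spark-side limiting only.
        spark.conf().set(SparkSQLProperties.LIMIT_PUSH_DOWN_ENABLED, "true");

        Dataset<Row> df =
            spark.read().format("iceberg").load("default.test_table").limit(2);

        // With the limit pushed, each Parquet read task stops after at most 2 rows
        // instead of materializing whole files before Spark trims the result.
        df.show();
      }
    }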