Spark partial limit push down #10943
base: main
Conversation
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java (resolved)
@@ -1151,6 +1152,11 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
    return this;
  }

  public ReadBuilder pushedlimit(int limit) {
Reviewer: Should we assert this is greater than 1? I assume an input of 0 or a negative value is a bad call.
Author: I think we are OK because Spark has an assert for the limit. If the limit is negative, e.g. SELECT * FROM table LIMIT -2, Spark will throw:
org.apache.spark.sql.AnalysisException: [INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE] The limit like expression "-2" is invalid. The limit expression must be equal to or greater than 0, but got -2.
If the limit is 0, e.g. SELECT * FROM table LIMIT 0, Spark rewrites the query to an empty table scan, so it never reaches this code.
Reviewer: This is public, though, and anyone can call it, so we can't rely on Spark to cover us.
Author: Right, I forgot that. I've added a check that throws an IllegalArgumentException if the limit is <= 0.
Reviewer: Usually we use Preconditions.checkArgument for things like this.
Author: Changed to Preconditions.checkArgument. Thanks.
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java (outdated, resolved)
parquet/src/main/java/org/apache/iceberg/parquet/ParquetReader.java (outdated, resolved)
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java (outdated, resolved)
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java (outdated, resolved)
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/SparkSQLProperties.java (resolved)
arrow/src/main/java/org/apache/iceberg/arrow/vectorized/parquet/VectorizedColumnIterator.java (outdated, resolved)
spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java (outdated, resolved)
advance();
int expectedBatchSize;
if (numValsToRead < 0) {
  throw new IllegalStateException("numValsToRead has invalid value");
"Cannot X (because Y) (recover by Z)" - > "Cannot read a negative number of values. numValsToRead = %D"
Author: Changed. Thanks.
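A minimal sketch of the improved check in the style suggested above. BatchReaderSketch is a hypothetical stand-in for the vectorized reader, and Preconditions.checkState throws the same IllegalStateException as the original code.

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

// Hypothetical stand-in for the reader state checked in the diff above.
class BatchReaderSketch {
  private long numValsToRead;

  void validate() {
    // Throws IllegalStateException with an actionable "Cannot X, found Y" message.
    Preconditions.checkState(
        numValsToRead >= 0,
        "Cannot read a negative number of values: numValsToRead = %s",
        numValsToRead);
  }
}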
parquet/src/main/java/org/apache/iceberg/parquet/VectorizedParquetReader.java (resolved)
}

private boolean hasDeletes(org.apache.iceberg.Scan scan) {
  try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
Reviewer: We are now potentially scanning the file scan tasks several times before beginning a query:
- pushing down aggregates
- checking for deletes
- actually creating tasks
I'm wondering if we should be caching this.
Reviewer: Probably save this for another PR.
Author: I will have a follow-up PR for this.
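A hypothetical sketch of the caching idea raised above, so that the deletes check, aggregate push-down, and task creation share a single planFiles() pass. This is not code from the PR (the follow-up was deferred); CachedScanTasks and its methods are illustrative names only.

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.List;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;

// Hypothetical helper that plans files once and reuses the materialized list.
class CachedScanTasks {
  private final org.apache.iceberg.Scan<?, FileScanTask, ?> scan;
  private List<FileScanTask> cachedTasks = null;

  CachedScanTasks(org.apache.iceberg.Scan<?, FileScanTask, ?> scan) {
    this.scan = scan;
  }

  List<FileScanTask> tasks() {
    if (cachedTasks == null) {
      try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
        cachedTasks = Lists.newArrayList(fileScanTasks);
      } catch (IOException e) {
        throw new UncheckedIOException("Failed to close task iterable", e);
      }
    }
    return cachedTasks;
  }

  // Later checks reuse the cached list instead of re-planning the scan.
  boolean hasDeletes() {
    return tasks().stream().anyMatch(task -> !task.deletes().isEmpty());
  }
}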
    Schema expectedSchema,
    List<Expression> filters,
    Supplier<ScanReport> scanReportSupplier,
    Integer pushedLimit) {
Reviewer: I'm wondering if we should include this in SparkReadConf rather than having a separate argument. I have similar thoughts about SparkInputPartition. I'm not a big fan of having to plumb the new arguments all the way through the code base, but those two options may not look great either, since they aren't a great fit IMHO.
Ideally I think I would want something like a SparkReadContext, but I don't know how often more things like this will come up.
Author: Thanks @RussellSpitzer for sharing your concern and suggestion! I think we can borrow @aokolnychyi's idea of adding ParquetBatchReadConf and OrcBatchReadConf. We could have something like:

@Value.Immutable
public interface ParquetBatchReadConf extends Serializable {
  ParquetReaderType readerType(); // this is for Comet; we don't need it for now

  int batchSize();

  @Nullable
  Integer limit();
}

@Value.Immutable
public interface OrcBatchReadConf extends Serializable {
  int batchSize();
}

Similarly, we could also have ParquetRowReadConf and OrcRowReadConf.
I have changed the code to add ParquetBatchReadConf and OrcBatchReadConf. We still need to pass pushedLimit through the SparkPartitioningAwareScan, SparkScan, and SparkBatch constructors so that it can flow from SparkScanBuilder to SparkBatch; this is because pushedLimit is not available in SparkReadConf, and we have to call SparkScanBuilder.pushLimit to obtain it. Please let me know if this approach looks OK to you. Thanks!
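As a usage sketch: with the Immutables annotation processor, a @Value.Immutable interface like the one above gets a generated builder whose class name follows the library's default Immutable* naming. The concrete values and the ParquetReaderType.ICEBERG constant below are assumptions for illustration, not code from the PR.

// Assumes the processor generated ImmutableParquetBatchReadConf from the
// interface above; any non-@Nullable attribute must be set before build().
ParquetBatchReadConf conf =
    ImmutableParquetBatchReadConf.builder()
        .readerType(ParquetReaderType.ICEBERG) // assumed enum constant
        .batchSize(4096)
        .limit(100) // @Nullable, so it may be omitted when no limit was pushed
        .build();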
PR description: For SQL such as SELECT * FROM table LIMIT n, push down Spark's partial limit to Iceberg so that Iceberg can stop reading data once the limit is reached.