Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spark partial limit push down #10943

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open

Spark partial limit push down #10943

wants to merge 8 commits into from

Conversation

huaxingao
Copy link
Contributor

@huaxingao huaxingao commented Aug 14, 2024

For sql such as SELECT * FROM table LIMIT n, push down Spark's partial limit to Iceberg, so that Iceberg can stop reading data once the limit is reached.

@huaxingao
Copy link
Contributor Author

@@ -1151,6 +1152,11 @@ public ReadBuilder withAADPrefix(ByteBuffer aadPrefix) {
return this;
}

public ReadBuilder pushedlimit(int limit) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we assert this is greater than 1? I assume and input 0 or negative is a bad call

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are OK because Spark has assert for limit. If the limit is negative, e.g. SELECT * FROM table limit -2, Spark will throw

org.apache.spark.sql.AnalysisException: [INVALID_LIMIT_LIKE_EXPRESSION.IS_NEGATIVE] The limit like expression "-2" is invalid. The limit expression must be equal to or greater than 0, but got -2.;

If the limit is 0, e.g. SELECT * FROM table limit 0, Spark changes this to an empty table scan and it won't reach here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is public though, and anyone can call it so we can't rely on Spark to cover us.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right, I forgot that. I've added a check to throw an IllegalArgumentException if the limit is <= 0

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually we do Preconditions.checkArgument
for things like this

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to Preconditions.checkArgument. Thanks

advance();
int expectedBatchSize;
if (numValsToRead < 0) {
throw new IllegalStateException("numValsToRead has invalid value");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Cannot X (because Y) (recover by Z)" - > "Cannot read a negative number of values. numValsToRead = %D"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed. Thanks

}

private boolean hasDeletes(org.apache.iceberg.Scan scan) {
try (CloseableIterable<FileScanTask> fileScanTasks = scan.planFiles()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are now potentially scanning the file scan tasks several times before beginning a query,

  1. Push aggregates
  2. Has Deletes
  3. Actually creating tasks

I'm wondering if we should be caching this

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably save this for another PR

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will have a follow-up PR for this.

Schema expectedSchema,
List<Expression> filters,
Supplier<ScanReport> scanReportSupplier,
Integer pushedLimit) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if we should include this in SparkReadConf rather than having a separate argument. I have similar thoughts around SparkInputPartition. I'm not a big fan of having to plumb the new arguments all the way through the code base but those two options may not look great either since they aren't a great fit imho.

Ideally I think I would want something like SparkReadContext but I don't know how often more things like this will come up

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @RussellSpitzer for sharing your concern and suggestion!
I think we can borrow @aokolnychyi 's idea of adding ParquetBatchReadConf and OrcBatchReadConf. We can have something like

@Value.Immutable
public interface ParquetBatchReadConf extends Serializable {
  ParquetReaderType readerType();  // this is for comet, we don't need this for now
  int batchSize();
  @Nullable
  Integer limit();
}

@Value.Immutable
public interface OrcBatchReadConf extends Serializable {
  int batchSize();
}

Similarly, we can also have ParquetRowReadConf and OrcRowReadConf.

I have changed the code to add ParquetBatchReadConf and OrcBatchReadConf. We still need to pass pushedLimit to the SparkPartitioningAwareScan, SparkScan and SparkBatch constructors, so pushedLimit can be passed from SparkScanBuilder to SparkBatch, this is because pushedLimit is not available in SparkReadConf, we have to call SparkScanBuilder.pushLimit to get pushedLimit.

Please let me know if this approach looks OK to you. Thanks!

@github-actions github-actions bot added the build label Sep 28, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants