
[SPARK-49857][SQL] Add storageLevel to Dataset localCheckpoint API #48324

Closed
juliuszsompolski wants to merge 11 commits into apache:master from juliuszsompolski:SPARK-49857

Conversation

juliuszsompolski (Contributor) commented Oct 2, 2024

What changes were proposed in this pull request?

Currently, when running `Dataset.localCheckpoint(eager = true)`, it is impossible to specify a non-default StorageLevel for the checkpoint. On the other hand, it is possible for a Dataset cache by using `Dataset.persist(newLevel: StorageLevel)`.

If one wants to specify a non-default StorageLevel for localCheckpoint, one currently needs to access the plan, change the level, and then trigger an action to materialize the checkpoint:

```scala
// start lazy
val checkpointDf = df.localCheckpoint(eager = false)
// fish out the RDD
val checkpointPlan = checkpointDf.queryExecution.analyzed
val rdd = checkpointPlan.asInstanceOf[LogicalRDD].rdd
// change the StorageLevel
rdd.persist(StorageLevel.DISK_ONLY)
// force materialization
checkpointDf
  .mapPartitions(_ => Iterator.empty.asInstanceOf[Iterator[Row]])
  .foreach((_: Row) => ())
```

There are several issues with this:

  1. It won't work with Spark Connect, as the client has no access to RDD internals.
  2. A lazy checkpoint is not in fact lazy when AQE is involved. To get the RDD of a lazy checkpoint, AQE actually triggers execution of all query stages except the result stage in order to obtain the final plan. The "start lazy" phase therefore already executes everything except the final stage, and the "force materialization" step then runs only the result stage. This is unexpected and makes debugging harder: it first shows a query with missing metrics for the final stage, and then another query that skipped everything and ran only the final stage.

Having an API to specify a storageLevel for localCheckpoint will help avoid such hacks. As a precedent, it is already possible to specify a StorageLevel for a Dataset cache by using `Dataset.persist(newLevel: StorageLevel)`.

In this PR, I implement this API for Scala and Python, and for both classic Spark and Spark Connect.
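With this API, the workaround above collapses to a single call. A minimal sketch of the intended usage, assuming the Scala overload ends up along the lines of `localCheckpoint(eager: Boolean, storageLevel: StorageLevel)` (the review discussion below concludes that a plain `StorageLevel`, not an `Option`, should be used):

```scala
import org.apache.spark.storage.StorageLevel

// Eagerly materialize the local checkpoint with DISK_ONLY
// instead of the default storage level.
val checkpointDf = df.localCheckpoint(eager = true, storageLevel = StorageLevel.DISK_ONLY)
```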

Why are the changes needed?

In https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala, `prepareMergeSource` has to resort to the hack described above to use localCheckpoint with a non-default StorageLevel. It is hacky, and it is confusing that it then records two separate executions, as described above.

Does this PR introduce any user-facing change?

Yes.
It adds an API to pass a `storageLevel` to Dataset `localCheckpoint`.

How was this patch tested?

Tests added.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Github Copilot
(trivial code completions)

```python
def test_checkpoint_dataframe(self):
def test_local_checkpoint_dataframe(self):
```
juliuszsompolski (Contributor, Author):

Note: there are no tests at all for reliable checkpoint in the PySpark API. I renamed this test accordingly.

Comment on lines -53 to +54
test("checkpoint") {
test("localCheckpoint") {
juliuszsompolski (Contributor, Author):

Note: there are no tests that test the Connect reliable checkpoint. I renamed this test accordingly.

Comment on lines +60 to +61
```scala
// We don't have a way to reach into the server and assert the storage level server side, but
// this test should cover for unexpected errors in the API.
```
juliuszsompolski (Contributor, Author):

@hvanhovell with the SQL API refactoring, would it now be possible to have tests that use a Connect client to self-connect, with server-side objects (SparkContext etc.) available inside the test to verify against? The existing SparkConnectServerTest can only test the internal SparkConnectClient against the server, due to past namespace conflicts between the server and client SparkSession APIs.

Contributor:

@juliuszsompolski it will take a few more PRs, but yeah that is the objective.

juliuszsompolski (Contributor, Author):
@HyukjinKwon @hvanhovell @cloud-fan @dongjoon-hyun
Would it be OK to add a storageLevel arg to the Dataset localCheckpoint API? It would simplify the use case in Delta MergeIntoMaterializeSource that I describe above, and there is precedent in persist having a storageLevel arg.

Julek Sompolski added 2 commits October 3, 2024 14:53
HyukjinKwon changed the title from "[SPARK-49857] Add storageLevel to Dataset localCheckpoint API" to "[SPARK-49857][CORE] Add storageLevel to Dataset localCheckpoint API" on Oct 3, 2024
dongjoon-hyun (Member) left a comment:

Thank you.

Could you make CI happy, @juliuszsompolski?

```
./python/pyspark/sql/connect/dataframe.py:2184:63: F821 undefined name 'storage_level'
            cmd.storage_level.CopyFrom(storage_level_to_proto(storage_level))
                                                              ^
1     F821 undefined name 'storage_level'
```

juliuszsompolski changed the title from "[SPARK-49857][CORE] Add storageLevel to Dataset localCheckpoint API" to "[SPARK-49857][SQL] Add storageLevel to Dataset localCheckpoint API" on Oct 7, 2024
juliuszsompolski (Contributor, Author):
```
Starting test(python3.11): pyspark.ml.tests.connect.test_parity_torch_data_loader (temp output: /__w/apache-spark/apache-spark/python/target/59e82264-3602-4d7c-a002-35841701fa06/python3.11__pyspark.ml.tests.connect.test_parity_torch_data_loader__feru625e.log)
Error: The operation was canceled.
```

This has been hanging and timing out, and has never passed on any of my Spark PRs in the last month...

```scala
 * @group basic
 * @since 4.0.0
 */
def localCheckpoint(eager: Boolean, storageLevel: Option[StorageLevel]): Dataset[T] =
```
Contributor:

An API question: since we already have an overload without the storageLevel parameter, should this overload use storageLevel: StorageLevel? PySpark does not have this issue, as it only has one method without overloads.

cc @HyukjinKwon @hvanhovell

juliuszsompolski (Contributor, Author):

Looking at precedent of

```scala
def repartition(numPartitions: Int)
def repartition(numPartitions: Int, partitionExprs: Column*)
def repartition(partitionExprs: Column*)
def repartitionByRange(numPartitions: Int, partitionExprs: Column*)
def repartitionByRange(partitionExprs: Column*)
```

calling into

```scala
protected def repartitionByExpression(
    numPartitions: Option[Int],
    partitionExprs: Seq[Column]): Dataset[T]

protected def repartitionByRange(
    numPartitions: Option[Int],
    partitionExprs: Seq[Column]): Dataset[T]
```

I think I should remove Option from the public API here. Let me do this.

juliuszsompolski (Contributor, Author):

I suppose that Option is also very bad for Java API compatibility... so definitely no Option.
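For illustration, a minimal sketch of the overload pattern being discussed, mirroring the repartition precedent above: public overloads expose no Option (which also keeps the Java API clean) and delegate to a single protected method that keeps the Option internally. The trait name and helper shape here are assumptions for the sketch, not the exact merged code:

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.storage.StorageLevel

trait LocalCheckpointOverloads[T] {
  // Shared implementation keeps the Option, like repartitionByExpression does.
  protected def localCheckpoint(
      eager: Boolean, storageLevel: Option[StorageLevel]): Dataset[T]

  // Public overloads: no Option in the signatures.
  def localCheckpoint(): Dataset[T] =
    localCheckpoint(eager = true, storageLevel = None)

  def localCheckpoint(eager: Boolean): Dataset[T] =
    localCheckpoint(eager, None)

  def localCheckpoint(eager: Boolean, storageLevel: StorageLevel): Dataset[T] =
    localCheckpoint(eager, Some(storageLevel))
}
```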

cloud-fan (Contributor):
@juliuszsompolski can you re-trigger the failed GA job?

juliuszsompolski (Contributor, Author) commented Oct 9, 2024:

@cloud-fan I retriggered. But this test has been hanging every single time on all my recent Spark PRs:

```
Starting test(python3.11): pyspark.ml.tests.connect.test_parity_torch_data_loader (temp output: /__w/apache-spark/apache-spark/python/target/5559b0e3-8145-4cc6-bf11-3236e444d49f/python3.11__pyspark.ml.tests.connect.test_parity_torch_data_loader__ryalth7p.log)
Error: The operation was canceled.
```

I haven't gotten it to pass on any PR CI, but it does seem to pass on the master CI after the PRs where it was failing were merged...

cloud-fan (Contributor):
I see, I think it's definitely unrelated, thanks, merging to master!

cloud-fan closed this in b1ff767 on Oct 9, 2024
juliuszsompolski (Contributor, Author):

It is somewhat risky, because due to the hang and timeout the rest of the ['pyspark-mllib', 'pyspark-ml-connect', 'pyspark-ml'] tests do not run; but those tests should not be relevant here, and the pyspark-connect and pyspark-sql modules, which run the tests for local checkpoint, passed.

himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024
Closes apache#48324 from juliuszsompolski/SPARK-49857.

Authored-by: Julek Sompolski <Juliusz Sompolski>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>