[SPARK-49857][SQL] Add storageLevel to Dataset localCheckpoint API #48324
Conversation
-    def test_checkpoint_dataframe(self):
+    def test_local_checkpoint_dataframe(self):
note: there are no tests at all for reliable checkpoint in pyspark API. I renamed this test accordingly.
test("checkpoint") { | ||
test("localCheckpoint") { |
note: there are no tests that test Connect reliable checkpoint. I renamed this test accordingly.
// We don't have a way to reach into the server and assert the storage level server side, but
// this test should cover for unexpected errors in the API.
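For illustration, a minimal sketch of what such a client-side smoke test can look like (assuming a ScalaTest suite with a Connect `spark` session in scope; the test name, data, and assertion are made up here, not the PR's actual test):

```
import org.apache.spark.storage.StorageLevel

// Hypothetical client-side smoke test: the Connect client cannot inspect the
// server's RDD storage level, so it only checks that the call round-trips
// without error and that the checkpointed Dataset remains usable.
test("localCheckpoint with a custom StorageLevel") {
  val df = spark.range(100).localCheckpoint(eager = true, storageLevel = StorageLevel.DISK_ONLY)
  assert(df.count() == 100)
}
```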
@hvanhovell with the SQL API refactoring, would it now be possible to have tests that use a Connect client to self-connect, and have server-side objects (SparkContext etc.) available inside the test to verify? The existing SparkConnectServerTest can only test the internal SparkConnectClient with the server, due to past namespace conflicts between the server and client SparkSession APIs.
@juliuszsompolski it will take a few more PRs, but yeah that is the objective.
@HyukjinKwon @hvanhovell @cloud-fan @dongjoon-hyun
Thank you.
Could you make CI happy, @juliuszsompolski?
```
./python/pyspark/sql/connect/dataframe.py:2184:63: F821 undefined name 'storage_level'
cmd.storage_level.CopyFrom(storage_level_to_proto(storage_level))
                                                   ^
1     F821 undefined name 'storage_level'
```
This has been hanging and timing out, and has never passed on any of my Spark PRs in the last month...
   * @group basic
   * @since 4.0.0
   */
  def localCheckpoint(eager: Boolean, storageLevel: Option[StorageLevel]): Dataset[T] =
An API question: since we already have an overload without the `storageLevel` parameter, should this overload use `storageLevel: StorageLevel`? PySpark does not have this issue, as it only has one method without overloads.
Looking at the precedent of

```
def repartition(numPartitions: Int)
def repartition(numPartitions: Int, partitionExprs: Column*)
def repartition(partitionExprs: Column*)
def repartitionByRange(numPartitions: Int, partitionExprs: Column*)
def repartitionByRange(partitionExprs: Column*)
```

calling into

```
protected def repartitionByExpression(
    numPartitions: Option[Int],
    partitionExprs: Seq[Column]): Dataset[T]

protected def repartitionByRange(
    numPartitions: Option[Int],
    partitionExprs: Seq[Column]): Dataset[T]
```

I think I should remove `Option` from the public API here. Let me do this.
I suppose that `Option` is also very bad for Java API compatibility... so definitely no `Option`.
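To make that concrete, here is a rough sketch (with simplified, illustrative names, not necessarily the merged signatures) of how the overloads can follow the repartition precedent: the public methods take a plain `StorageLevel`, and only a protected entry point carries the `Option`:

```
import org.apache.spark.storage.StorageLevel

// Illustrative shape only: a cut-down Dataset-like class showing how the public
// overloads stay Option-free while the Option lives in a protected helper,
// mirroring the repartition / repartitionByExpression pattern.
abstract class DatasetLike[T] {

  // Existing overload, unchanged: uses the default storage level.
  def localCheckpoint(eager: Boolean): DatasetLike[T] =
    localCheckpoint(eager, None)

  // New overload proposed in this PR: takes a plain StorageLevel, Java-friendly.
  def localCheckpoint(eager: Boolean, storageLevel: StorageLevel): DatasetLike[T] =
    localCheckpoint(eager, Some(storageLevel))

  // Internal entry point: the Option never leaks into the public API.
  protected def localCheckpoint(
      eager: Boolean,
      storageLevel: Option[StorageLevel]): DatasetLike[T]
}
```

Java callers can then use either public overload directly, with no `scala.Option` in the signature.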
@juliuszsompolski can you re-trigger the failed GA job?
@cloud-fan I retriggered. But this test has been hanging every single time on all my recent Spark PRs:
I haven't gotten it to pass on any PR CI, but it does seem to pass on master CI once the PRs where it was failing are merged...
I see, I think it's definitely unrelated, thanks, merging to master!
It is somewhat dangerous, because due to the hang and timeout it doesn't run the rest of the tests...
What changes were proposed in this pull request?

Currently, when running `Dataset.localCheckpoint(eager = true)`, it is impossible to specify a non-default StorageLevel for the checkpoint. On the other hand, it is possible with Dataset cache by using `Dataset.persist(newLevel: StorageLevel)`.

If one wants to specify a non-default StorageLevel for localCheckpoint, it currently requires accessing the plan, changing the level, and then triggering an action to materialize the checkpoint:

```
// start lazy
val checkpointDf = df.localCheckpoint(eager = false)
// fish out the RDD
val checkpointPlan = checkpointDf.queryExecution.analyzed
val rdd = checkpointPlan.asInstanceOf[LogicalRDD].rdd
// change the StorageLevel
rdd.persist(StorageLevel.DISK_ONLY)
// force materialization
checkpointDf
  .mapPartitions(_ => Iterator.empty.asInstanceOf[Iterator[Row]])
  .foreach((_: Row) => ())
```

There are several issues with this:

1. It won't work with Connect, as we don't have access to RDD internals.
2. A lazy checkpoint is not in fact lazy when AQE is involved. In order to get the RDD of a lazy checkpoint, AQE will actually trigger execution of all the query stages except the result stage in order to get the final plan. So the `start lazy` phase will already execute everything except the final stage, and then `force materialization` will only execute the result stage. This is "unexpected" and makes it more difficult to debug, first showing a query with missing metrics for the final stage, and then another query that skipped everything and only ran the final stage.

Having an API to specify the storageLevel for localCheckpoint will help avoid such hacks. As a precedent, it is already possible to specify the StorageLevel for Dataset cache by using `Dataset.persist(newLevel: StorageLevel)`.

In this PR, I implement this API for Scala and Python, and for both Classic and Connect.
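With the API added in this PR, the hack above reduces to a single call. A minimal usage sketch (assuming the Scala overload that takes a plain `StorageLevel`, and the same hypothetical `df` as in the snippet above):

```
import org.apache.spark.storage.StorageLevel

// Eagerly checkpoint locally with an explicit, non-default storage level.
// No digging into queryExecution / LogicalRDD, so this also works over Connect.
val checkpointDf = df.localCheckpoint(eager = true, storageLevel = StorageLevel.DISK_ONLY)
```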
Why are the changes needed?

https://github.com/delta-io/delta/blob/master/spark/src/main/scala/org/apache/spark/sql/delta/commands/merge/MergeIntoMaterializeSource.scala in `prepareMergeSource` has to do the hacks described above to use localCheckpoint with a non-default StorageLevel. It is hacky, and confusing that it then records two separate executions as described above.

Does this PR introduce any user-facing change?

Yes. It adds an API to pass `storageLevel` to Dataset `localCheckpoint`.

How was this patch tested?

Tests added.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: GitHub Copilot (trivial code completions)

Closes apache#48324 from juliuszsompolski/SPARK-49857.
Authored-by: Julek Sompolski <Juliusz Sompolski>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>