[SPARK-49502][CORE] Avoid NPE in SparkEnv.get.shuffleManager.unregisterShuffle #47977
How is unregister shuffle getting called? Shuffle registration requires SparkContext to have been initialized, which will include the shuffle manager.
With dynamic allocation, executors are added while the application is running. A new executor first initializes its BlockManager and registers it with the driver, and only then initializes its ShuffleManager.
The driver sends the removeShuffle RPC to all registered BlockManagers. At that point, some newly added executors have not yet initialized their ShuffleManager.
spark/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala Lines 469 to 474 in 339d1c9
spark/core/src/main/scala/org/apache/spark/ContextCleaner.scala Lines 235 to 241 in 339d1c9
spark/sql/core/src/main/scala/org/apache/spark/sql/execution/SQLExecution.scala Lines 177 to 183 in 339d1c9
Ah! This is happening at the executor, not the driver. Thanks for clarifying.
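For readers following the thread, the ordering problem can be reproduced in isolation. The sketch below is only an illustration under stated assumptions: `ShuffleManagerLike`, `ExecutorEnvSketch`, and `RemoveShuffleRace` are hypothetical stand-ins, not Spark classes, and the real initialization path in `SparkEnv` is more involved.

```scala
// Hypothetical stand-in for the shuffle manager interface; not a Spark class.
trait ShuffleManagerLike {
  def unregisterShuffle(shuffleId: Int): Boolean
}

// Hypothetical stand-in for the executor-side environment.
final class ExecutorEnvSketch {
  // Stays null until initializeShuffleManager() runs, mirroring the
  // "BlockManager first, ShuffleManager later" ordering described above.
  @volatile private var _shuffleManager: ShuffleManagerLike = null

  def shuffleManager: ShuffleManagerLike = _shuffleManager

  def initializeShuffleManager(): Unit = {
    _shuffleManager = new ShuffleManagerLike {
      def unregisterShuffle(shuffleId: Int): Boolean = true
    }
  }
}

object RemoveShuffleRace {
  // Handler that dereferences the manager without checking it first.
  // If a remove request arrives before initialization, this throws
  // a NullPointerException.
  def handleRemoveShuffle(env: ExecutorEnvSketch, shuffleId: Int): Boolean =
    env.shuffleManager.unregisterShuffle(shuffleId)
}
```

Calling `RemoveShuffleRace.handleRemoveShuffle` on a fresh `ExecutorEnvSketch` fails with a `NullPointerException`, while the same call after `initializeShuffleManager()` succeeds.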
if (shuffleManager != null) {
  shuffleManager.unregisterShuffle(shuffleId)
} else {
  true
Shall we add a debug-level log here?
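One possible shape for that suggestion is sketched below. It is not the code merged in this PR: `ShuffleUnregistrar` and `GuardedUnregister` are hypothetical names, the log message wording is invented for illustration, and the logger is passed in as a plain `String => Unit` function so the example does not depend on Spark's logging trait.

```scala
// Hypothetical stand-in for the shuffle manager interface; not a Spark class.
trait ShuffleUnregistrar {
  def unregisterShuffle(shuffleId: Int): Boolean
}

object GuardedUnregister {
  // Returns the manager's result when it is available. When the manager is
  // still null, emits a debug message and reports success, since there is
  // nothing on this executor to clean up yet.
  def unregister(
      manager: ShuffleUnregistrar,
      shuffleId: Int,
      logDebug: String => Unit): Boolean = {
    if (manager != null) {
      manager.unregisterShuffle(shuffleId)
    } else {
      // Assumed wording; the actual message, if any, is up to the PR author.
      logDebug(s"Ignoring removal of shuffle $shuffleId: shuffle manager is not initialized yet")
      true
    }
  }
}
```

Returning `true` in the null branch matches the diff under review; the debug line only makes the skipped cleanup visible when troubleshooting.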
Looks good to me.
Will leave it open for a bit, in case other reviewers of #43627 want to take a look as well.
…erShuffle

### What changes were proposed in this pull request?
This PR aims to avoid NPE in `SparkEnv.get.shuffleManager.unregisterShuffle`.

### Why are the changes needed?
After SPARK-45762, the shuffle manager is initialized after the block manager, which means that when the driver cleans up the shuffle, the shuffle manager may not have been initialized yet, causing NPE.

```
24/09/03 20:09:51,668 [dispatcher-Executor] INFO BlockManager: Initialized BlockManager: BlockManagerId(168, x, 25467, None)
24/09/03 20:09:51,684 [block-manager-storage-async-thread-pool-2] ERROR BlockManagerStorageEndpoint: Error in removing shuffle 29
java.lang.NullPointerException
    at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:61)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47977 from cxzl25/SPARK-49502.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
What changes were proposed in this pull request?
This PR aims to avoid NPE in `SparkEnv.get.shuffleManager.unregisterShuffle`.
Why are the changes needed?
After SPARK-45762, the shuffle manager is initialized after the block manager. As a result, when the driver cleans up a shuffle, the shuffle manager on a newly registered executor may not have been initialized yet, causing an NPE.
Does this PR introduce any user-facing change?
No
How was this patch tested?
GA
Was this patch authored or co-authored using generative AI tooling?
No