[SPARK-49502][CORE] Avoid NPE in SparkEnv.get.shuffleManager.unregisterShuffle #47977

Closed

@cxzl25 cxzl25 commented Sep 3, 2024

What changes were proposed in this pull request?

This PR aims to avoid NPE in SparkEnv.get.shuffleManager.unregisterShuffle.

Why are the changes needed?

After SPARK-45762, the shuffle manager is initialized after the block manager. As a result, when the driver cleans up a shuffle, the shuffle manager on a newly added executor may not have been initialized yet, which causes an NPE.

24/09/03 20:09:51,668 [dispatcher-Executor] INFO BlockManager: Initialized BlockManager: BlockManagerId(168, x, 25467, None)
24/09/03 20:09:51,684 [block-manager-storage-async-thread-pool-2] ERROR BlockManagerStorageEndpoint: Error in removing shuffle 29
java.lang.NullPointerException
        at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:61)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)

Does this PR introduce any user-facing change?

No

How was this patch tested?

GA

Was this patch authored or co-authored using generative AI tooling?

No

@github-actions github-actions bot added the CORE label Sep 3, 2024

mridulm commented Sep 4, 2024

How is unregister shuffle getting called? Shuffle registration requires SparkContext to have been initialized, which includes the shuffle manager.


cxzl25 commented Sep 4, 2024

> How is unregister shuffle getting called?

With dynamic allocation, an executor can be added while the application is running. It first initializes the BlockManager and registers it with the driver, and only then initializes the ShuffleManager.

env.blockManager.initialize(conf.getAppId)

The driver sends the removeShuffle RPC to all registered BlockManagers. At that point, some newly added executors may not have initialized their ShuffleManager yet.

val removeShuffleFromExecutorsFutures = blockManagerInfo.values.map { bm =>
  bm.storageEndpoint.ask[Boolean](removeMsg).recover {
    // use false as default value means no shuffle data were removed
    handleBlockRemovalFailure("shuffle", shuffleId.toString, bm.blockManagerId, false)
  }
}.toSeq
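
To make the ordering problem concrete, here is a minimal toy model of the window described above. It does not use Spark's real classes: `ToyShuffleManager`, `ToyExecutorEnv`, and `InitOrderDemo` are hypothetical names invented for illustration, and the only assumption carried over from the discussion is that the block manager is registered before the shuffle manager field is set.

```scala
// Toy model of the executor-side startup order; these are NOT Spark's real classes.
trait ToyShuffleManager {
  def unregisterShuffle(shuffleId: Int): Boolean
}

final class ToyExecutorEnv {
  // Stays null until initializeShuffleManager() runs, mirroring the late initialization.
  @volatile var shuffleManager: ToyShuffleManager = null
  @volatile var blockManagerRegistered: Boolean = false

  // After this step the driver knows about the executor and may send it RPCs.
  def initializeBlockManager(): Unit = { blockManagerRegistered = true }

  def initializeShuffleManager(): Unit = {
    shuffleManager = new ToyShuffleManager {
      def unregisterShuffle(shuffleId: Int): Boolean = true
    }
  }
}

object InitOrderDemo {
  // Unguarded handler: dereferences shuffleManager without checking for null.
  def unguardedRemoveShuffle(env: ToyExecutorEnv, shuffleId: Int): Boolean =
    env.shuffleManager.unregisterShuffle(shuffleId)

  def main(args: Array[String]): Unit = {
    val env = new ToyExecutorEnv
    env.initializeBlockManager()

    // A removeShuffle request arriving here hits the null field.
    val early = scala.util.Try(unguardedRemoveShuffle(env, 29))
    println(s"before shuffle manager init: $early")

    env.initializeShuffleManager()
    val late = scala.util.Try(unguardedRemoveShuffle(env, 29))
    println(s"after shuffle manager init: $late")
  }
}
```

The first call fails with a `NullPointerException` wrapped in a `Failure`, while the second succeeds, which matches the race the stack trace in the description points at.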

Two code paths can trigger the RemoveShuffle RPC.

def doCleanupShuffle(shuffleId: Int, blocking: Boolean): Unit = {
  try {
    if (mapOutputTrackerMaster.containsShuffle(shuffleId)) {
      logDebug("Cleaning shuffle " + shuffleId)
      // Shuffle must be removed before it's unregistered from the output tracker
      // to find blocks served by the shuffle service on deallocated executors
      shuffleDriverComponents.removeShuffle(shuffleId, blocking)

shuffleIds.foreach { shuffleId =>
  queryExecution.shuffleCleanupMode match {
    case RemoveShuffleFiles =>
      // Same as what we do in ContextCleaner.doCleanupShuffle, but do not unregister
      // the shuffle on MapOutputTracker, so that stage retries would be triggered.
      // Set blocking to Utils.isTesting to deflake unit tests.
      sc.shuffleDriverComponents.removeShuffle(shuffleId, Utils.isTesting)


mridulm commented Sep 4, 2024

Ah ! This is happening at the executor, not driver - thanks for clarifying.
Let me take a look at the detailed comment tomorrow (a bit late for me :-) )

if (shuffleManager != null) {
  shuffleManager.unregisterShuffle(shuffleId)
} else {
  true

Shall we add a debug-level log here?
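
For illustration, the guard above combined with the suggested debug message could look roughly like the sketch below. This is not the merged patch: `ShuffleUnregistrar`, `GuardedRemoveShuffle`, and the in-memory `debugLog` buffer are hypothetical stand-ins so the example runs without Spark or a logging library, and the thread does not say whether a log line was actually added.

```scala
// Hypothetical stand-in for the shuffle manager interface; not Spark's real trait.
trait ShuffleUnregistrar {
  def unregisterShuffle(shuffleId: Int): Boolean
}

object GuardedRemoveShuffle {
  // Collects debug messages in memory so the sketch needs no logging framework.
  val debugLog = scala.collection.mutable.ArrayBuffer.empty[String]

  // Returns the manager's result when it exists. When it is still null,
  // records a debug message and returns true, as in the reviewed snippet.
  def removeShuffle(manager: ShuffleUnregistrar, shuffleId: Int): Boolean =
    if (manager != null) {
      manager.unregisterShuffle(shuffleId)
    } else {
      debugLog += s"Shuffle manager not initialized, skipping unregister of shuffle $shuffleId"
      true
    }
}
```

Returning `true` in the null branch follows the snippet under review; a manager that was never initialized on this executor is treated as having nothing left to unregister, which is an interpretation rather than something the thread states.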


@mridulm mridulm left a comment


Looks good to me.
Will leave it open for a bit, in case other reviewers of #43627 want to take a look as well.

@mridulm mridulm closed this in 3a4ea84 Sep 6, 2024

mridulm commented Sep 6, 2024

Merged to master.
Thanks for fixing this @cxzl25 !
Thanks for the review @yaooqinn :-)

IvanK-db pushed a commit to IvanK-db/spark that referenced this pull request Sep 20, 2024
…erShuffle

### What changes were proposed in this pull request?
This PR aims to avoid NPE in `SparkEnv.get.shuffleManager.unregisterShuffle`.

### Why are the changes needed?
After SPARK-45762, the shuffle manager is initialized after the block manager, which means that when the driver cleans up the shuffle, the shuffle manager may not have been initialized yet, causing NPE.

```
24/09/03 20:09:51,668 [dispatcher-Executor] INFO BlockManager: Initialized BlockManager: BlockManagerId(168, x, 25467, None)
24/09/03 20:09:51,684 [block-manager-storage-async-thread-pool-2] ERROR BlockManagerStorageEndpoint: Error in removing shuffle 29
java.lang.NullPointerException
        at org.apache.spark.storage.BlockManagerStorageEndpoint$$anonfun$receiveAndReply$1.$anonfun$applyOrElse$3(BlockManagerStorageEndpoint.scala:61)
        at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
        at org.apache.spark.storage.BlockManagerStorageEndpoint.$anonfun$doAsync$1(BlockManagerStorageEndpoint.scala:89)
        at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
        at scala.util.Success.$anonfun$map$1(Try.scala:255)
        at scala.util.Success.map(Try.scala:213)
        at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
        at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
        at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
        at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
GA

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#47977 from cxzl25/SPARK-49502.

Authored-by: sychen <sychen@ctrip.com>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
attilapiros pushed a commit to attilapiros/spark that referenced this pull request Oct 4, 2024
himadripal pushed a commit to himadripal/spark that referenced this pull request Oct 19, 2024