Skip to content

Commit

Permalink
[SPARK-40490][YARN][TESTS][3.3] Ensure YarnShuffleIntegrationSuite te…
Browse files Browse the repository at this point in the history
…sts registeredExecFile reload scenarios

### What changes were proposed in this pull request?
After SPARK-17321, `YarnShuffleService` will persist data to local shuffle state db/reload data from local shuffle state db only when Yarn NodeManager start with `YarnConfiguration#NM_RECOVERY_ENABLED = true`.

`YarnShuffleIntegrationSuite` not set `YarnConfiguration#NM_RECOVERY_ENABLED` and the default value of the configuration is false,  so `YarnShuffleIntegrationSuite` will neither trigger data persistence to the db nor verify the reload of data.

This pr aims to let `YarnShuffleIntegrationSuite` restart the verification of registeredExecFile reload scenarios, to achieve this goal, this pr make the following changes:

1. Add a new un-document configuration `spark.yarn.shuffle.testing` to `YarnShuffleService`, and Initialize `_recoveryPath` when `_recoveryPath == null && spark.yarn.shuffle.testing == true`.

2. Only set `spark.yarn.shuffle.testing = true` in `YarnShuffleIntegrationSuite`, and add assertions to check `registeredExecFile` is not null to ensure that registeredExecFile reload scenarios will be verified.

### Why are the changes needed?
Fix registeredExecFile reload  test scenarios.

Why not test by configuring `YarnConfiguration#NM_RECOVERY_ENABLED` as true?

This configuration has been tried

**Hadoop 3.3.2**

```
build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationSuite -Phadoop-3
```

```
YarnShuffleIntegrationSuite:
*** RUN ABORTED ***
  java.lang.NoClassDefFoundError: org/apache/hadoop/shaded/org/iq80/leveldb/DBException
  at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartRecoveryStore(NodeManager.java:313)
  at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:370)
  at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
  at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceInit(MiniYARNCluster.java:597)
  at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
  at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:109)
  at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceInit(MiniYARNCluster.java:327)
  at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
  at org.apache.spark.deploy.yarn.BaseYarnClusterSuite.beforeAll(BaseYarnClusterSuite.scala:111)
  at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
  ...
  Cause: java.lang.ClassNotFoundException: org.apache.hadoop.shaded.org.iq80.leveldb.DBException
  at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
  at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
  at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
  at org.apache.hadoop.yarn.server.nodemanager.NodeManager.initAndStartRecoveryStore(NodeManager.java:313)
  at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceInit(NodeManager.java:370)
  at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
  at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceInit(MiniYARNCluster.java:597)
  at org.apache.hadoop.service.AbstractService.init(AbstractService.java:164)
  at org.apache.hadoop.service.CompositeService.serviceInit(CompositeService.java:109)
```

**Hadoop 2.7.4**

```
build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest=none -DwildcardSuites=org.apache.spark.deploy.yarn.YarnShuffleIntegrationSuite -Phadoop-2
```

```
YarnShuffleIntegrationSuite:
org.apache.spark.deploy.yarn.YarnShuffleIntegrationSuite *** ABORTED ***
  java.lang.IllegalArgumentException: Cannot support recovery with an ephemeral server port. Check the setting of yarn.nodemanager.address
  at org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl.serviceStart(ContainerManagerImpl.java:395)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
  at org.apache.hadoop.yarn.server.nodemanager.NodeManager.serviceStart(NodeManager.java:272)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.yarn.server.MiniYARNCluster$NodeManagerWrapper.serviceStart(MiniYARNCluster.java:560)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  at org.apache.hadoop.service.CompositeService.serviceStart(CompositeService.java:120)
  at org.apache.hadoop.yarn.server.MiniYARNCluster.serviceStart(MiniYARNCluster.java:278)
  at org.apache.hadoop.service.AbstractService.start(AbstractService.java:193)
  ...
Run completed in 3 seconds, 992 milliseconds.
Total number of tests run: 0
Suites: completed 1, aborted 1
Tests: succeeded 0, failed 0, canceled 0, ignored 0, pending 0
*** 1 SUITE ABORTED ***
```

From the above test, we need to use a fixed port to enable Yarn NodeManager recovery, but this is difficult to be guaranteed in UT, so this pr try a workaround way.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

Closes #37962 from LuciferYang/SPARK-40490-33.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
  • Loading branch information
LuciferYang authored and dongjoon-hyun committed Sep 22, 2022
1 parent 50f0ced commit b608ba3
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.io.Files;
import org.apache.commons.lang3.builder.ToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -124,6 +125,10 @@ public class YarnShuffleService extends AuxiliaryService {
// Whether failure during service initialization should stop the NM.
@VisibleForTesting
static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";

@VisibleForTesting
static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";

private static final boolean DEFAULT_STOP_ON_FAILURE = false;

// just for testing when you want to find an open port
Expand Down Expand Up @@ -222,6 +227,10 @@ protected void serviceInit(Configuration externalConf) throws Exception {

boolean stopOnFailure = _conf.getBoolean(STOP_ON_FAILURE_KEY, DEFAULT_STOP_ON_FAILURE);

if (_recoveryPath == null && _conf.getBoolean(INTEGRATION_TESTING, false)) {
_recoveryPath = new Path(Files.createTempDir().toURI());
}

try {
// In case this NM was killed while there were running spark applications, we need to restore
// lost state for the existing executors. We look for an existing file in the NM's local dirs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
yarnConfig.set(YarnConfiguration.NM_AUX_SERVICE_FMT.format("spark_shuffle"),
classOf[YarnShuffleService].getCanonicalName)
yarnConfig.set(SHUFFLE_SERVICE_PORT.key, "0")
yarnConfig.set(YarnTestAccessor.shuffleServiceIntegrationTestingKey, "true")
yarnConfig
}

Expand All @@ -67,23 +68,18 @@ class YarnShuffleIntegrationSuite extends BaseYarnClusterSuite {
val shuffleService = YarnTestAccessor.getShuffleServiceInstance

val registeredExecFile = YarnTestAccessor.getRegisteredExecutorFile(shuffleService)
assert(registeredExecFile != null)

val result = File.createTempFile("result", null, tempDir)
val finalState = runSpark(
false,
mainClassName(YarnExternalShuffleDriver.getClass),
appArgs = if (registeredExecFile != null) {
Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath)
} else {
Seq(result.getAbsolutePath)
},
appArgs = Seq(result.getAbsolutePath, registeredExecFile.getAbsolutePath),
extraConf = extraSparkConf()
)
checkResult(finalState, result)

if (registeredExecFile != null) {
assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
}
assert(YarnTestAccessor.getRegisteredExecutorFile(shuffleService).exists())
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,8 @@ object YarnTestAccessor {
def getShuffleServiceConfOverlayResourceName: String = {
YarnShuffleService.SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME
}

def shuffleServiceIntegrationTestingKey: String = {
YarnShuffleService.INTEGRATION_TESTING
}
}

0 comments on commit b608ba3

Please sign in to comment.