[SPARK-41253][K8S][TESTS] Make Spark K8S volcano IT work in Github Action

### What changes were proposed in this pull request?
This patch makes the Spark K8s Volcano integration tests (IT) runnable in the resource-limited GitHub Actions environment. It helps downstream communities such as Volcano enable the Spark IT in GitHub Actions.

Note that there is no plan to enable the Volcano tests in the Spark community. This patch only makes the tests work; it **DOES NOT** enable the Volcano tests in Apache Spark GA. It helps downstream testing.

- Allow the number of parallel jobs to be reduced from 4 to 2 (only 1 job in each queue) when running in the GitHub Actions environment.
- Read the specified `spark.kubernetes.[driver|executor].request.cores`.
- Set the queue limit according to the specified `[driver|executor].request.cores`, just as was done for the normal tests: 883a481
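
The three changes above all derive test sizing from JVM system properties. The following is a minimal sketch of that derivation, not the code in the patch: the object `VolcanoTestSizing`, the case class `Sizing`, and their member names are hypothetical. The property keys and the defaults (`"1"`, `"1"`, `"2"`) are the ones that appear in the diff.

```scala
// Hypothetical helper, not part of the patch: it derives the test sizing from
// a map of properties so the arithmetic can be checked without a cluster.
object VolcanoTestSizing {
  // Property keys as they appear in TestConstants in this commit.
  val DriverCoresKey = "spark.kubernetes.test.driverRequestCores"
  val ExecutorCoresKey = "spark.kubernetes.test.executorRequestCores"
  val MaxJobNumKey = "spark.kubernetes.test.volcanoMaxConcurrencyJobNum"

  final case class Sizing(driverCores: Double, executorCores: Double, maxJobsPerQueue: Int) {
    // CPU capacity a queue needs to run exactly one job (driver + one executor).
    def jobCores: Double = driverCores + executorCores
    // Total number of jobs submitted across all queues.
    def jobNum(queueNumber: Int): Int = maxJobsPerQueue * queueNumber
  }

  // Falls back to the same defaults as the diff: 1 core each, 2 jobs per queue.
  def fromProps(props: collection.Map[String, String]): Sizing = Sizing(
    driverCores = props.getOrElse(DriverCoresKey, "1").toDouble,
    executorCores = props.getOrElse(ExecutorCoresKey, "1").toDouble,
    maxJobsPerQueue = props.getOrElse(MaxJobNumKey, "2").toInt)
}
```

In a test, `VolcanoTestSizing.fromProps(sys.props)` would pick up the `-D` flags passed to sbt. With no flags set, this yields 4 jobs across 2 queues; with `volcanoMaxConcurrencyJobNum=1`, it yields 2.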

### Why are the changes needed?

It helps downstream communities that want to use free GitHub Actions hosted resources to enable the Spark IT in GitHub Actions.

### Does this PR introduce _any_ user-facing change?
No, test only.

### How was this patch tested?
- Tested in my local env with enough resources (default):
```
$  build/sbt -Pvolcano -Pkubernetes -Pkubernetes-integration-tests -Dtest.include.tags=volcano "kubernetes-integration-tests/test"

[info] KubernetesSuite:
[info] VolcanoSuite:
[info] - Run SparkPi with volcano scheduler (10 seconds, 410 milliseconds)
[info] - SPARK-38187: Run SparkPi Jobs with minCPU (25 seconds, 489 milliseconds)
[info] - SPARK-38187: Run SparkPi Jobs with minMemory (25 seconds, 518 milliseconds)
[info] - SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enabled) (14 seconds, 349 milliseconds)
[info] - SPARK-38188: Run SparkPi jobs with 2 queues (all enabled) (23 seconds, 516 milliseconds)
[info] - SPARK-38423: Run driver job to validate priority order (16 seconds, 404 milliseconds)
[info] YuniKornSuite:
[info] Run completed in 2 minutes, 34 seconds.
[info] Total number of tests run: 6
[info] Suites: completed 3, aborted 0
[info] Tests: succeeded 6, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 439 s (07:19), completed 2022-12-3 8:58:50
```

- Tested on GitHub Actions with `volcanoMaxConcurrencyJobNum`: Yikun#192
```
$ build/sbt -Pvolcano -Psparkr -Pkubernetes -Pkubernetes-integration-tests -Dspark.kubernetes.test.driverRequestCores=0.5 -Dspark.kubernetes.test.executorRequestCores=0.2 -Dspark.kubernetes.test.volcanoMaxConcurrencyJobNum=1 -Dtest.include.tags=volcano "kubernetes-integration-tests/test"

[info] VolcanoSuite:
[info] - Run SparkPi with volcano scheduler (18 seconds, 122 milliseconds)
[info] - SPARK-38187: Run SparkPi Jobs with minCPU (53 seconds, 964 milliseconds)
[info] - SPARK-38187: Run SparkPi Jobs with minMemory (54 seconds, 523 milliseconds)
[info] - SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enabled) (22 seconds, 185 milliseconds)
[info] - SPARK-38188: Run SparkPi jobs with 2 queues (all enabled) (33 seconds, 349 milliseconds)
[info] - SPARK-38423: Run driver job to validate priority order (32 seconds, 435 milliseconds)
[info] YuniKornSuite:
[info] Run completed in 4 minutes, 16 seconds.
[info] Total number of tests run: 6
[info] Suites: completed 3, aborted 0
[info] Tests: succeeded 6, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[warn] In the last 494 seconds, 7.296 (1.5%) were spent in GC. [Heap: 3.12GB free of 3.83GB, max 3.83GB] Consider increasing the JVM heap using `-Xmx` or try a different collector, e.g. `-XX:+UseG1GC`, for better performance.
[success] Total time: 924 s (15:24), completed Dec 3, 2022 12:49:42 AM
```

- CI passed

Closes #38789 from Yikun/SPARK-41253.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 72d58d5)
Signed-off-by: Yikun Jiang <yikunkero@gmail.com>
Yikun committed Dec 3, 2022
1 parent 20cc2b6 commit 821997b
Showing 4 changed files with 51 additions and 34 deletions.
8 changes: 8 additions & 0 deletions resource-managers/kubernetes/integration-tests/README.md
@@ -284,6 +284,14 @@ to the wrapper scripts and using the wrapper scripts will simply set these appro
 </td>
 <td></td>
 </tr>
+<tr>
+<td><code>spark.kubernetes.test.volcanoMaxConcurrencyJobNum</code></td>
+<td>
+Set maximum number for concurrency jobs, It helps developers setting suitable resources according to test env in
+volcano test.
+</td>
+<td></td>
+</tr>
 </table>

 # Running the Kubernetes Integration Tests with SBT

This file was deleted.

@@ -36,4 +36,6 @@ object TestConstants {
 val CONFIG_KEY_UNPACK_DIR = "spark.kubernetes.test.unpackSparkDir"
 val CONFIG_DRIVER_REQUEST_CORES = "spark.kubernetes.test.driverRequestCores"
 val CONFIG_EXECUTOR_REQUEST_CORES = "spark.kubernetes.test.executorRequestCores"
+
+val CONFIG_KEY_VOLCANO_MAX_JOB_NUM = "spark.kubernetes.test.volcanoMaxConcurrencyJobNum"
 }
@@ -37,6 +37,7 @@ import org.scalatest.concurrent.Eventually
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.deploy.k8s.Config._
 import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
+import org.apache.spark.deploy.k8s.integrationtest.TestConstants.{CONFIG_DRIVER_REQUEST_CORES, CONFIG_EXECUTOR_REQUEST_CORES, CONFIG_KEY_VOLCANO_MAX_JOB_NUM}
 import org.apache.spark.internal.config.NETWORK_AUTH_ENABLED

 private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: KubernetesSuite =>
@@ -51,6 +52,9 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
 private val testGroups: mutable.Set[String] = mutable.Set.empty
 private val testYAMLPaths: mutable.Set[String] = mutable.Set.empty
 private val testResources: mutable.Set[HasMetadata] = mutable.Set.empty
+private val driverCores = java.lang.Double.parseDouble(DRIVER_REQUEST_CORES)
+private val executorCores = java.lang.Double.parseDouble(EXECUTOR_REQUEST_CORES)
+private val maxConcurrencyJobNum = VOLCANO_MAX_JOB_NUM.toInt

 private def deletePodInTestGroup(): Unit = {
 testGroups.foreach { g =>
@@ -252,6 +256,12 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
 .set(KUBERNETES_SCHEDULER_NAME.key, "volcano")
 .set(KUBERNETES_DRIVER_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
 .set(KUBERNETES_EXECUTOR_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
+sys.props.get(CONFIG_DRIVER_REQUEST_CORES).foreach { cpu =>
+conf.set("spark.kubernetes.driver.request.cores", cpu)
+}
+sys.props.get(CONFIG_EXECUTOR_REQUEST_CORES).foreach { cpu =>
+conf.set("spark.kubernetes.executor.request.cores", cpu)
+}
 queue.foreach { q =>
 conf.set(VolcanoFeatureStep.POD_GROUP_TEMPLATE_FILE_KEY,
 new File(
@@ -303,16 +313,30 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

 test("SPARK-38187: Run SparkPi Jobs with minCPU", k8sTestTag, volcanoTag) {
 val groupName = generateGroupName("min-cpu")
-// Create a queue with 2 CPU capacity
-createOrReplaceQueue(name = "queue-2u", cpu = Some("2"))
+// Create a queue with driver + executor CPU capacity
+val jobCores = driverCores + executorCores
+val queueName = s"queue-$jobCores"
+createOrReplaceQueue(name = queueName, cpu = Some(s"$jobCores"))
+val testContent =
+s"""
+|apiVersion: scheduling.volcano.sh/v1beta1
+|kind: PodGroup
+|spec:
+| queue: $queueName
+| minMember: 1
+| minResources:
+| cpu: $jobCores
+|""".stripMargin
+val file = Utils.createTempFile(testContent, TEMP_DIR)
+val path = TEMP_DIR + file
 // Submit 3 jobs with minCPU = 2
 val jobNum = 3
 (1 to jobNum).map { i =>
 Future {
 runJobAndVerify(
 i.toString,
 groupLoc = Option(groupName),
-driverPodGroupTemplate = Option(DRIVER_PG_TEMPLATE_CPU_2U))
+driverPodGroupTemplate = Option(path))
 }
 }
 verifyJobsSucceededOneByOne(jobNum, groupName)
@@ -339,8 +363,10 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
 // Disabled queue0 and enabled queue1
 createOrReplaceQueue(name = "queue0", cpu = Some("0.001"))
 createOrReplaceQueue(name = "queue1")
+val QUEUE_NUMBER = 2
 // Submit jobs into disabled queue0 and enabled queue1
-val jobNum = 4
+// By default is 4 (2 jobs in each queue)
+val jobNum = maxConcurrencyJobNum * QUEUE_NUMBER
 (1 to jobNum).foreach { i =>
 Future {
 val queueName = s"queue${i % 2}"
@@ -351,9 +377,9 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
 // There are two `Succeeded` jobs and two `Pending` jobs
 Eventually.eventually(TIMEOUT, INTERVAL) {
 val completedPods = getPods("driver", s"${GROUP_PREFIX}queue1", "Succeeded")
-assert(completedPods.size === 2)
+assert(completedPods.size === jobNum/2)
 val pendingPods = getPods("driver", s"${GROUP_PREFIX}queue0", "Pending")
-assert(pendingPods.size === 2)
+assert(pendingPods.size === jobNum/2)
 }
 }

@@ -362,7 +388,10 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
 // Enable all queues
 createOrReplaceQueue(name = "queue1")
 createOrReplaceQueue(name = "queue0")
-val jobNum = 4
+val QUEUE_NUMBER = 2
+// Submit jobs into disabled queue0 and enabled queue1
+// By default is 4 (2 jobs in each queue)
+val jobNum = maxConcurrencyJobNum * QUEUE_NUMBER
 // Submit jobs into these two queues
 (1 to jobNum).foreach { i =>
 Future {
@@ -410,7 +439,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
 }

 // Enable queue to let jobs running one by one
-createOrReplaceQueue(name = "queue", cpu = Some("1"))
+createOrReplaceQueue(name = "queue", cpu = Some(s"$driverCores"))

 // Verify scheduling order follow the specified priority
 Eventually.eventually(TIMEOUT, INTERVAL) {
@@ -435,10 +464,11 @@ private[spark] object VolcanoTestsSuite extends SparkFunSuite {
 val GROUP_PREFIX = "volcano-test" + UUID.randomUUID().toString.replaceAll("-", "") + "-"
 val VOLCANO_PRIORITY_YAML
 = new File(getClass.getResource("/volcano/priorityClasses.yml").getFile).getAbsolutePath
-val DRIVER_PG_TEMPLATE_CPU_2U = new File(
-getClass.getResource("/volcano/driver-podgroup-template-cpu-2u.yml").getFile
-).getAbsolutePath
 val DRIVER_PG_TEMPLATE_MEMORY_3G = new File(
 getClass.getResource("/volcano/driver-podgroup-template-memory-3g.yml").getFile
 ).getAbsolutePath
+val DRIVER_REQUEST_CORES = sys.props.get(CONFIG_DRIVER_REQUEST_CORES).getOrElse("1")
+val EXECUTOR_REQUEST_CORES = sys.props.get(CONFIG_EXECUTOR_REQUEST_CORES).getOrElse("1")
+val VOLCANO_MAX_JOB_NUM = sys.props.get(CONFIG_KEY_VOLCANO_MAX_JOB_NUM).getOrElse("2")
 val TEMP_DIR = "/tmp/"
 }
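
The minCPU test in the diff above replaces a static PodGroup template file with one rendered at run time from the requested cores. The following is a rough, self-contained sketch of that idea. The object `PodGroupTemplateSketch` and its methods are hypothetical, and it writes the file with `java.nio.file.Files` rather than the suite's own `Utils.createTempFile`. The YAML nesting is also an assumption, because the diff as shown has lost its indentation.

```scala
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, Path}

// Hypothetical sketch, not the patch's code: renders a PodGroup template whose
// queue and minimum CPU follow the requested cores, then writes it to disk.
object PodGroupTemplateSketch {
  // Same fields as the template built inline in the minCPU test; the nesting
  // under `spec` and `minResources` is assumed.
  def render(queueName: String, cpu: Double): String =
    s"""
       |apiVersion: scheduling.volcano.sh/v1beta1
       |kind: PodGroup
       |spec:
       |  queue: $queueName
       |  minMember: 1
       |  minResources:
       |    cpu: $cpu
       |""".stripMargin

  // Writes the content to a fresh temp file and returns its path.
  // Assumption: java.nio stands in for the suite's Utils.createTempFile.
  def writeTemp(content: String): Path = {
    val file = Files.createTempFile("driver-podgroup-template-", ".yml")
    Files.write(file, content.getBytes(StandardCharsets.UTF_8))
    file
  }
}
```

For example, `PodGroupTemplateSketch.writeTemp(PodGroupTemplateSketch.render("queue-2.0", 2.0)).toString` gives a path that could be passed wherever the test passes `driverPodGroupTemplate`.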
