From 55af9d80ad19d49f0032011b9db12a8611475275 Mon Sep 17 00:00:00 2001 From: Yikun Jiang Date: Fri, 15 Apr 2022 21:29:03 +0800 Subject: [PATCH] Set queue cpu resource according request cores --- .../driver-podgroup-template-cpu-2u.yml | 23 ---------- .../integrationtest/VolcanoTestsSuite.scala | 45 ++++++++++++++----- 2 files changed, 34 insertions(+), 34 deletions(-) delete mode 100644 resource-managers/kubernetes/integration-tests/src/test/resources/volcano/driver-podgroup-template-cpu-2u.yml diff --git a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/driver-podgroup-template-cpu-2u.yml b/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/driver-podgroup-template-cpu-2u.yml deleted file mode 100644 index 4a784f0f86494..0000000000000 --- a/resource-managers/kubernetes/integration-tests/src/test/resources/volcano/driver-podgroup-template-cpu-2u.yml +++ /dev/null @@ -1,23 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -apiVersion: scheduling.volcano.sh/v1beta1 -kind: PodGroup -spec: - queue: queue-2u - minMember: 1 - minResources: - cpu: "2" diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala index a61cc12b7704e..2643c6605f8c3 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/VolcanoTestsSuite.scala @@ -37,6 +37,7 @@ import org.scalatest.concurrent.Eventually import org.apache.spark.SparkFunSuite import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep +import org.apache.spark.deploy.k8s.integrationtest.TestConstants.{CONFIG_DRIVER_REQUEST_CORES, CONFIG_EXECUTOR_REQUEST_CORES} import org.apache.spark.internal.config.NETWORK_AUTH_ENABLED private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: KubernetesSuite => @@ -51,6 +52,8 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku private val testGroups: mutable.Set[String] = mutable.Set.empty private val testYAMLPaths: mutable.Set[String] = mutable.Set.empty private val testResources: mutable.Set[HasMetadata] = mutable.Set.empty + private val driverCores = java.lang.Double.parseDouble(DRIVER_REQUEST_CORES) + private val executorCores = java.lang.Double.parseDouble(EXECUTOR_REQUEST_CORES) private def deletePodInTestGroup(): Unit = { testGroups.foreach { g => @@ -252,6 +255,12 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku .set(KUBERNETES_SCHEDULER_NAME.key, "volcano") .set(KUBERNETES_DRIVER_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP) .set(KUBERNETES_EXECUTOR_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP) + sys.props.get(CONFIG_DRIVER_REQUEST_CORES).map { cpu => + conf.set("spark.kubernetes.driver.request.cores", cpu) + } + sys.props.get(CONFIG_EXECUTOR_REQUEST_CORES).map { cpu => + conf.set("spark.kubernetes.executor.request.cores", cpu) + } queue.foreach { q => conf.set(VolcanoFeatureStep.POD_GROUP_TEMPLATE_FILE_KEY, new File( @@ -303,8 +312,22 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku test("SPARK-38187: Run SparkPi Jobs with minCPU", k8sTestTag, volcanoTag) { val groupName = generateGroupName("min-cpu") - // Create a queue with 2 CPU capacity - createOrReplaceQueue(name = "queue-2u", cpu = Some("2")) + // Create a queue with drvier + executor CPU capacity + val jobCores = driverCores + executorCores + val queueName = s"queue-$jobCores" + createOrReplaceQueue(name = queueName, cpu = Some(s"$jobCores")) + val testContent = + s""" + |apiVersion: scheduling.volcano.sh/v1beta1 + |kind: PodGroup + |spec: + | queue: $queueName + | minMember: 1 + | minResources: + | cpu: $jobCores + |""".stripMargin + val file = Utils.createTempFile(testContent, HOST_PATH) + val path = HOST_PATH + file // Submit 3 jobs with minCPU = 2 val jobNum = 3 (1 to jobNum).map { i => @@ -312,7 +335,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku runJobAndVerify( i.toString, groupLoc = Option(groupName), - driverPodGroupTemplate = Option(DRIVER_PG_TEMPLATE_CPU_2U)) + driverPodGroupTemplate = Option(path)) } } verifyJobsSucceededOneByOne(jobNum, groupName) @@ -340,7 +363,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku createOrReplaceQueue(name = "queue0", cpu = Some("0.001")) createOrReplaceQueue(name = "queue1") // Submit jobs into disabled queue0 and enabled queue1 - val jobNum = 4 + val jobNum = 2 (1 to jobNum).foreach { i => Future { val queueName = s"queue${i % 2}" @@ -351,9 +374,9 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku // There are two `Succeeded` jobs and two `Pending` jobs Eventually.eventually(TIMEOUT, INTERVAL) { val completedPods = getPods("driver", s"${GROUP_PREFIX}queue1", "Succeeded") - assert(completedPods.size === 2) + assert(completedPods.size === jobNum/2) val pendingPods = getPods("driver", s"${GROUP_PREFIX}queue0", "Pending") - assert(pendingPods.size === 2) + assert(pendingPods.size === jobNum/2) } } @@ -362,7 +385,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku // Enable all queues createOrReplaceQueue(name = "queue1") createOrReplaceQueue(name = "queue0") - val jobNum = 4 + val jobNum = 2 // Submit jobs into these two queues (1 to jobNum).foreach { i => Future { @@ -410,7 +433,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku } // Enable queue to let jobs running one by one - createOrReplaceQueue(name = "queue", cpu = Some("1")) + createOrReplaceQueue(name = "queue", cpu = Some(s"$driverCores")) // Verify scheduling order follow the specified priority Eventually.eventually(TIMEOUT, INTERVAL) { @@ -435,10 +458,10 @@ private[spark] object VolcanoTestsSuite extends SparkFunSuite { val GROUP_PREFIX = "volcano-test" + UUID.randomUUID().toString.replaceAll("-", "") + "-" val VOLCANO_PRIORITY_YAML = new File(getClass.getResource("/volcano/priorityClasses.yml").getFile).getAbsolutePath - val DRIVER_PG_TEMPLATE_CPU_2U = new File( - getClass.getResource("/volcano/driver-podgroup-template-cpu-2u.yml").getFile - ).getAbsolutePath val DRIVER_PG_TEMPLATE_MEMORY_3G = new File( getClass.getResource("/volcano/driver-podgroup-template-memory-3g.yml").getFile ).getAbsolutePath + val DRIVER_REQUEST_CORES = sys.props.get(CONFIG_DRIVER_REQUEST_CORES).getOrElse("1") + val EXECUTOR_REQUEST_CORES = sys.props.get(CONFIG_EXECUTOR_REQUEST_CORES).getOrElse("1") + val HOST_PATH = "/tmp/" }