Skip to content

Commit

Permalink
Set queue cpu resource according request cores
Browse files Browse the repository at this point in the history
  • Loading branch information
Yikun committed Apr 16, 2022
1 parent 2c66a9d commit 55af9d8
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 34 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.scalatest.concurrent.Eventually
import org.apache.spark.SparkFunSuite
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
import org.apache.spark.deploy.k8s.integrationtest.TestConstants.{CONFIG_DRIVER_REQUEST_CORES, CONFIG_EXECUTOR_REQUEST_CORES}
import org.apache.spark.internal.config.NETWORK_AUTH_ENABLED

private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: KubernetesSuite =>
Expand All @@ -51,6 +52,8 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
private val testGroups: mutable.Set[String] = mutable.Set.empty
private val testYAMLPaths: mutable.Set[String] = mutable.Set.empty
private val testResources: mutable.Set[HasMetadata] = mutable.Set.empty
private val driverCores = java.lang.Double.parseDouble(DRIVER_REQUEST_CORES)
private val executorCores = java.lang.Double.parseDouble(EXECUTOR_REQUEST_CORES)

private def deletePodInTestGroup(): Unit = {
testGroups.foreach { g =>
Expand Down Expand Up @@ -252,6 +255,12 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
.set(KUBERNETES_SCHEDULER_NAME.key, "volcano")
.set(KUBERNETES_DRIVER_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
.set(KUBERNETES_EXECUTOR_POD_FEATURE_STEPS.key, VOLCANO_FEATURE_STEP)
sys.props.get(CONFIG_DRIVER_REQUEST_CORES).map { cpu =>
conf.set("spark.kubernetes.driver.request.cores", cpu)
}
sys.props.get(CONFIG_EXECUTOR_REQUEST_CORES).map { cpu =>
conf.set("spark.kubernetes.executor.request.cores", cpu)
}
queue.foreach { q =>
conf.set(VolcanoFeatureStep.POD_GROUP_TEMPLATE_FILE_KEY,
new File(
Expand Down Expand Up @@ -303,16 +312,30 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

test("SPARK-38187: Run SparkPi Jobs with minCPU", k8sTestTag, volcanoTag) {
val groupName = generateGroupName("min-cpu")
// Create a queue with 2 CPU capacity
createOrReplaceQueue(name = "queue-2u", cpu = Some("2"))
// Create a queue with drvier + executor CPU capacity
val jobCores = driverCores + executorCores
val queueName = s"queue-$jobCores"
createOrReplaceQueue(name = queueName, cpu = Some(s"$jobCores"))
val testContent =
s"""
|apiVersion: scheduling.volcano.sh/v1beta1
|kind: PodGroup
|spec:
| queue: $queueName
| minMember: 1
| minResources:
| cpu: $jobCores
|""".stripMargin
val file = Utils.createTempFile(testContent, HOST_PATH)
val path = HOST_PATH + file
// Submit 3 jobs with minCPU = 2
val jobNum = 3
(1 to jobNum).map { i =>
Future {
runJobAndVerify(
i.toString,
groupLoc = Option(groupName),
driverPodGroupTemplate = Option(DRIVER_PG_TEMPLATE_CPU_2U))
driverPodGroupTemplate = Option(path))
}
}
verifyJobsSucceededOneByOne(jobNum, groupName)
Expand Down Expand Up @@ -340,7 +363,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
createOrReplaceQueue(name = "queue0", cpu = Some("0.001"))
createOrReplaceQueue(name = "queue1")
// Submit jobs into disabled queue0 and enabled queue1
val jobNum = 4
val jobNum = 2
(1 to jobNum).foreach { i =>
Future {
val queueName = s"queue${i % 2}"
Expand All @@ -351,9 +374,9 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
// There are two `Succeeded` jobs and two `Pending` jobs
Eventually.eventually(TIMEOUT, INTERVAL) {
val completedPods = getPods("driver", s"${GROUP_PREFIX}queue1", "Succeeded")
assert(completedPods.size === 2)
assert(completedPods.size === jobNum/2)
val pendingPods = getPods("driver", s"${GROUP_PREFIX}queue0", "Pending")
assert(pendingPods.size === 2)
assert(pendingPods.size === jobNum/2)
}
}

Expand All @@ -362,7 +385,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
// Enable all queues
createOrReplaceQueue(name = "queue1")
createOrReplaceQueue(name = "queue0")
val jobNum = 4
val jobNum = 2
// Submit jobs into these two queues
(1 to jobNum).foreach { i =>
Future {
Expand Down Expand Up @@ -410,7 +433,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
}

// Enable queue to let jobs running one by one
createOrReplaceQueue(name = "queue", cpu = Some("1"))
createOrReplaceQueue(name = "queue", cpu = Some(s"$driverCores"))

// Verify scheduling order follow the specified priority
Eventually.eventually(TIMEOUT, INTERVAL) {
Expand All @@ -435,10 +458,10 @@ private[spark] object VolcanoTestsSuite extends SparkFunSuite {
val GROUP_PREFIX = "volcano-test" + UUID.randomUUID().toString.replaceAll("-", "") + "-"
val VOLCANO_PRIORITY_YAML
= new File(getClass.getResource("/volcano/priorityClasses.yml").getFile).getAbsolutePath
val DRIVER_PG_TEMPLATE_CPU_2U = new File(
getClass.getResource("/volcano/driver-podgroup-template-cpu-2u.yml").getFile
).getAbsolutePath
val DRIVER_PG_TEMPLATE_MEMORY_3G = new File(
getClass.getResource("/volcano/driver-podgroup-template-memory-3g.yml").getFile
).getAbsolutePath
val DRIVER_REQUEST_CORES = sys.props.get(CONFIG_DRIVER_REQUEST_CORES).getOrElse("1")
val EXECUTOR_REQUEST_CORES = sys.props.get(CONFIG_EXECUTOR_REQUEST_CORES).getOrElse("1")
val HOST_PATH = "/tmp/"
}

0 comments on commit 55af9d8

Please sign in to comment.