Skip to content

Commit

Permalink
[SPARK-38921][K8S][TESTS] Use k8s-client to create queue resource in …
Browse files Browse the repository at this point in the history
…Volcano IT

### What changes were proposed in this pull request?
Use fabric8io/k8s-client to create queue resource in Volcano IT.

### Why are the changes needed?
Use k8s-client to create volcano queue to
- Make code easy to understand
- Enable the ability to set queue capacity dynamically. This will help support running the Volcano tests in a resource-limited environment (such as GitHub Actions).

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Volcano IT passed

Closes #36219 from Yikun/SPARK-38921.

Authored-by: Yikun Jiang <yikunkero@gmail.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit a49f66f)
Signed-off-by: Yikun Jiang <yikunkero@gmail.com>
  • Loading branch information
Yikun committed Dec 3, 2022
1 parent bdafe57 commit 20cc2b6
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 159 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
apiVersion: scheduling.volcano.sh/v1beta1
kind: PodGroup
spec:
queue: queue-2u-3g
queue: queue-2u
minMember: 1
minResources:
cpu: "2"
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
apiVersion: scheduling.volcano.sh/v1beta1
kind: PodGroup
spec:
queue: queue-2u-3g
queue: queue-3g
minMember: 1
minResources:
memory: "3Gi"

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
import scala.concurrent.Future

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.api.model.{HasMetadata, Pod, Quantity}
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.volcano.client.VolcanoClient
import io.fabric8.volcano.scheduling.v1beta1.{Queue, QueueBuilder}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually

Expand All @@ -49,6 +50,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
lazy val k8sClient: NamespacedKubernetesClient = kubernetesTestComponents.kubernetesClient
private val testGroups: mutable.Set[String] = mutable.Set.empty
private val testYAMLPaths: mutable.Set[String] = mutable.Set.empty
private val testResources: mutable.Set[HasMetadata] = mutable.Set.empty

private def deletePodInTestGroup(): Unit = {
testGroups.foreach { g =>
Expand All @@ -72,9 +74,22 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
testYAMLPaths.clear()
}

/**
 * Deletes every resource recorded in `testResources`, waits until the server-side lookup
 * returns `null` for each of them, and then empties the tracking set.
 *
 * Both the delete and the wait act on the whole set at once, so they are issued a single
 * time instead of once per tracked resource.
 */
private def deleteResources(): Unit = {
  if (testResources.nonEmpty) {
    // Snapshot once; the same list drives both the delete and the polling below.
    val tracked = testResources.toSeq
    k8sClient.resourceList(tracked: _*).delete()
    Eventually.eventually(TIMEOUT, INTERVAL) {
      val remaining = k8sClient.resourceList(tracked: _*).fromServer.get.asScala
      // Make sure all elements are null (no specific resources in cluster)
      remaining.foreach { r => assert(r === null) }
    }
  }
  testResources.clear()
}

/**
 * Runs the per-test cleanup steps in order (`deletePodInTestGroup`, `deleteYamlResources`,
 * `deleteResources`) and then delegates to the parent `afterEach`.
 *
 * `super.afterEach()` is invoked from a `finally` block so the parent's cleanup still runs
 * when one of the steps above throws, for example when an `eventually` wait times out.
 */
override protected def afterEach(): Unit = {
  try {
    deletePodInTestGroup()
    deleteYamlResources()
    deleteResources()
  } finally {
    super.afterEach()
  }
}

Expand Down Expand Up @@ -108,6 +123,30 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
assert(pod.getSpec.getPriorityClassName === podGroup.getSpec.getPriorityClassName))
}

/**
 * Creates or replaces the given Volcano queue through `volcanoClient` and records it in
 * `testResources`.
 *
 * @param resource the queue to create or replace
 */
private def createOrReplaceResource(resource: Queue): Unit = {
  val queueOps = volcanoClient.queues()
  queueOps.createOrReplace(resource)
  // Track only after the call above has returned without throwing.
  testResources += resource
}

/**
 * Builds a Volcano queue with weight 1 and hands it to `createOrReplaceResource`.
 *
 * @param name   metadata name of the queue
 * @param cpu    optional "cpu" capability, parsed as a `Quantity`; omitted when `None`
 * @param memory optional "memory" capability, parsed as a `Quantity`; omitted when `None`
 */
private def createOrReplaceQueue(
    name: String,
    cpu: Option[String] = None,
    memory: Option[String] = None): Unit = {
  val builder = new QueueBuilder()
    .editOrNewMetadata()
      .withName(name)
    .endMetadata()
    .editOrNewSpec()
      .withWeight(1)
    .endSpec()
  // Add a capability entry only for the limits that were actually supplied.
  Seq("cpu" -> cpu, "memory" -> memory).foreach { case (resourceName, limit) =>
    limit.foreach { amount =>
      builder.editOrNewSpec().addToCapability(resourceName, new Quantity(amount)).endSpec()
    }
  }
  createOrReplaceResource(builder.build())
}

private def createOrReplaceYAMLResource(yamlPath: String): Unit = {
k8sClient.load(new FileInputStream(yamlPath)).createOrReplace()
testYAMLPaths += yamlPath
Expand Down Expand Up @@ -264,8 +303,8 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

test("SPARK-38187: Run SparkPi Jobs with minCPU", k8sTestTag, volcanoTag) {
val groupName = generateGroupName("min-cpu")
// Create a queue with 2 CPU, 3G memory capacity
createOrReplaceYAMLResource(QUEUE_2U_3G_YAML)
// Create a queue with 2 CPU capacity
createOrReplaceQueue(name = "queue-2u", cpu = Some("2"))
// Submit 3 jobs with minCPU = 2
val jobNum = 3
(1 to jobNum).map { i =>
Expand All @@ -281,8 +320,8 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

test("SPARK-38187: Run SparkPi Jobs with minMemory", k8sTestTag, volcanoTag) {
val groupName = generateGroupName("min-mem")
// Create a queue with 2 CPU, 3G memory capacity
createOrReplaceYAMLResource(QUEUE_2U_3G_YAML)
// Create a queue with 3G memory capacity
createOrReplaceQueue(name = "queue-3g", memory = Some("3Gi"))
// Submit 3 jobs with minMemory = 3g
val jobNum = 3
(1 to jobNum).map { i =>
Expand All @@ -298,7 +337,8 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

test("SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enabled)", k8sTestTag, volcanoTag) {
// Disabled queue0 and enabled queue1
createOrReplaceYAMLResource(VOLCANO_Q0_DISABLE_Q1_ENABLE_YAML)
createOrReplaceQueue(name = "queue0", cpu = Some("0.001"))
createOrReplaceQueue(name = "queue1")
// Submit jobs into disabled queue0 and enabled queue1
val jobNum = 4
(1 to jobNum).foreach { i =>
Expand All @@ -320,7 +360,8 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
test("SPARK-38188: Run SparkPi jobs with 2 queues (all enabled)", k8sTestTag, volcanoTag) {
val groupName = generateGroupName("queue-enable")
// Enable all queues
createOrReplaceYAMLResource(VOLCANO_ENABLE_Q0_AND_Q1_YAML)
createOrReplaceQueue(name = "queue1")
createOrReplaceQueue(name = "queue0")
val jobNum = 4
// Submit jobs into these two queues
(1 to jobNum).foreach { i =>
Expand All @@ -338,7 +379,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

test("SPARK-38423: Run driver job to validate priority order", k8sTestTag, volcanoTag) {
// Prepare the priority resource and queue
createOrReplaceYAMLResource(DISABLE_QUEUE)
createOrReplaceQueue(name = "queue", cpu = Some("0.001"))
createOrReplaceYAMLResource(VOLCANO_PRIORITY_YAML)
// Submit 3 jobs with different priority
val priorities = Seq("low", "medium", "high")
Expand Down Expand Up @@ -369,7 +410,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
}

// Enable queue to let jobs run one by one
createOrReplaceYAMLResource(ENABLE_QUEUE)
createOrReplaceQueue(name = "queue", cpu = Some("1"))

// Verify scheduling order follows the specified priority
Eventually.eventually(TIMEOUT, INTERVAL) {
Expand All @@ -391,24 +432,9 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

private[spark] object VolcanoTestsSuite extends SparkFunSuite {
val VOLCANO_FEATURE_STEP = classOf[VolcanoFeatureStep].getName
val VOLCANO_ENABLE_Q0_AND_Q1_YAML = new File(
getClass.getResource("/volcano/enable-queue0-enable-queue1.yml").getFile
).getAbsolutePath
val VOLCANO_Q0_DISABLE_Q1_ENABLE_YAML = new File(
getClass.getResource("/volcano/disable-queue0-enable-queue1.yml").getFile
).getAbsolutePath
val GROUP_PREFIX = "volcano-test" + UUID.randomUUID().toString.replaceAll("-", "") + "-"
val VOLCANO_PRIORITY_YAML
= new File(getClass.getResource("/volcano/priorityClasses.yml").getFile).getAbsolutePath
val ENABLE_QUEUE = new File(
getClass.getResource("/volcano/enable-queue.yml").getFile
).getAbsolutePath
val DISABLE_QUEUE = new File(
getClass.getResource("/volcano/disable-queue.yml").getFile
).getAbsolutePath
val QUEUE_2U_3G_YAML = new File(
getClass.getResource("/volcano/queue-2u-3g.yml").getFile
).getAbsolutePath
val DRIVER_PG_TEMPLATE_CPU_2U = new File(
getClass.getResource("/volcano/driver-podgroup-template-cpu-2u.yml").getFile
).getAbsolutePath
Expand Down

0 comments on commit 20cc2b6

Please sign in to comment.