Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-38921][K8S][Tests] Use k8s-client to create queue resource in Volcano IT #36219

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
apiVersion: scheduling.volcano.sh/v1beta1
kind: PodGroup
spec:
queue: queue-2u-3g
queue: queue-2u
minMember: 1
minResources:
cpu: "2"
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
apiVersion: scheduling.volcano.sh/v1beta1
kind: PodGroup
spec:
queue: queue-2u-3g
queue: queue-3g
minMember: 1
minResources:
memory: "3Gi"

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import scala.concurrent.ExecutionContext.Implicits.global
// scalastyle:on executioncontextglobal
import scala.concurrent.Future

import io.fabric8.kubernetes.api.model.Pod
import io.fabric8.kubernetes.api.model.{HasMetadata, Pod, Quantity}
import io.fabric8.kubernetes.client.NamespacedKubernetesClient
import io.fabric8.volcano.client.VolcanoClient
import io.fabric8.volcano.scheduling.v1beta1.{Queue, QueueBuilder}
import org.scalatest.BeforeAndAfterEach
import org.scalatest.concurrent.Eventually

Expand All @@ -49,6 +50,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
lazy val k8sClient: NamespacedKubernetesClient = kubernetesTestComponents.kubernetesClient
private val testGroups: mutable.Set[String] = mutable.Set.empty
private val testYAMLPaths: mutable.Set[String] = mutable.Set.empty
private val testResources: mutable.Set[HasMetadata] = mutable.Set.empty

private def deletePodInTestGroup(): Unit = {
testGroups.foreach { g =>
Expand All @@ -72,9 +74,22 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
testYAMLPaths.clear()
}

private def deleteResources(): Unit = {
testResources.foreach { _ =>
k8sClient.resourceList(testResources.toSeq: _*).delete()
Eventually.eventually(TIMEOUT, INTERVAL) {
val resources = k8sClient.resourceList(testResources.toSeq: _*).fromServer.get.asScala
// Make sure all elements are null (no specific resources in cluster)
resources.foreach { r => assert(r === null) }
}
}
testResources.clear()
}

override protected def afterEach(): Unit = {
deletePodInTestGroup()
deleteYamlResources()
deleteResources()
super.afterEach()
}

Expand Down Expand Up @@ -108,6 +123,30 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
assert(pod.getSpec.getPriorityClassName === podGroup.getSpec.getPriorityClassName))
}

private def createOrReplaceResource(resource: Queue): Unit = {
volcanoClient.queues().createOrReplace(resource)
testResources += resource
}

private def createOrReplaceQueue(name: String,
cpu: Option[String] = None,
memory: Option[String] = None): Unit = {
val queueBuilder = new QueueBuilder()
.editOrNewMetadata()
.withName(name)
.endMetadata()
.editOrNewSpec()
.withWeight(1)
.endSpec()
cpu.foreach{ cpu =>
queueBuilder.editOrNewSpec().addToCapability("cpu", new Quantity(cpu)).endSpec()
}
memory.foreach{ memory =>
queueBuilder.editOrNewSpec().addToCapability("memory", new Quantity(memory)).endSpec()
}
createOrReplaceResource(queueBuilder.build())
}

private def createOrReplaceYAMLResource(yamlPath: String): Unit = {
k8sClient.load(new FileInputStream(yamlPath)).createOrReplace()
testYAMLPaths += yamlPath
Expand Down Expand Up @@ -264,8 +303,8 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

test("SPARK-38187: Run SparkPi Jobs with minCPU", k8sTestTag, volcanoTag) {
val groupName = generateGroupName("min-cpu")
// Create a queue with 2 CPU, 3G memory capacity
createOrReplaceYAMLResource(QUEUE_2U_3G_YAML)
// Create a queue with 2 CPU capacity
createOrReplaceQueue(name = "queue-2u", cpu = Some("2"))
// Submit 3 jobs with minCPU = 2
val jobNum = 3
(1 to jobNum).map { i =>
Expand All @@ -281,8 +320,8 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

test("SPARK-38187: Run SparkPi Jobs with minMemory", k8sTestTag, volcanoTag) {
val groupName = generateGroupName("min-mem")
// Create a queue with 2 CPU, 3G memory capacity
createOrReplaceYAMLResource(QUEUE_2U_3G_YAML)
// Create a queue with 3G memory capacity
createOrReplaceQueue(name = "queue-3g", memory = Some("3Gi"))
// Submit 3 jobs with minMemory = 3g
val jobNum = 3
(1 to jobNum).map { i =>
Expand All @@ -298,7 +337,8 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

test("SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enabled)", k8sTestTag, volcanoTag) {
// Disabled queue0 and enabled queue1
createOrReplaceYAMLResource(VOLCANO_Q0_DISABLE_Q1_ENABLE_YAML)
createOrReplaceQueue(name = "queue0", cpu = Some("0.001"))
createOrReplaceQueue(name = "queue1")
// Submit jobs into disabled queue0 and enabled queue1
val jobNum = 4
(1 to jobNum).foreach { i =>
Expand All @@ -320,7 +360,8 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
test("SPARK-38188: Run SparkPi jobs with 2 queues (all enabled)", k8sTestTag, volcanoTag) {
val groupName = generateGroupName("queue-enable")
// Enable all queues
createOrReplaceYAMLResource(VOLCANO_ENABLE_Q0_AND_Q1_YAML)
createOrReplaceQueue(name = "queue1")
createOrReplaceQueue(name = "queue0")
val jobNum = 4
// Submit jobs into these two queues
(1 to jobNum).foreach { i =>
Expand All @@ -338,7 +379,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

test("SPARK-38423: Run driver job to validate priority order", k8sTestTag, volcanoTag) {
// Prepare the priority resource and queue
createOrReplaceYAMLResource(DISABLE_QUEUE)
createOrReplaceQueue(name = "queue", cpu = Some("0.001"))
createOrReplaceYAMLResource(VOLCANO_PRIORITY_YAML)
// Submit 3 jobs with different priority
val priorities = Seq("low", "medium", "high")
Expand Down Expand Up @@ -369,7 +410,7 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku
}

// Enable queue to let jobs running one by one
createOrReplaceYAMLResource(ENABLE_QUEUE)
createOrReplaceQueue(name = "queue", cpu = Some("1"))

// Verify scheduling order follow the specified priority
Eventually.eventually(TIMEOUT, INTERVAL) {
Expand All @@ -391,24 +432,9 @@ private[spark] trait VolcanoTestsSuite extends BeforeAndAfterEach { k8sSuite: Ku

private[spark] object VolcanoTestsSuite extends SparkFunSuite {
val VOLCANO_FEATURE_STEP = classOf[VolcanoFeatureStep].getName
val VOLCANO_ENABLE_Q0_AND_Q1_YAML = new File(
getClass.getResource("/volcano/enable-queue0-enable-queue1.yml").getFile
).getAbsolutePath
val VOLCANO_Q0_DISABLE_Q1_ENABLE_YAML = new File(
getClass.getResource("/volcano/disable-queue0-enable-queue1.yml").getFile
).getAbsolutePath
val GROUP_PREFIX = "volcano-test" + UUID.randomUUID().toString.replaceAll("-", "") + "-"
val VOLCANO_PRIORITY_YAML
= new File(getClass.getResource("/volcano/priorityClasses.yml").getFile).getAbsolutePath
val ENABLE_QUEUE = new File(
getClass.getResource("/volcano/enable-queue.yml").getFile
).getAbsolutePath
val DISABLE_QUEUE = new File(
getClass.getResource("/volcano/disable-queue.yml").getFile
).getAbsolutePath
val QUEUE_2U_3G_YAML = new File(
getClass.getResource("/volcano/queue-2u-3g.yml").getFile
).getAbsolutePath
val DRIVER_PG_TEMPLATE_CPU_2U = new File(
getClass.getResource("/volcano/driver-podgroup-template-cpu-2u.yml").getFile
).getAbsolutePath
Expand Down