From 7d433cc9573741c250cc0725f26ac7c8fdd35173 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 17 Jan 2017 12:01:56 -0800
Subject: [PATCH 1/5] Support custom labels on the driver pod.

---
 docs/running-on-kubernetes.md                      |  8 ++++
 .../spark/deploy/kubernetes/Client.scala           | 37 +++++++++++++++----
 2 files changed, 38 insertions(+), 7 deletions(-)

diff --git a/docs/running-on-kubernetes.md b/docs/running-on-kubernetes.md
index 234c9870548c7..4f6e971078e91 100644
--- a/docs/running-on-kubernetes.md
+++ b/docs/running-on-kubernetes.md
@@ -213,6 +213,14 @@ from the other deployment modes. See the [configuration page](configuration.html
     (typically 6-10%).
   </td>
 </tr>
+<tr>
+  <td><code>spark.kubernetes.driver.labels</code></td>
+  <td>(none)</td>
+  <td>
+    Custom labels that will be added to the driver pod. This should be a comma-separated list of label key-value pairs,
+    where each label is in the format <key>=<value>.
+  </td>
+</tr>
 </table>
 
 ## Current Limitations

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
index 6d7de973a52c2..8201aa8bf68af 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/Client.scala
@@ -77,6 +77,8 @@ private[spark] class Client(
   private val serviceAccount = sparkConf.get("spark.kubernetes.submit.serviceAccountName",
     "default")
 
+  private val customLabels = sparkConf.get("spark.kubernetes.driver.labels", "")
+
   private implicit val retryableExecutionContext = ExecutionContext
     .fromExecutorService(
       Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
@@ -85,6 +87,7 @@ private[spark] class Client(
       .build()))
 
   def run(): Unit = {
+    val parsedCustomLabels = parseCustomLabels(customLabels)
     var k8ConfBuilder = new ConfigBuilder()
       .withApiVersion("v1")
       .withMasterUrl(master)
@@ -109,14 +112,15 @@ private[spark] class Client(
         .withType("Opaque")
         .done()
     try {
-      val selectors = Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue).asJava
+      val resolvedSelectors = (Map(DRIVER_LAUNCHER_SELECTOR_LABEL -> driverLauncherSelectorValue)
+        ++ parsedCustomLabels).asJava
       val (servicePorts, containerPorts) = configurePorts()
       val service = kubernetesClient.services().createNew()
         .withNewMetadata()
           .withName(kubernetesAppId)
           .endMetadata()
         .withNewSpec()
-          .withSelector(selectors)
+          .withSelector(resolvedSelectors)
           .withPorts(servicePorts.asJava)
           .endSpec()
         .done()
@@ -137,7 +141,7 @@ private[spark] class Client(
             .asScala
             .find(status =>
               status.getName == DRIVER_LAUNCHER_CONTAINER_NAME && status.getReady) match {
-          case Some(status) =>
+          case Some(_) =>
             try {
               val driverLauncher = getDriverLauncherService(
                 k8ClientConfig, master)
@@ -184,7 +188,7 @@ private[spark] class Client(
       kubernetesClient.pods().createNew()
         .withNewMetadata()
           .withName(kubernetesAppId)
-          .withLabels(selectors)
+          .addToLabels(resolvedSelectors)
          .endMetadata()
        .withNewSpec()
          .withRestartPolicy("OnFailure")
@@ -291,7 +295,7 @@ private[spark] class Client(
 
       Utils.tryWithResource(kubernetesClient
         .pods()
-        .withLabels(selectors)
+        .withLabels(resolvedSelectors)
         .watch(podWatcher)) { createDriverPod }
     } finally {
       kubernetesClient.secrets().delete(secret)
@@ -336,7 +340,7 @@ private[spark] class Client(
       .getOption("spark.ui.port")
       .map(_.toInt)
       .getOrElse(DEFAULT_UI_PORT))
-    (servicePorts.toSeq, containerPorts.toSeq)
+    (servicePorts, containerPorts)
   }
 
   private def buildSubmissionRequest(): KubernetesCreateSubmissionRequest = {
@@ -366,7 +370,7 @@ private[spark] class Client(
       uploadedJarsBase64Contents = uploadJarsBase64Contents)
   }
 
-  def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
+  private def compressJars(maybeFilePaths: Option[String]): Option[TarGzippedData] = {
     maybeFilePaths
       .map(_.split(","))
       .map(CompressionUtils.createTarGzip(_))
@@ -391,6 +395,25 @@ private[spark] class Client(
       sslSocketFactory = sslContext.getSocketFactory,
       trustContext = trustManager)
   }
+
+  private def parseCustomLabels(labels: String): Map[String, String] = {
+    val parsedCustomLabels = labels.split(",").map(label => {
+      if (!label.contains("=")) {
+        throw new IllegalArgumentException("Custom labels set by spark.kubernetes.driver.labels" +
+          " must be a comma-separated list of key-value pairs, with format <key>=<value>." +
+          s" Got label: $label. All labels: $labels")
+      } else {
+        (label.split("=")(0), label.split("=")(1))
+      }
+    }).toMap
+    // Unlikely, but throw a useful error if the reserved label is used
+    if (parsedCustomLabels.contains(DRIVER_LAUNCHER_SELECTOR_LABEL)) {
+      throw new IllegalArgumentException(s"Label with key $DRIVER_LAUNCHER_SELECTOR_LABEL" +
+        " cannot be used in spark.kubernetes.driver.labels, as it is reserved for Spark's" +
+        " internal configuration.")
+    }
+    parsedCustomLabels
+  }
 }
 
 private object Client {
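Two behaviors of the patch 1 parser are worth calling out here, because the "fix logic" in patch 2's subject line addresses exactly them. The following standalone sketch mirrors the split logic above; it is not the patched Client code itself, and the helper name `parseLabelsV1` is made up for illustration:

```scala
// Sketch of the patch 1 parsing rules, mirrored outside the Client class.
// The helper name parseLabelsV1 is hypothetical.
def parseLabelsV1(labels: String): Map[String, String] =
  labels.split(",").map { label =>
    require(label.contains("="), s"Bad label: $label")
    (label.split("=")(0), label.split("=")(1))
  }.toMap

// Works for the simple case:
parseLabelsV1("team=infra,env=staging")  // Map(team -> infra, env -> staging)

// Pitfall 1: "".split(",") yields Array(""), so the default value "" that
// Client reads when spark.kubernetes.driver.labels is unset fails the
// contains("=") check and every label-less submission would abort.
// parseLabelsV1("")  // throws

// Pitfall 2: split("=") splits on every '=', so a value containing '=' is
// silently truncated to the text between the first and second '='.
parseLabelsV1("expr=a=b")  // Map(expr -> a): the "=b" tail is dropped
```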
All labels: $labels") - } else { - (label.split("=")(0), label.split("=")(1)) + labels.split(",").map(_.trim).filterNot(_.isEmpty).map(label => { + label.split("=", 2).toSeq match { + case Seq(k, v) => + require(k != DRIVER_LAUNCHER_SELECTOR_LABEL, "Label with key" + + s" $DRIVER_LAUNCHER_SELECTOR_LABEL cannot be used in" + + " spark.kubernetes.driver.labels, as it is reserved for Spark's" + + " internal configuration.") + (k, v) + case _ => + throw new SparkException("Custom labels set by spark.kubernetes.driver.labels" + + " must be a comma-separated list of key-value pairs, with format =." + + s" Got label: $label. All labels: $labels") } }).toMap - // Unlikely, but throw a useful error if the reserved label is used - if (parsedCustomLabels.contains(DRIVER_LAUNCHER_SELECTOR_LABEL)) { - throw new IllegalArgumentException(s"Label with key $DRIVER_LAUNCHER_SELECTOR_LABEL" + - " cannot be used in spark.kubernetes.driver.labels, as it is reserved for Spark's" + - " internal configuration.") - } - parsedCustomLabels } } diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala index 6247a1674f8d6..6e9f56a2fee5e 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala @@ -155,10 +155,42 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter { "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}", "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest", "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest", + "--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value", EXAMPLES_JAR) SparkSubmit.main(args) - val sparkMetricsService = Minikube.getService[SparkRestApiV1]( - "spark-pi", NAMESPACE, "spark-ui-port") - expectationsForStaticAllocation(sparkMetricsService) + val driverPodLabels = minikubeKubernetesClient + .pods + .withName("spark-pi") + .get + .getMetadata + .getLabels + // We can't match all of the selectors directly since one of the selectors is based on the + // launch time. 
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 6247a1674f8d6..6e9f56a2fee5e 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -155,10 +155,42 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
       "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
       "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
+      "--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value",
       EXAMPLES_JAR)
     SparkSubmit.main(args)
-    val sparkMetricsService = Minikube.getService[SparkRestApiV1](
-      "spark-pi", NAMESPACE, "spark-ui-port")
-    expectationsForStaticAllocation(sparkMetricsService)
+    val driverPodLabels = minikubeKubernetesClient
+      .pods
+      .withName("spark-pi")
+      .get
+      .getMetadata
+      .getLabels
+    // We can't match all of the selectors directly since one of the selectors is based on the
+    // launch time.
+    assert(driverPodLabels.size == 3, "Unexpected number of pod labels.")
+    assert(driverPodLabels.containsKey("driver-launcher-selector"), "Expected driver launcher" +
+      " selector label to be present.")
+    assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
+    assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
+  }
+
+  test("Run driver with custom labels") {
+    val args = Array(
+      "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
+      "--deploy-mode", "cluster",
+      "--kubernetes-namespace", NAMESPACE,
+      "--name", "spark-pi",
+      "--executor-memory", "512m",
+      "--executor-cores", "1",
+      "--num-executors", "1",
+      "--upload-jars", HELPER_JAR,
+      "--class", MAIN_CLASS,
+      "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}",
+      "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}",
+      "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
+      "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
+      "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
+      EXAMPLES_JAR
+    )
+    SparkSubmit.main(args)
   }
 }
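For readers reproducing the assertion block outside the suite: the same check can be made against any cluster with the fabric8 client the test already uses. A sketch follows; the no-arg client construction and the "default" namespace are assumptions for illustration (the suite obtains its client from Minikube), while the pod name and label values match the test above:

```scala
import io.fabric8.kubernetes.client.DefaultKubernetesClient

import scala.collection.JavaConverters._

// Illustrative verification of driver pod labels; not the suite's own setup.
val client = new DefaultKubernetesClient()
try {
  val labels = client.pods()
    .inNamespace("default")
    .withName("spark-pi")
    .get()
    .getMetadata
    .getLabels
    .asScala
  // The driver-launcher-selector value embeds the launch time, so only its
  // presence can be asserted, mirroring the comment in the test above.
  assert(labels.contains("driver-launcher-selector"))
  assert(labels.get("label1").contains("label1value"))
  assert(labels.get("label2").contains("label2value"))
} finally {
  client.close()
}
```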
From f5883a0da138d2cb3b49605f04f4062ef9908553 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 17 Jan 2017 13:45:45 -0800
Subject: [PATCH 3/5] Fix tests

---
 .../integrationtest/KubernetesSuite.scala          | 41 ++++++++++---------
 1 file changed, 22 insertions(+), 19 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 6e9f56a2fee5e..8d6942ee65828 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -139,7 +139,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     expectationsForStaticAllocation(sparkMetricsService)
   }
 
-  test("Run using spark-submit") {
+  test("Run with spark-submit") {
     val args = Array(
       "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
       "--deploy-mode", "cluster",
@@ -155,25 +155,15 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
       "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
       "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
-      "--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value",
-      EXAMPLES_JAR)
+      EXAMPLES_JAR
+    )
     SparkSubmit.main(args)
-    val driverPodLabels = minikubeKubernetesClient
-      .pods
-      .withName("spark-pi")
-      .get
-      .getMetadata
-      .getLabels
-    // We can't match all of the selectors directly since one of the selectors is based on the
-    // launch time.
-    assert(driverPodLabels.size == 3, "Unexpected number of pod labels.")
-    assert(driverPodLabels.containsKey("driver-launcher-selector"), "Expected driver launcher" +
-      " selector label to be present.")
-    assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
-    assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
+    val sparkMetricsService = Minikube.getService[SparkRestApiV1](
+      "spark-pi", NAMESPACE, "spark-ui-port")
+    expectationsForStaticAllocation(sparkMetricsService)
   }
 
-  test("Run driver with custom labels") {
+  test("Run with custom labels") {
     val args = Array(
       "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
       "--deploy-mode", "cluster",
@@ -189,8 +179,21 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
       "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
       "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
-      EXAMPLES_JAR
-    )
+      "--conf", "spark.kubernetes.driver.labels=label1=label1value,label2=label2value",
+      EXAMPLES_JAR)
     SparkSubmit.main(args)
+    val driverPodLabels = minikubeKubernetesClient
+      .pods
+      .withName("spark-pi")
+      .get
+      .getMetadata
+      .getLabels
+    // We can't match all of the selectors directly since one of the selectors is based on the
+    // launch time.
+    assert(driverPodLabels.size == 3, "Unexpected number of pod labels.")
+    assert(driverPodLabels.containsKey("driver-launcher-selector"), "Expected driver launcher" +
+      " selector label to be present.")
+    assert(driverPodLabels.get("label1") == "label1value", "Unexpected value for label1")
+    assert(driverPodLabels.get("label2") == "label2value", "Unexpected value for label2")
   }
 }

From 385d9f218d7f4b988d865cf092088f6e40bf0c59 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 17 Jan 2017 13:46:12 -0800
Subject: [PATCH 4/5] Fix minor formatting mistake

---
 .../deploy/kubernetes/integrationtest/KubernetesSuite.scala | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 8d6942ee65828..e1ce870c22803 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -155,8 +155,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
       "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
       "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest",
-      EXAMPLES_JAR
-    )
+      EXAMPLES_JAR)
     SparkSubmit.main(args)
     val sparkMetricsService = Minikube.getService[SparkRestApiV1](
       "spark-pi", NAMESPACE, "spark-ui-port")
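Patches 3 through 5 mostly shuffle the same spark-submit argument array between the two tests, and the churn hints at duplication a small builder could remove. A hypothetical refactor, not part of this series, reusing the suite's existing members (Minikube, NAMESPACE, HELPER_JAR, MAIN_CLASS, EXAMPLES_JAR, clientConfig):

```scala
// Hypothetical helper, not in the patch series: both tests share this argument
// list and differ only in their extra --conf pairs.
def submitArgs(extraConf: Map[String, String]): Array[String] = {
  val base = Array(
    "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
    "--deploy-mode", "cluster",
    "--kubernetes-namespace", NAMESPACE,
    "--name", "spark-pi",
    "--executor-memory", "512m",
    "--executor-cores", "1",
    "--num-executors", "1",
    "--upload-jars", HELPER_JAR,
    "--class", MAIN_CLASS,
    "--conf", s"spark.kubernetes.submit.caCertFile=${clientConfig.getCaCertFile}",
    "--conf", s"spark.kubernetes.submit.clientKeyFile=${clientConfig.getClientKeyFile}",
    "--conf", s"spark.kubernetes.submit.clientCertFile=${clientConfig.getClientCertFile}",
    "--conf", "spark.kubernetes.executor.docker.image=spark-executor:latest",
    "--conf", "spark.kubernetes.driver.docker.image=spark-driver:latest")
  // Each extra conf entry expands to a "--conf", "key=value" pair.
  val extras = extraConf.toSeq.flatMap { case (k, v) => Seq("--conf", s"$k=$v") }
  (base ++ extras) :+ EXAMPLES_JAR
}

// The custom-labels test body would then reduce to:
// SparkSubmit.main(submitArgs(Map(
//   "spark.kubernetes.driver.labels" -> "label1=label1value,label2=label2value")))
```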
From 7c8bc62d21fc899ff78641829877154207e87e21 Mon Sep 17 00:00:00 2001
From: mcheah
Date: Tue, 17 Jan 2017 13:46:44 -0800
Subject: [PATCH 5/5] Reduce unnecessary diff

---
 .../deploy/kubernetes/integrationtest/KubernetesSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index e1ce870c22803..7b3c2b93b865b 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -139,7 +139,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     expectationsForStaticAllocation(sparkMetricsService)
   }
 
-  test("Run with spark-submit") {
+  test("Run using spark-submit") {
     val args = Array(
       "--master", s"k8s://https://${Minikube.getMinikubeIp}:8443",
       "--deploy-mode", "cluster",