[SPARK-23668][K8S] Add config option for passing through k8s Pod.spec.imagePullSecrets (apache#355)

Pass through the `imagePullSecrets` option to the k8s pod in order to allow users to access private image registries.

See https://kubernetes.io/docs/tasks/configure-pod-container/pull-image-private-registry/
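For context, the net effect of the change is to copy these secret references onto the pod spec that Spark builds. A minimal fabric8 sketch of the resulting pod (secret names here are illustrative, not from the patch):

```scala
import io.fabric8.kubernetes.api.model.{LocalObjectReference, Pod, PodBuilder}

// Illustrative only: a bare pod carrying two image pull secrets, mirroring
// what the driver and executor builders attach after this change.
val podWithSecrets: Pod = new PodBuilder()
  .withNewMetadata().withName("example-driver").endMetadata()
  .withNewSpec()
    .withImagePullSecrets(
      new LocalObjectReference("registry-secret-1"),
      new LocalObjectReference("registry-secret-2"))
  .endSpec()
  .build()
```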

Unit tests + manual testing.

Manual testing procedure:
1. Have a private image registry.
2. Spark-submit an application with no `spark.kubernetes.imagePullSecret` set. Run `kubectl describe pod ...` and see the error message:
```
Error syncing pod, skipping: failed to "StartContainer" for "spark-kubernetes-driver" with ErrImagePull: "rpc error: code = 2 desc = Error: Status 400 trying to pull repository ...: \"{\\n  \\\"errors\\\" : [ {\\n    \\\"status\\\" : 400,\\n    \\\"message\\\" : \\\"Unsupported docker v1 repository request for '...'\\\"\\n  } ]\\n}\""
```
3. Create a secret with `kubectl create secret docker-registry ...`.
4. Spark-submit with `spark.kubernetes.imagePullSecret` set to the new secret and see that the deployment succeeds (a configuration sketch follows this list).
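For illustration, the submission-side configuration could look like the sketch below; the config key is the one named in this commit message, while the image and secret names are hypothetical:

```scala
import org.apache.spark.SparkConf

// Hypothetical usage: reference the docker-registry secret created in step 3
// so that driver and executor pods can pull from the private registry.
val conf = new SparkConf()
  .set("spark.kubernetes.container.image", "registry.example.com/spark/spark:v2.3.0")
  .set("spark.kubernetes.imagePullSecret", "my-registry-secret")
```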

Author: Andrew Korzhuev <andrew.korzhuev@klarna.com>
Author: Andrew Korzhuev <korzhuev@andrusha.me>

Closes apache#20811 from andrusha/spark-23668-image-pull-secrets.
avovchenko authored and ekrivokonmapr committed Sep 19, 2019
1 parent 74b6da5 commit e9786d4
Showing 5 changed files with 364 additions and 0 deletions.
@@ -16,6 +16,11 @@
*/
package org.apache.spark.deploy.k8s

import java.io.File

import io.fabric8.kubernetes.api.model.{Container, LocalObjectReference, Pod, PodBuilder}

import org.apache.spark.SparkConf
import org.apache.spark.util.Utils

@@ -35,6 +40,17 @@ private[spark] object KubernetesUtils {
    sparkConf.getAllWithPrefix(prefix).toMap
  }

  /**
   * Parses a comma-separated list of imagePullSecrets into a K8s-understandable format.
   */
  def parseImagePullSecrets(imagePullSecrets: Option[String]): List[LocalObjectReference] = {
    imagePullSecrets match {
      case Some(secretsCommaSeparated) =>
        secretsCommaSeparated.split(',').map(_.trim).map(new LocalObjectReference(_)).toList
      case None => Nil
    }
  }

  def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
    opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
  }
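A REPL-style sketch of the parsing behavior added above (secret names hypothetical):

```scala
import org.apache.spark.deploy.k8s.KubernetesUtils

// Whitespace around the commas is trimmed; an unset option yields Nil.
KubernetesUtils.parseImagePullSecrets(Some("secret-a, secret-b"))
// -> List(LocalObjectReference("secret-a"), LocalObjectReference("secret-b"))
KubernetesUtils.parseImagePullSecrets(None)
// -> Nil
```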
@@ -49,6 +49,8 @@ private[spark] class BasicDriverConfigurationStep(
    .get(DRIVER_CONTAINER_IMAGE)
    .getOrElse(throw new SparkException("Must specify the driver container image"))

  private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)

  // CPU settings
  private val driverCpuCores = sparkConf.getOption("spark.driver.cores").getOrElse("1")
  private val driverLimitCores = sparkConf.get(KUBERNETES_DRIVER_LIMIT_CORES)
@@ -209,6 +211,8 @@ private[spark] class BasicDriverConfigurationStep(
.addToArgs("driver")
.build()

val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets)

val baseDriverPod = new PodBuilder(driverSpec.driverPod)
.editOrNewMetadata()
.withName(driverPodName)
@@ -218,6 +222,7 @@
      .withNewSpec()
        .withRestartPolicy("Never")
        .withNodeSelector(nodeSelector.asJava)
        .withImagePullSecrets(parsedImagePullSecrets.asJava)
      .endSpec()
      .build()

@@ -74,6 +74,7 @@ private[spark] class ExecutorPodFactory(
    .get(EXECUTOR_CONTAINER_IMAGE)
    .getOrElse(throw new SparkException("Must specify the executor container image"))
  private val imagePullPolicy = sparkConf.get(CONTAINER_IMAGE_PULL_POLICY)
  private val imagePullSecrets = sparkConf.get(IMAGE_PULL_SECRETS)
  private val blockManagerPort = sparkConf
    .getInt("spark.blockmanager.port", DEFAULT_BLOCKMANAGER_PORT)
@@ -106,6 +107,8 @@
      nodeToLocalTaskCount: Map[String, Int]): Pod = {
    val name = s"$executorPodNamePrefix-exec-$executorId"

    val parsedImagePullSecrets = KubernetesUtils.parseImagePullSecrets(imagePullSecrets)

    // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
    // name as the hostname. This preserves uniqueness since the end of name contains
    // executorId
@@ -281,6 +284,7 @@ private[spark] class ExecutorPodFactory(
        .withHostname(hostname)
        .withRestartPolicy("Never")
        .withNodeSelector(nodeSelector.asJava)
        .withImagePullSecrets(parsedImagePullSecrets.asJava)
      .endSpec()
      .build()

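To double-check that the secrets land on running pods, one could read them back with the fabric8 client; this is a sketch assuming an already-configured client, with hypothetical pod and namespace names:

```scala
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.DefaultKubernetesClient

// Sketch: list the imagePullSecrets attached to a running executor pod.
val client = new DefaultKubernetesClient()
val secretNames = client.pods()
  .inNamespace("default")
  .withName("spark-example-exec-1")
  .get()
  .getSpec
  .getImagePullSecrets
  .asScala
  .map(_.getName)
println(secretNames.mkString(", "))
client.close()
```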
@@ -0,0 +1,124 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s.submit.steps

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerBuilder, HasMetadata, PodBuilder}

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.submit.KubernetesDriverSpec

class BasicDriverConfigurationStepSuite extends SparkFunSuite {

  private val APP_ID = "spark-app-id"
  private val RESOURCE_NAME_PREFIX = "spark"
  private val DRIVER_LABELS = Map("labelkey" -> "labelvalue")
  private val CONTAINER_IMAGE_PULL_POLICY = "IfNotPresent"
  private val APP_NAME = "spark-test"
  private val MAIN_CLASS = "org.apache.spark.examples.SparkPi"
  private val APP_ARGS = Array("arg1", "arg2", "\"arg 3\"")
  private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
  private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
  private val DRIVER_CUSTOM_ENV_KEY1 = "customDriverEnv1"
  private val DRIVER_CUSTOM_ENV_KEY2 = "customDriverEnv2"

  test("Set all possible configurations from the user.") {
    val sparkConf = new SparkConf()
      .set(KUBERNETES_DRIVER_POD_NAME, "spark-driver-pod")
      .set(org.apache.spark.internal.config.DRIVER_CLASS_PATH, "/opt/spark/spark-examples.jar")
      .set("spark.driver.cores", "2")
      .set(KUBERNETES_DRIVER_LIMIT_CORES, "4")
      .set(org.apache.spark.internal.config.DRIVER_MEMORY.key, "256M")
      .set(org.apache.spark.internal.config.DRIVER_MEMORY_OVERHEAD, 200L)
      .set(CONTAINER_IMAGE, "spark-driver:latest")
      .set(s"$KUBERNETES_DRIVER_ANNOTATION_PREFIX$CUSTOM_ANNOTATION_KEY", CUSTOM_ANNOTATION_VALUE)
      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY1", "customDriverEnv1")
      .set(s"$KUBERNETES_DRIVER_ENV_KEY$DRIVER_CUSTOM_ENV_KEY2", "customDriverEnv2")
      .set(IMAGE_PULL_SECRETS, "imagePullSecret1, imagePullSecret2")

    val submissionStep = new BasicDriverConfigurationStep(
      APP_ID,
      RESOURCE_NAME_PREFIX,
      DRIVER_LABELS,
      CONTAINER_IMAGE_PULL_POLICY,
      APP_NAME,
      MAIN_CLASS,
      APP_ARGS,
      sparkConf)
    val basePod = new PodBuilder().withNewMetadata().endMetadata().withNewSpec().endSpec().build()
    val baseDriverSpec = KubernetesDriverSpec(
      driverPod = basePod,
      driverContainer = new ContainerBuilder().build(),
      driverSparkConf = new SparkConf(false),
      otherKubernetesResources = Seq.empty[HasMetadata])
    val preparedDriverSpec = submissionStep.configureDriver(baseDriverSpec)

    assert(preparedDriverSpec.driverContainer.getName === DRIVER_CONTAINER_NAME)
    assert(preparedDriverSpec.driverContainer.getImage === "spark-driver:latest")
    assert(preparedDriverSpec.driverContainer.getImagePullPolicy === CONTAINER_IMAGE_PULL_POLICY)

    assert(preparedDriverSpec.driverContainer.getEnv.size === 7)
    val envs = preparedDriverSpec.driverContainer
      .getEnv
      .asScala
      .map(env => (env.getName, env.getValue))
      .toMap
    assert(envs(ENV_CLASSPATH) === "/opt/spark/spark-examples.jar")
    assert(envs(ENV_DRIVER_MEMORY) === "256M")
    assert(envs(ENV_DRIVER_MAIN_CLASS) === MAIN_CLASS)
    assert(envs(ENV_DRIVER_ARGS) === "arg1 arg2 \"arg 3\"")
    assert(envs(DRIVER_CUSTOM_ENV_KEY1) === "customDriverEnv1")
    assert(envs(DRIVER_CUSTOM_ENV_KEY2) === "customDriverEnv2")

    assert(preparedDriverSpec.driverContainer.getEnv.asScala.exists(envVar =>
      envVar.getName.equals(ENV_DRIVER_BIND_ADDRESS) &&
        envVar.getValueFrom.getFieldRef.getApiVersion.equals("v1") &&
        envVar.getValueFrom.getFieldRef.getFieldPath.equals("status.podIP")))

    val resourceRequirements = preparedDriverSpec.driverContainer.getResources
    val requests = resourceRequirements.getRequests.asScala
    assert(requests("cpu").getAmount === "2")
    assert(requests("memory").getAmount === "256Mi")
    val limits = resourceRequirements.getLimits.asScala
    assert(limits("memory").getAmount === "456Mi")
    assert(limits("cpu").getAmount === "4")

    val driverPodMetadata = preparedDriverSpec.driverPod.getMetadata
    assert(driverPodMetadata.getName === "spark-driver-pod")
    assert(driverPodMetadata.getLabels.asScala === DRIVER_LABELS)
    val expectedAnnotations = Map(
      CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
      SPARK_APP_NAME_ANNOTATION -> APP_NAME)
    assert(driverPodMetadata.getAnnotations.asScala === expectedAnnotations)

    val driverPodSpec = preparedDriverSpec.driverPod.getSpec
    assert(driverPodSpec.getRestartPolicy === "Never")
    assert(driverPodSpec.getImagePullSecrets.size() === 2)
    assert(driverPodSpec.getImagePullSecrets.get(0).getName === "imagePullSecret1")
    assert(driverPodSpec.getImagePullSecrets.get(1).getName === "imagePullSecret2")

    val resolvedSparkConf = preparedDriverSpec.driverSparkConf.getAll.toMap
    val expectedSparkConf = Map(
      KUBERNETES_DRIVER_POD_NAME.key -> "spark-driver-pod",
      "spark.app.id" -> APP_ID,
      KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> RESOURCE_NAME_PREFIX)
    assert(resolvedSparkConf === expectedSparkConf)
  }
}