diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesExternalShuffleService.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesExternalShuffleService.scala
index 01a8a9a6899fd..c61f4f1d44acf 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesExternalShuffleService.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/KubernetesExternalShuffleService.scala
@@ -91,7 +91,7 @@ private[spark] class KubernetesShuffleBlockHandler (
     try {
       Some(kubernetesClient
         .pods()
-        .withLabels(Map(SPARK_ROLE_LABEL -> "driver").asJava)
+        .withLabels(Map(SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE).asJava)
         .watch(new Watcher[Pod] {
           override def eventReceived(action: Watcher.Action, p: Pod): Unit = {
             action match {
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
index 47c3c24fa88f7..d1fd88fc880d1 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/config.scala
@@ -151,6 +151,13 @@ package object config extends Logging {
       .stringConf
       .createOptional
 
+  private[spark] val KUBERNETES_EXECUTOR_POD_NAME_PREFIX =
+    ConfigBuilder("spark.kubernetes.executor.podNamePrefix")
+      .doc("Prefix to use in front of the executor pod names.")
+      .internal()
+      .stringConf
+      .createWithDefault("spark")
+
   private[spark] val KUBERNETES_SHUFFLE_NAMESPACE =
     ConfigBuilder("spark.kubernetes.shuffle.namespace")
       .doc("Namespace of the shuffle service")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
index e267c9ff7e1d1..9c46d7494b187 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/constants.scala
@@ -19,10 +19,12 @@ package org.apache.spark.deploy.kubernetes
 package object constants {
   // Labels
   private[spark] val SPARK_DRIVER_LABEL = "spark-driver"
-  private[spark] val SPARK_APP_ID_LABEL = "spark-app-id"
-  private[spark] val SPARK_APP_NAME_LABEL = "spark-app-name"
+  private[spark] val SPARK_APP_ID_LABEL = "spark-app-selector"
   private[spark] val SPARK_EXECUTOR_ID_LABEL = "spark-exec-id"
   private[spark] val SPARK_ROLE_LABEL = "spark-role"
+  private[spark] val SPARK_POD_DRIVER_ROLE = "driver"
+  private[spark] val SPARK_POD_EXECUTOR_ROLE = "executor"
+  private[spark] val SPARK_APP_NAME_ANNOTATION = "spark-app-name"
 
   // Credentials secrets
   private[spark] val DRIVER_CREDENTIALS_SECRETS_BASE_DIR =
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
index 0544bf064844f..c2e616eadc1e0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala
@@ -17,13 +17,13 @@
 package org.apache.spark.deploy.kubernetes.submit
 
 import java.io.File
-import java.util.Collections
+import java.util.{Collections, UUID}
 
 import io.fabric8.kubernetes.api.model.{ContainerBuilder, EnvVarBuilder, OwnerReferenceBuilder, PodBuilder, QuantityBuilder}
 import io.fabric8.kubernetes.client.KubernetesClient
 import scala.collection.JavaConverters._
 
-import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.SparkConf
 import org.apache.spark.deploy.kubernetes.{ConfigurationUtils, SparkKubernetesClientFactory}
 import org.apache.spark.deploy.kubernetes.config._
 import org.apache.spark.deploy.kubernetes.constants._
@@ -43,22 +43,21 @@ import org.apache.spark.util.Utils
  * where different steps of submission should be factored out into separate classes.
  */
 private[spark] class Client(
-    appName: String,
-    kubernetesAppId: String,
-    mainClass: String,
-    sparkConf: SparkConf,
-    appArgs: Array[String],
-    sparkJars: Seq[String],
-    sparkFiles: Seq[String],
-    waitForAppCompletion: Boolean,
-    kubernetesClient: KubernetesClient,
-    initContainerComponentsProvider: DriverInitContainerComponentsProvider,
-    kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
-    loggingPodStatusWatcher: LoggingPodStatusWatcher)
-  extends Logging {
-
+      appName: String,
+      kubernetesResourceNamePrefix: String,
+      kubernetesAppId: String,
+      mainClass: String,
+      sparkConf: SparkConf,
+      appArgs: Array[String],
+      sparkJars: Seq[String],
+      sparkFiles: Seq[String],
+      waitForAppCompletion: Boolean,
+      kubernetesClient: KubernetesClient,
+      initContainerComponentsProvider: DriverInitContainerComponentsProvider,
+      kubernetesCredentialsMounterProvider: DriverPodKubernetesCredentialsMounterProvider,
+      loggingPodStatusWatcher: LoggingPodStatusWatcher) extends Logging {
   private val kubernetesDriverPodName = sparkConf.get(KUBERNETES_DRIVER_POD_NAME)
-    .getOrElse(kubernetesAppId)
+    .getOrElse(s"$kubernetesResourceNamePrefix-driver")
 
   private val driverDockerImage = sparkConf.get(DRIVER_DOCKER_IMAGE)
   private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
@@ -86,15 +85,16 @@ private[spark] class Client(
     val parsedCustomLabels = ConfigurationUtils.parseKeyValuePairs(
       customLabels, KUBERNETES_DRIVER_LABELS.key, "labels")
     require(!parsedCustomLabels.contains(SPARK_APP_ID_LABEL), s"Label with key " +
-      s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
-    require(!parsedCustomLabels.contains(SPARK_APP_NAME_LABEL), s"Label with key" +
-      s" $SPARK_APP_NAME_LABEL is not allowed as it is reserved for Spark bookkeeping operations.")
+      s" $SPARK_APP_ID_LABEL is not allowed as it is reserved for Spark bookkeeping" +
+      s" operations.")
+    val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
+      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
+    require(!parsedCustomAnnotations.contains(SPARK_APP_NAME_ANNOTATION), s"Annotation with key" +
+      s" $SPARK_APP_NAME_ANNOTATION is not allowed as it is reserved for Spark bookkeeping" +
+      s" operations.")
     val allLabels = parsedCustomLabels ++ Map(
       SPARK_APP_ID_LABEL -> kubernetesAppId,
-      SPARK_APP_NAME_LABEL -> appName,
-      SPARK_ROLE_LABEL -> "driver")
-    val parsedCustomAnnotations = ConfigurationUtils.parseKeyValuePairs(
-      customAnnotations, KUBERNETES_DRIVER_ANNOTATIONS.key, "annotations")
+      SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
 
     val driverExtraClasspathEnv = driverExtraClasspath.map { classPath =>
       new EnvVarBuilder()
@@ -140,6 +140,7 @@ private[spark] class Client(
         .withName(kubernetesDriverPodName)
         .addToLabels(allLabels.asJava)
         .addToAnnotations(parsedCustomAnnotations.asJava)
+        .addToAnnotations(SPARK_APP_NAME_ANNOTATION, appName)
         .endMetadata()
       .withNewSpec()
         .withRestartPolicy("Never")
@@ -186,6 +187,7 @@ private[spark] class Client(
     }
     resolvedSparkConf.setIfMissing(KUBERNETES_DRIVER_POD_NAME, kubernetesDriverPodName)
     resolvedSparkConf.set("spark.app.id", kubernetesAppId)
+    resolvedSparkConf.set(KUBERNETES_EXECUTOR_POD_NAME_PREFIX, kubernetesResourceNamePrefix)
     // We don't need this anymore since we just set the JVM options on the environment
     resolvedSparkConf.remove(org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
     val resolvedLocalClasspath = containerLocalizedFilesResolver
@@ -234,11 +236,11 @@ private[spark] class Client(
         throw e
       }
       if (waitForAppCompletion) {
-        logInfo(s"Waiting for application $kubernetesAppId to finish...")
+        logInfo(s"Waiting for application $appName to finish...")
         loggingPodStatusWatcher.awaitCompletion()
-        logInfo(s"Application $kubernetesAppId finished.")
+        logInfo(s"Application $appName finished.")
       } else {
-        logInfo(s"Deployed Spark application $kubernetesAppId into Kubernetes.")
+        logInfo(s"Deployed Spark application $appName into Kubernetes.")
       }
     }
   }
@@ -279,15 +281,21 @@ private[spark] object Client {
     val sparkFiles = sparkConf.getOption("spark.files")
       .map(_.split(","))
       .getOrElse(Array.empty[String])
-    val appName = sparkConf.getOption("spark.app.name")
-      .getOrElse("spark")
-    val kubernetesAppId = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+    val appName = sparkConf.getOption("spark.app.name").getOrElse("spark")
+    // The resource name prefix is derived from the application name, making it easy to connect the
+    // names of the Kubernetes resources from e.g. Kubectl or the Kubernetes dashboard to the
+    // application the user submitted. However, we can't use the application name in the label, as
+    // label values are considerably restrictive, e.g. must be no longer than 63 characters in
+    // length. So we generate a separate identifier for the app ID itself, and bookkeeping that
+    // requires finding "all pods for this application" should use the kubernetesAppId.
+    val kubernetesResourceNamePrefix = s"$appName-$launchTime".toLowerCase.replaceAll("\\.", "-")
+    val kubernetesAppId = s"spark-${UUID.randomUUID().toString.replaceAll("-", "")}"
     val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
     val master = resolveK8sMaster(sparkConf.get("spark.master"))
     val sslOptionsProvider = new ResourceStagingServerSslOptionsProviderImpl(sparkConf)
     val initContainerComponentsProvider = new DriverInitContainerComponentsProviderImpl(
       sparkConf,
-      kubernetesAppId,
+      kubernetesResourceNamePrefix,
       namespace,
       sparkJars,
       sparkFiles,
@@ -300,14 +308,16 @@ private[spark] object Client {
         None,
         None)) { kubernetesClient =>
       val kubernetesCredentialsMounterProvider =
-        new DriverPodKubernetesCredentialsMounterProviderImpl(sparkConf, kubernetesAppId)
+        new DriverPodKubernetesCredentialsMounterProviderImpl(
+          sparkConf, kubernetesResourceNamePrefix)
       val waitForAppCompletion = sparkConf.get(WAIT_FOR_APP_COMPLETION)
       val loggingInterval = Option(sparkConf.get(REPORT_INTERVAL))
         .filter( _ => waitForAppCompletion)
       val loggingPodStatusWatcher = new LoggingPodStatusWatcherImpl(
-        kubernetesAppId, loggingInterval)
+        kubernetesResourceNamePrefix, loggingInterval)
       new Client(
         appName,
+        kubernetesResourceNamePrefix,
         kubernetesAppId,
         mainClass,
         sparkConf,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala
index be9da2582cb47..cfc61e193dcff 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/DriverInitContainerComponentsProvider.scala
@@ -16,8 +16,6 @@
  */
 package org.apache.spark.deploy.kubernetes.submit
 
-import java.io.File
-
 import org.apache.spark.{SparkConf, SSLOptions}
 import org.apache.spark.deploy.kubernetes.{InitContainerResourceStagingServerSecretPluginImpl, OptionRequirements, SparkPodInitContainerBootstrap, SparkPodInitContainerBootstrapImpl}
 import org.apache.spark.deploy.kubernetes.config._
@@ -46,12 +44,12 @@ private[spark] trait DriverInitContainerComponentsProvider {
 }
 
 private[spark] class DriverInitContainerComponentsProviderImpl(
-    sparkConf: SparkConf,
-    kubernetesAppId: String,
-    namespace: String,
-    sparkJars: Seq[String],
-    sparkFiles: Seq[String],
-    resourceStagingServerExternalSslOptions: SSLOptions)
+      sparkConf: SparkConf,
+      kubernetesResourceNamePrefix: String,
+      namespace: String,
+      sparkJars: Seq[String],
+      sparkFiles: Seq[String],
+      resourceStagingServerExternalSslOptions: SSLOptions)
   extends DriverInitContainerComponentsProvider {
 
   private val maybeResourceStagingServerUri = sparkConf.get(RESOURCE_STAGING_SERVER_URI)
@@ -99,10 +97,10 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
   private val jarsDownloadPath = sparkConf.get(INIT_CONTAINER_JARS_DOWNLOAD_LOCATION)
   private val filesDownloadPath = sparkConf.get(INIT_CONTAINER_FILES_DOWNLOAD_LOCATION)
   private val maybeSecretName = maybeResourceStagingServerUri.map { _ =>
-    s"$kubernetesAppId-init-secret"
+    s"$kubernetesResourceNamePrefix-init-secret"
   }
-  private val configMapName = s"$kubernetesAppId-init-config"
-  private val configMapKey = s"$kubernetesAppId-init-config-key"
+  private val configMapName = s"$kubernetesResourceNamePrefix-init-config"
+  private val configMapKey = s"$kubernetesResourceNamePrefix-init-config-key"
   private val initContainerImage = sparkConf.get(INIT_CONTAINER_DOCKER_IMAGE)
   private val dockerImagePullPolicy = sparkConf.get(DOCKER_IMAGE_PULL_POLICY)
   private val downloadTimeoutMinutes = sparkConf.get(INIT_CONTAINER_MOUNT_TIMEOUT)
@@ -116,29 +114,29 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
       filesResourceId <- maybeSubmittedResourceIds.map(_.filesResourceId)
     } yield {
       new SubmittedDependencyInitContainerConfigPluginImpl(
-        // Configure the init-container with the internal URI over the external URI.
-        maybeResourceStagingServerInternalUri.getOrElse(stagingServerUri),
-        jarsResourceId,
-        filesResourceId,
-        INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
-        INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
-        INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
-        INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
-        resourceStagingServerInternalSslEnabled,
-        maybeResourceStagingServerInternalTrustStore,
-        maybeResourceStagingServerInternalClientCert,
-        maybeResourceStagingServerInternalTrustStorePassword,
-        maybeResourceStagingServerInternalTrustStoreType,
-        INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
+          // Configure the init-container with the internal URI over the external URI.
+          maybeResourceStagingServerInternalUri.getOrElse(stagingServerUri),
+          jarsResourceId,
+          filesResourceId,
+          INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
+          INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
+          INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
+          INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
+          resourceStagingServerInternalSslEnabled,
+          maybeResourceStagingServerInternalTrustStore,
+          maybeResourceStagingServerInternalClientCert,
+          maybeResourceStagingServerInternalTrustStorePassword,
+          maybeResourceStagingServerInternalTrustStoreType,
+          INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
     }
     new SparkInitContainerConfigMapBuilderImpl(
-      sparkJars,
-      sparkFiles,
-      jarsDownloadPath,
-      filesDownloadPath,
-      configMapName,
-      configMapKey,
-      submittedDependencyConfigPlugin)
+        sparkJars,
+        sparkFiles,
+        jarsDownloadPath,
+        filesDownloadPath,
+        configMapName,
+        configMapKey,
+        submittedDependencyConfigPlugin)
   }
 
   override def provideContainerLocalizedFilesResolver(): ContainerLocalizedFilesResolver = {
@@ -158,14 +156,13 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
       driverPodLabels: Map[String, String]): Option[SubmittedDependencyUploader] = {
     maybeResourceStagingServerUri.map { stagingServerUri =>
       new SubmittedDependencyUploaderImpl(
-        kubernetesAppId,
-        driverPodLabels,
-        namespace,
-        stagingServerUri,
-        sparkJars,
-        sparkFiles,
-        resourceStagingServerExternalSslOptions,
-        RetrofitClientFactoryImpl)
+          driverPodLabels,
+          namespace,
+          stagingServerUri,
+          sparkJars,
+          sparkFiles,
+          resourceStagingServerExternalSslOptions,
+          RetrofitClientFactoryImpl)
     }
   }
 
@@ -178,15 +175,15 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
       filesResourceSecret <- maybeSubmittedResourceSecrets.map(_.filesResourceSecret)
     } yield {
       new SubmittedDependencySecretBuilderImpl(
-        secretName,
-        jarsResourceSecret,
-        filesResourceSecret,
-        INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
-        INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
-        INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
-        INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
-        maybeResourceStagingServerInternalTrustStore,
-        maybeResourceStagingServerInternalClientCert)
+          secretName,
+          jarsResourceSecret,
+          filesResourceSecret,
+          INIT_CONTAINER_SUBMITTED_JARS_SECRET_KEY,
+          INIT_CONTAINER_SUBMITTED_FILES_SECRET_KEY,
+          INIT_CONTAINER_STAGING_SERVER_TRUSTSTORE_SECRET_KEY,
+          INIT_CONTAINER_STAGING_SERVER_CLIENT_CERT_SECRET_KEY,
+          maybeResourceStagingServerInternalTrustStore,
+          maybeResourceStagingServerInternalClientCert)
     }
   }
 
@@ -196,13 +193,13 @@ private[spark] class DriverInitContainerComponentsProviderImpl(
       secret, INIT_CONTAINER_SECRET_VOLUME_MOUNT_PATH)
     }
     new SparkPodInitContainerBootstrapImpl(
-      initContainerImage,
-      dockerImagePullPolicy,
-      jarsDownloadPath,
-      filesDownloadPath,
-      downloadTimeoutMinutes,
-      configMapName,
-      configMapKey,
-      resourceStagingServerSecretPlugin)
+        initContainerImage,
+        dockerImagePullPolicy,
+        jarsDownloadPath,
+        filesDownloadPath,
+        downloadTimeoutMinutes,
+        configMapName,
+        configMapKey,
+        resourceStagingServerSecretPlugin)
   }
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderImpl.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderImpl.scala
index a891cf3904d2d..83d7a28f5ca10 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderImpl.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderImpl.scala
@@ -50,7 +50,6 @@ private[spark] trait SubmittedDependencyUploader {
  * Resource Staging Service.
  */
 private[spark] class SubmittedDependencyUploaderImpl(
-    kubernetesAppId: String,
     podLabels: Map[String, String],
     podNamespace: String,
     stagingServerUri: String,
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
index c3a6fe28a6255..6ab6480d848a2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/kubernetes/KubernetesClusterSchedulerBackend.scala
@@ -65,7 +65,8 @@ private[spark] class KubernetesClusterSchedulerBackend(
     "executor labels")
   require(
     !executorLabels.contains(SPARK_APP_ID_LABEL),
-    s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is reserved for Spark.")
+    s"Custom executor labels cannot contain $SPARK_APP_ID_LABEL as it is" +
+      s" reserved for Spark.")
   require(
     !executorLabels.contains(SPARK_EXECUTOR_ID_LABEL),
     s"Custom executor labels cannot contain $SPARK_EXECUTOR_ID_LABEL as it is reserved for" +
@@ -87,6 +88,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     .get(KUBERNETES_DRIVER_POD_NAME)
     .getOrElse(
       throw new SparkException("Must specify the driver pod name"))
+  private val executorPodNamePrefix = conf.get(KUBERNETES_EXECUTOR_POD_NAME_PREFIX)
 
   private val executorMemoryMb = conf.get(org.apache.spark.internal.config.EXECUTOR_MEMORY)
   private val executorMemoryString = conf.get(
@@ -225,8 +227,11 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   override def start(): Unit = {
     super.start()
-    executorWatchResource.set(kubernetesClient.pods().withLabel(SPARK_APP_ID_LABEL, applicationId())
-      .watch(new ExecutorPodsWatcher()))
+    executorWatchResource.set(
+      kubernetesClient
+        .pods()
+        .withLabel(SPARK_APP_ID_LABEL, applicationId())
+        .watch(new ExecutorPodsWatcher()))
 
     allocator.scheduleWithFixedDelay(
       allocatorRunnable, 0, podAllocationInterval, TimeUnit.SECONDS)
@@ -280,7 +285,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
 
   private def allocateNewExecutorPod(): (String, Pod) = {
     val executorId = EXECUTOR_ID_COUNTER.incrementAndGet().toString
-    val name = s"${applicationId()}-exec-$executorId"
+    val name = s"$executorPodNamePrefix-exec-$executorId"
 
     // hostname must be no longer than 63 characters, so take the last 63 characters of the pod
     // name as the hostname. This preserves uniqueness since the end of name contains
@@ -289,7 +294,7 @@ private[spark] class KubernetesClusterSchedulerBackend(
     val resolvedExecutorLabels = Map(
       SPARK_EXECUTOR_ID_LABEL -> executorId,
       SPARK_APP_ID_LABEL -> applicationId(),
-      SPARK_ROLE_LABEL -> "executor") ++
+      SPARK_ROLE_LABEL -> SPARK_POD_EXECUTOR_ROLE) ++
       executorLabels
     val executorMemoryQuantity = new QuantityBuilder(false)
       .withAmount(s"${executorMemoryMb}M")
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
index 00f09c64b53b7..193f36a7423b2 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/ClientV2Suite.scala
@@ -45,14 +45,14 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
   private val BOOTSTRAPPED_POD_ANNOTATION = "bootstrapped"
   private val TRUE = "true"
   private val APP_NAME = "spark-test"
-  private val APP_ID = "spark-app-id"
+  private val APP_RESOURCE_PREFIX = "spark-prefix"
+  private val APP_ID = "spark-id"
   private val CUSTOM_LABEL_KEY = "customLabel"
   private val CUSTOM_LABEL_VALUE = "customLabelValue"
   private val ALL_EXPECTED_LABELS = Map(
     CUSTOM_LABEL_KEY -> CUSTOM_LABEL_VALUE,
     SPARK_APP_ID_LABEL -> APP_ID,
-    SPARK_APP_NAME_LABEL -> APP_NAME,
-    SPARK_ROLE_LABEL -> "driver")
+    SPARK_ROLE_LABEL -> SPARK_POD_DRIVER_ROLE)
   private val CUSTOM_ANNOTATION_KEY = "customAnnotation"
   private val CUSTOM_ANNOTATION_VALUE = "customAnnotationValue"
   private val INIT_CONTAINER_SECRET_NAME = "init-container-secret"
@@ -183,7 +183,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
           .build()
       }
     })
-    when(podOps.withName(APP_ID)).thenReturn(namedPodResource)
+    when(podOps.withName(s"$APP_RESOURCE_PREFIX-driver")).thenReturn(namedPodResource)
     when(namedPodResource.watch(loggingPodStatusWatcher)).thenReturn(watch)
     when(containerLocalizedFilesResolver.resolveSubmittedAndRemoteSparkJars())
       .thenReturn(RESOLVED_SPARK_REMOTE_AND_LOCAL_JARS)
@@ -291,6 +291,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
     expectationsForNoDependencyUploader()
     new Client(
       APP_NAME,
+      APP_RESOURCE_PREFIX,
       APP_ID,
       MAIN_CLASS,
       SPARK_CONF,
@@ -334,7 +335,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
         owners.head.getController &&
         owners.head.getKind == DRIVER_POD_KIND &&
         owners.head.getUid == DRIVER_POD_UID &&
-        owners.head.getName == APP_ID &&
+        owners.head.getName == s"$APP_RESOURCE_PREFIX-driver" &&
         owners.head.getApiVersion == DRIVER_POD_API_VERSION
     })
   }
@@ -354,14 +355,15 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
       .toMap ++
       Map(
         "spark.app.id" -> APP_ID,
-        KUBERNETES_DRIVER_POD_NAME.key -> APP_ID,
+        KUBERNETES_DRIVER_POD_NAME.key -> s"$APP_RESOURCE_PREFIX-driver",
+        KUBERNETES_EXECUTOR_POD_NAME_PREFIX.key -> APP_RESOURCE_PREFIX,
         EXECUTOR_INIT_CONF_KEY -> TRUE,
         CUSTOM_JAVA_OPTION_KEY -> CUSTOM_JAVA_OPTION_VALUE,
         "spark.jars" -> RESOLVED_SPARK_JARS.mkString(","),
         "spark.files" -> RESOLVED_SPARK_FILES.mkString(","))
     runAndVerifyPodMatchesPredicate { p =>
       Option(p)
-        .filter(_.getMetadata.getName == APP_ID)
+        .filter(_.getMetadata.getName == s"$APP_RESOURCE_PREFIX-driver")
        .filter(podHasCorrectAnnotations)
         .filter(_.getMetadata.getLabels.asScala == ALL_EXPECTED_LABELS)
         .filter(containerHasCorrectBasicContainerConfiguration)
@@ -374,6 +376,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
   private def runAndVerifyPodMatchesPredicate(pred: (Pod => Boolean)): Unit = {
     new Client(
       APP_NAME,
+      APP_RESOURCE_PREFIX,
       APP_ID,
       MAIN_CLASS,
       SPARK_CONF,
@@ -442,6 +445,7 @@ class ClientV2Suite extends SparkFunSuite with BeforeAndAfter {
   private def podHasCorrectAnnotations(pod: Pod): Boolean = {
     val expectedAnnotations = Map(
       CUSTOM_ANNOTATION_KEY -> CUSTOM_ANNOTATION_VALUE,
+      SPARK_APP_NAME_ANNOTATION -> APP_NAME,
       BOOTSTRAPPED_POD_ANNOTATION -> TRUE)
     pod.getMetadata.getAnnotations.asScala == expectedAnnotations
   }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderSuite.scala
index c207e3c69cd3c..96fa92c254297 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/kubernetes/submit/SubmittedDependencyUploaderSuite.scala
@@ -85,14 +85,13 @@ private[spark] class SubmittedDependencyUploaderSuite extends SparkFunSuite with
       resourcesDataCaptor.capture(), resourcesOwnerCaptor.capture()))
       .thenReturn(responseCall)
     dependencyUploaderUnderTest = new SubmittedDependencyUploaderImpl(
-      APP_ID,
-      LABELS,
-      NAMESPACE,
-      STAGING_SERVER_URI,
-      JARS,
-      FILES,
-      STAGING_SERVER_SSL_OPTIONS,
-      retrofitClientFactory)
+        LABELS,
+        NAMESPACE,
+        STAGING_SERVER_URI,
+        JARS,
+        FILES,
+        STAGING_SERVER_SSL_OPTIONS,
+        retrofitClientFactory)
   }
 
   test("Uploading jars should contact the staging server with the appropriate parameters") {
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
index 6a296d6112c97..e377f285eb9a6 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala
@@ -195,6 +195,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS))
   }
 
+  test("Use a very long application name.") {
+    assume(testBackend.name == MINIKUBE_TEST_BACKEND)
+
+    sparkConf.setJars(Seq(CONTAINER_LOCAL_HELPER_JAR_PATH)).setAppName("long" * 40)
+    runSparkPiAndVerifyCompletion(CONTAINER_LOCAL_MAIN_APP_RESOURCE)
+  }
+
   private def launchStagingServer(
       resourceStagingServerSslOptions: SSLOptions, keyAndCertPem: Option[KeyAndCertPem]): Unit = {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)