From 92a6611a090bbe216f6bb23b0b27db433aabd24a Mon Sep 17 00:00:00 2001
From: Yinan Li
Date: Wed, 22 Apr 2020 09:11:19 -0700
Subject: [PATCH] Fix for #868 (#876)

---
 pkg/config/constants.go   |   5 +-
 pkg/webhook/patch.go      | 118 +++++++++++++++++++++++++++-----------
 pkg/webhook/patch_test.go |   8 +--
 3 files changed, 91 insertions(+), 40 deletions(-)

diff --git a/pkg/config/constants.go b/pkg/config/constants.go
index dbf2f0ad0..46116d61c 100644
--- a/pkg/config/constants.go
+++ b/pkg/config/constants.go
@@ -275,7 +275,10 @@ const (
 	SparkDriverContainerName = "spark-kubernetes-driver"
 	// SparkExecutorContainerName is name of executor container in spark executor pod
 	SparkExecutorContainerName = "executor"
-
+	// Spark3DefaultExecutorContainerName is the default executor container name in
+	// Spark 3.x, which allows the container name to be configured through the pod
+	// template support.
+	Spark3DefaultExecutorContainerName = "spark-kubernetes-executor"
 	// SparkLocalDirVolumePrefix is the volume name prefix for "scratch" space directory
 	SparkLocalDirVolumePrefix = "spark-local-dir-"
 )
diff --git a/pkg/webhook/patch.go b/pkg/webhook/patch.go
index 4b67898dd..3e378dcfd 100644
--- a/pkg/webhook/patch.go
+++ b/pkg/webhook/patch.go
@@ -141,7 +141,11 @@ func addVolumes(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOperation
 				ops = append(ops, addVolume(pod, v))
 				addedVolumeMap[m.Name] = v
 			}
-			ops = append(ops, addVolumeMount(pod, m))
+			vmPatchOp := addVolumeMount(pod, m)
+			if vmPatchOp == nil {
+				return nil
+			}
+			ops = append(ops, *vmPatchOp)
 		}
 	}
 
@@ -162,14 +166,11 @@ func addVolume(pod *corev1.Pod, volume corev1.Volume) patchOperation {
 	return patchOperation{Op: "add", Path: path, Value: value}
 }
 
-func addVolumeMount(pod *corev1.Pod, mount corev1.VolumeMount) patchOperation {
-	i := 0
-	// Find the driver or executor container in the pod.
-	for ; i < len(pod.Spec.Containers); i++ {
-		if pod.Spec.Containers[i].Name == config.SparkDriverContainerName ||
-			pod.Spec.Containers[i].Name == config.SparkExecutorContainerName {
-			break
-		}
+func addVolumeMount(pod *corev1.Pod, mount corev1.VolumeMount) *patchOperation {
+	i := findContainer(pod)
+	if i < 0 {
+		glog.Warningf("not able to add VolumeMount %s as Spark container was not found in pod %s", mount.Name, pod.Name)
+		return nil
 	}
 
 	path := fmt.Sprintf("/spec/containers/%d/volumeMounts", i)
@@ -182,7 +183,7 @@ func addVolumeMount(pod *corev1.Pod, mount corev1.VolumeMount) patchOperation {
 	}
 
 	pod.Spec.Containers[i].VolumeMounts = append(pod.Spec.Containers[i].VolumeMounts, mount)
-	return patchOperation{Op: "add", Path: path, Value: value}
+	return &patchOperation{Op: "add", Path: path, Value: value}
 }
 
 func addEnvVars(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOperation {
@@ -269,14 +270,11 @@ func addEnvFrom(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOperation
 	return patchOps
 }
 
-func addEnvironmentVariable(pod *corev1.Pod, envName, envValue string) patchOperation {
-	i := 0
-	// Find the driver or executor container in the pod.
-	for ; i < len(pod.Spec.Containers); i++ {
-		if pod.Spec.Containers[i].Name == config.SparkDriverContainerName ||
-			pod.Spec.Containers[i].Name == config.SparkExecutorContainerName {
-			break
-		}
+func addEnvironmentVariable(pod *corev1.Pod, envName, envValue string) *patchOperation {
+	i := findContainer(pod)
+	if i < 0 {
+		glog.Warningf("not able to add environment variable %s as Spark container was not found in pod %s", envName, pod.Name)
+		return nil
 	}
 
 	path := fmt.Sprintf("/spec/containers/%d/env", i)
@@ -288,7 +286,7 @@ func addEnvironmentVariable(pod *corev1.Pod, envName, envValue string) patchOper
 		value = corev1.EnvVar{Name: envName, Value: envValue}
 	}
 
-	return patchOperation{Op: "add", Path: path, Value: value}
+	return &patchOperation{Op: "add", Path: path, Value: value}
 }
 
 func addSparkConfigMap(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOperation {
@@ -296,9 +294,16 @@ func addSparkConfigMap(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOp
 	sparkConfigMapName := app.Spec.SparkConfigMap
 	if sparkConfigMapName != nil {
 		patchOps = append(patchOps, addConfigMapVolume(pod, *sparkConfigMapName, config.SparkConfigMapVolumeName))
-		patchOps = append(patchOps, addConfigMapVolumeMount(pod, config.SparkConfigMapVolumeName,
-			config.DefaultSparkConfDir))
-		patchOps = append(patchOps, addEnvironmentVariable(pod, config.SparkConfDirEnvVar, config.DefaultSparkConfDir))
+		vmPatchOp := addConfigMapVolumeMount(pod, config.SparkConfigMapVolumeName, config.DefaultSparkConfDir)
+		if vmPatchOp == nil {
+			return nil
+		}
+		patchOps = append(patchOps, *vmPatchOp)
+		envPatchOp := addEnvironmentVariable(pod, config.SparkConfDirEnvVar, config.DefaultSparkConfDir)
+		if envPatchOp == nil {
+			return nil
+		}
+		patchOps = append(patchOps, *envPatchOp)
 	}
 	return patchOps
 }
@@ -308,9 +313,16 @@ func addHadoopConfigMap(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchO
 	hadoopConfigMapName := app.Spec.HadoopConfigMap
 	if hadoopConfigMapName != nil {
 		patchOps = append(patchOps, addConfigMapVolume(pod, *hadoopConfigMapName, config.HadoopConfigMapVolumeName))
-		patchOps = append(patchOps, addConfigMapVolumeMount(pod, config.HadoopConfigMapVolumeName,
-			config.DefaultHadoopConfDir))
-		patchOps = append(patchOps, addEnvironmentVariable(pod, config.HadoopConfDirEnvVar, config.DefaultHadoopConfDir))
+		vmPatchOp := addConfigMapVolumeMount(pod, config.HadoopConfigMapVolumeName, config.DefaultHadoopConfDir)
+		if vmPatchOp == nil {
+			return nil
+		}
+		patchOps = append(patchOps, *vmPatchOp)
+		envPatchOp := addEnvironmentVariable(pod, config.HadoopConfDirEnvVar, config.DefaultHadoopConfDir)
+		if envPatchOp == nil {
+			return nil
+		}
+		patchOps = append(patchOps, *envPatchOp)
 	}
 	return patchOps
 }
@@ -331,7 +343,11 @@ func addGeneralConfigMaps(pod *corev1.Pod, app *v1beta2.SparkApplication) []patc
 			glog.V(2).Infof("ConfigMap volume name is too long. Truncating to length %d. Result: %s.", maxNameLength, volumeName)
 		}
 		patchOps = append(patchOps, addConfigMapVolume(pod, namePath.Name, volumeName))
-		patchOps = append(patchOps, addConfigMapVolumeMount(pod, volumeName, namePath.Path))
+		vmPatchOp := addConfigMapVolumeMount(pod, volumeName, namePath.Path)
+		if vmPatchOp == nil {
+			return nil
+		}
+		patchOps = append(patchOps, *vmPatchOp)
 	}
 	return patchOps
 }
@@ -355,7 +371,11 @@ func addPrometheusConfigMap(pod *corev1.Pod, app *v1beta2.SparkApplication) []pa
 	volumeName := name + "-vol"
 	mountPath := config.PrometheusConfigMapMountPath
 	patchOps = append(patchOps, addConfigMapVolume(pod, name, volumeName))
-	patchOps = append(patchOps, addConfigMapVolumeMount(pod, volumeName, mountPath))
+	vmPatchOp := addConfigMapVolumeMount(pod, volumeName, mountPath)
+	if vmPatchOp == nil {
+		return nil
+	}
+	patchOps = append(patchOps, *vmPatchOp)
 	return patchOps
 }
 
@@ -373,7 +393,7 @@ func addConfigMapVolume(pod *corev1.Pod, configMapName string, configMapVolumeNa
 	return addVolume(pod, volume)
 }
 
-func addConfigMapVolumeMount(pod *corev1.Pod, configMapVolumeName string, mountPath string) patchOperation {
+func addConfigMapVolumeMount(pod *corev1.Pod, configMapVolumeName string, mountPath string) *patchOperation {
 	mount := corev1.VolumeMount{
 		Name:      configMapVolumeName,
 		ReadOnly:  true,
@@ -559,14 +579,13 @@ func addGPU(pod *corev1.Pod, app *v1beta2.SparkApplication) *patchOperation {
 		glog.V(2).Infof("GPU Quantity must be positive. Current gpu spec: %+v", gpu)
 		return nil
 	}
-	i := 0
-	// Find the driver or executor container in the pod.
-	for ; i < len(pod.Spec.Containers); i++ {
-		if pod.Spec.Containers[i].Name == config.SparkDriverContainerName ||
-			pod.Spec.Containers[i].Name == config.SparkExecutorContainerName {
-			break
-		}
+
+	i := findContainer(pod)
+	if i < 0 {
+		glog.Warningf("not able to add GPU as Spark container was not found in pod %s", pod.Name)
+		return nil
 	}
+
 	path := fmt.Sprintf("/spec/containers/%d/resources/limits", i)
 	var value interface{}
 	if len(pod.Spec.Containers[i].Resources.Limits) == 0 {
@@ -642,6 +661,7 @@ func addPodLifeCycleConfig(pod *corev1.Pod, app *v1beta2.SparkApplication) *patc
 	if lifeCycle == nil {
 		return nil
 	}
+
 	i := 0
 	// Find the driver container in the pod.
 	for ; i < len(pod.Spec.Containers); i++ {
@@ -649,6 +669,34 @@ func addPodLifeCycleConfig(pod *corev1.Pod, app *v1beta2.SparkApplication) *patc
 			break
 		}
 	}
+	if i == len(pod.Spec.Containers) {
+		glog.Warningf("Spark driver container not found in pod %s", pod.Name)
+		return nil
+	}
+
 	path := fmt.Sprintf("/spec/containers/%d/lifecycle", i)
 	return &patchOperation{Op: "add", Path: path, Value: *lifeCycle}
 }
+
+func findContainer(pod *corev1.Pod) int {
+	var candidateContainerNames []string
+	if util.IsDriverPod(pod) {
+		candidateContainerNames = append(candidateContainerNames, config.SparkDriverContainerName)
+	} else if util.IsExecutorPod(pod) {
+		// Spark 3.x changed the default executor container name so we need to include both.
+		candidateContainerNames = append(candidateContainerNames, config.SparkExecutorContainerName, config.Spark3DefaultExecutorContainerName)
+	}
+
+	if len(candidateContainerNames) == 0 {
+		return -1
+	}
+
+	for i := 0; i < len(pod.Spec.Containers); i++ {
+		for _, name := range candidateContainerNames {
+			if pod.Spec.Containers[i].Name == name {
+				return i
+			}
+		}
+	}
+	return -1
+}
diff --git a/pkg/webhook/patch_test.go b/pkg/webhook/patch_test.go
index 32e705a91..ecd88f9a5 100644
--- a/pkg/webhook/patch_test.go
+++ b/pkg/webhook/patch_test.go
@@ -152,7 +152,7 @@ func TestPatchSparkPod_Volumes_Subpath(t *testing.T) {
 		},
 		Spec: v1beta2.SparkApplicationSpec{
 			Volumes: []corev1.Volume{
-				corev1.Volume{
+				{
 					Name: "spark-pvc",
 					VolumeSource: corev1.VolumeSource{
 						PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
@@ -219,7 +219,7 @@ func TestPatchSparkPod_Volumes(t *testing.T) {
 		},
 		Spec: v1beta2.SparkApplicationSpec{
 			Volumes: []corev1.Volume{
-				corev1.Volume{
+				{
 					Name: "spark",
 					VolumeSource: corev1.VolumeSource{
 						HostPath: &corev1.HostPathVolumeSource{
@@ -227,7 +227,7 @@ func TestPatchSparkPod_Volumes(t *testing.T) {
 						},
 					},
 				},
-				corev1.Volume{
+				{
 					Name: "foo",
 					VolumeSource: corev1.VolumeSource{
 						EmptyDir: &corev1.EmptyDirVolumeSource{},
@@ -938,7 +938,7 @@ func TestPatchSparkPod_DNSConfig(t *testing.T) {
 			Nameservers: []string{"8.8.8.8", "4.4.4.4"},
 			Searches:    []string{"svc.cluster.local", "cluster.local"},
 			Options: []corev1.PodDNSConfigOption{
-				corev1.PodDNSConfigOption{Name: "ndots", Value: &aVal},
+				{Name: "ndots", Value: &aVal},
 			},
 		}
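
Reviewer note: to see the new container-resolution behavior in isolation, here is a minimal, self-contained sketch (not part of the patch). It inlines stand-ins for the operator's util.IsDriverPod/util.IsExecutorPod helpers, which as far as I can tell key off the pod's "spark-role" label; the label key and the "driver"/"executor" role values used below are assumptions, while the container-name constants are copied from the diff above.

// findcontainer_sketch.go — standalone illustration of the findContainer logic.
package main

import (
	"fmt"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

const (
	sparkRoleLabel                     = "spark-role" // assumed stand-in for config.SparkRoleLabel
	sparkDriverContainerName           = "spark-kubernetes-driver"
	sparkExecutorContainerName         = "executor"
	spark3DefaultExecutorContainerName = "spark-kubernetes-executor"
)

// findContainer returns the index of the Spark container in the pod, or -1 if
// no candidate name matches. Executor pods accept both the Spark 2.x name
// ("executor") and the Spark 3.x default ("spark-kubernetes-executor").
func findContainer(pod *corev1.Pod) int {
	var candidates []string
	switch pod.Labels[sparkRoleLabel] {
	case "driver":
		candidates = []string{sparkDriverContainerName}
	case "executor":
		candidates = []string{sparkExecutorContainerName, spark3DefaultExecutorContainerName}
	default:
		return -1 // neither a driver nor an executor pod
	}
	for i, c := range pod.Spec.Containers {
		for _, name := range candidates {
			if c.Name == name {
				return i
			}
		}
	}
	return -1
}

func main() {
	// A Spark 3.x executor pod: the Spark container uses the new default name.
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:   "spark-pi-exec-1",
			Labels: map[string]string{sparkRoleLabel: "executor"},
		},
		Spec: corev1.PodSpec{
			Containers: []corev1.Container{
				{Name: "sidecar"},
				{Name: "spark-kubernetes-executor"},
			},
		},
	}
	fmt.Println(findContainer(pod)) // prints 1: matched via the Spark 3.x name
}

The switch from value to pointer returns matters here: in the old loop, a pod whose containers matched neither hard-coded name left i equal to len(pod.Spec.Containers), so the subsequent pod.Spec.Containers[i] access appears to have been an out-of-range panic for Spark 3.x executor pods. Returning nil and having every caller bail out keeps the webhook from patching a wrong or nonexistent container index.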