Skip to content

Commit

Permalink
Fixe for #868 (#876)
Browse files Browse the repository at this point in the history
  • Loading branch information
liyinan926 authored Apr 22, 2020
1 parent 5c35182 commit 92a6611
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 40 deletions.
5 changes: 4 additions & 1 deletion pkg/config/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,10 @@ const (
SparkDriverContainerName = "spark-kubernetes-driver"
// SparkExecutorContainerName is name of executor container in spark executor pod
SparkExecutorContainerName = "executor"

// Spark3DefaultExecutorContainerName is the default executor container name in
// Spark 3.x, which allows the container name to be configured through the pod
// template support.
Spark3DefaultExecutorContainerName = "spark-kubernetes-executor"
// SparkLocalDirVolumePrefix is the volume name prefix for "scratch" space directory
SparkLocalDirVolumePrefix = "spark-local-dir-"
)
118 changes: 83 additions & 35 deletions pkg/webhook/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ func addVolumes(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOperation
ops = append(ops, addVolume(pod, v))
addedVolumeMap[m.Name] = v
}
ops = append(ops, addVolumeMount(pod, m))
vmPatchOp := addVolumeMount(pod, m)
if vmPatchOp == nil {
return nil
}
ops = append(ops, *vmPatchOp)
}
}

Expand All @@ -162,14 +166,11 @@ func addVolume(pod *corev1.Pod, volume corev1.Volume) patchOperation {
return patchOperation{Op: "add", Path: path, Value: value}
}

func addVolumeMount(pod *corev1.Pod, mount corev1.VolumeMount) patchOperation {
i := 0
// Find the driver or executor container in the pod.
for ; i < len(pod.Spec.Containers); i++ {
if pod.Spec.Containers[i].Name == config.SparkDriverContainerName ||
pod.Spec.Containers[i].Name == config.SparkExecutorContainerName {
break
}
func addVolumeMount(pod *corev1.Pod, mount corev1.VolumeMount) *patchOperation {
i := findContainer(pod)
if i < 0 {
glog.Warningf("not able to add VolumeMount %s as Spark container was not found in pod %s", mount.Name, pod.Name)
return nil
}

path := fmt.Sprintf("/spec/containers/%d/volumeMounts", i)
Expand All @@ -182,7 +183,7 @@ func addVolumeMount(pod *corev1.Pod, mount corev1.VolumeMount) patchOperation {
}
pod.Spec.Containers[i].VolumeMounts = append(pod.Spec.Containers[i].VolumeMounts, mount)

return patchOperation{Op: "add", Path: path, Value: value}
return &patchOperation{Op: "add", Path: path, Value: value}
}

func addEnvVars(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOperation {
Expand Down Expand Up @@ -269,14 +270,11 @@ func addEnvFrom(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOperation
return patchOps
}

func addEnvironmentVariable(pod *corev1.Pod, envName, envValue string) patchOperation {
i := 0
// Find the driver or executor container in the pod.
for ; i < len(pod.Spec.Containers); i++ {
if pod.Spec.Containers[i].Name == config.SparkDriverContainerName ||
pod.Spec.Containers[i].Name == config.SparkExecutorContainerName {
break
}
func addEnvironmentVariable(pod *corev1.Pod, envName, envValue string) *patchOperation {
i := findContainer(pod)
if i < 0 {
glog.Warningf("not able to add environment variable %s as Spark container was not found in pod %s", envName, pod.Name)
return nil
}

path := fmt.Sprintf("/spec/containers/%d/env", i)
Expand All @@ -288,17 +286,24 @@ func addEnvironmentVariable(pod *corev1.Pod, envName, envValue string) patchOper
value = corev1.EnvVar{Name: envName, Value: envValue}
}

return patchOperation{Op: "add", Path: path, Value: value}
return &patchOperation{Op: "add", Path: path, Value: value}
}

func addSparkConfigMap(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOperation {
var patchOps []patchOperation
sparkConfigMapName := app.Spec.SparkConfigMap
if sparkConfigMapName != nil {
patchOps = append(patchOps, addConfigMapVolume(pod, *sparkConfigMapName, config.SparkConfigMapVolumeName))
patchOps = append(patchOps, addConfigMapVolumeMount(pod, config.SparkConfigMapVolumeName,
config.DefaultSparkConfDir))
patchOps = append(patchOps, addEnvironmentVariable(pod, config.SparkConfDirEnvVar, config.DefaultSparkConfDir))
vmPatchOp := addConfigMapVolumeMount(pod, config.SparkConfigMapVolumeName, config.DefaultSparkConfDir)
if vmPatchOp == nil {
return nil
}
patchOps = append(patchOps, *vmPatchOp)
envPatchOp := addEnvironmentVariable(pod, config.SparkConfDirEnvVar, config.DefaultSparkConfDir)
if envPatchOp == nil {
return nil
}
patchOps = append(patchOps, *envPatchOp)
}
return patchOps
}
Expand All @@ -308,9 +313,16 @@ func addHadoopConfigMap(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchO
hadoopConfigMapName := app.Spec.HadoopConfigMap
if hadoopConfigMapName != nil {
patchOps = append(patchOps, addConfigMapVolume(pod, *hadoopConfigMapName, config.HadoopConfigMapVolumeName))
patchOps = append(patchOps, addConfigMapVolumeMount(pod, config.HadoopConfigMapVolumeName,
config.DefaultHadoopConfDir))
patchOps = append(patchOps, addEnvironmentVariable(pod, config.HadoopConfDirEnvVar, config.DefaultHadoopConfDir))
vmPatchOp := addConfigMapVolumeMount(pod, config.HadoopConfigMapVolumeName, config.DefaultHadoopConfDir)
if vmPatchOp == nil {
return nil
}
patchOps = append(patchOps, *vmPatchOp)
envPatchOp := addEnvironmentVariable(pod, config.HadoopConfDirEnvVar, config.DefaultHadoopConfDir)
if envPatchOp == nil {
return nil
}
patchOps = append(patchOps, *envPatchOp)
}
return patchOps
}
Expand All @@ -331,7 +343,11 @@ func addGeneralConfigMaps(pod *corev1.Pod, app *v1beta2.SparkApplication) []patc
glog.V(2).Infof("ConfigMap volume name is too long. Truncating to length %d. Result: %s.", maxNameLength, volumeName)
}
patchOps = append(patchOps, addConfigMapVolume(pod, namePath.Name, volumeName))
patchOps = append(patchOps, addConfigMapVolumeMount(pod, volumeName, namePath.Path))
vmPatchOp := addConfigMapVolumeMount(pod, volumeName, namePath.Path)
if vmPatchOp == nil {
return nil
}
patchOps = append(patchOps, *vmPatchOp)
}
return patchOps
}
Expand All @@ -355,7 +371,11 @@ func addPrometheusConfigMap(pod *corev1.Pod, app *v1beta2.SparkApplication) []pa
volumeName := name + "-vol"
mountPath := config.PrometheusConfigMapMountPath
patchOps = append(patchOps, addConfigMapVolume(pod, name, volumeName))
patchOps = append(patchOps, addConfigMapVolumeMount(pod, volumeName, mountPath))
vmPatchOp := addConfigMapVolumeMount(pod, volumeName, mountPath)
if vmPatchOp == nil {
return nil
}
patchOps = append(patchOps, *vmPatchOp)
return patchOps
}

Expand All @@ -373,7 +393,7 @@ func addConfigMapVolume(pod *corev1.Pod, configMapName string, configMapVolumeNa
return addVolume(pod, volume)
}

func addConfigMapVolumeMount(pod *corev1.Pod, configMapVolumeName string, mountPath string) patchOperation {
func addConfigMapVolumeMount(pod *corev1.Pod, configMapVolumeName string, mountPath string) *patchOperation {
mount := corev1.VolumeMount{
Name: configMapVolumeName,
ReadOnly: true,
Expand Down Expand Up @@ -559,14 +579,13 @@ func addGPU(pod *corev1.Pod, app *v1beta2.SparkApplication) *patchOperation {
glog.V(2).Infof("GPU Quantity must be positive. Current gpu spec: %+v", gpu)
return nil
}
i := 0
// Find the driver or executor container in the pod.
for ; i < len(pod.Spec.Containers); i++ {
if pod.Spec.Containers[i].Name == config.SparkDriverContainerName ||
pod.Spec.Containers[i].Name == config.SparkExecutorContainerName {
break
}

i := findContainer(pod)
if i < 0 {
glog.Warningf("not able to add GPU as Spark container was not found in pod %s", pod.Name)
return nil
}

path := fmt.Sprintf("/spec/containers/%d/resources/limits", i)
var value interface{}
if len(pod.Spec.Containers[i].Resources.Limits) == 0 {
Expand Down Expand Up @@ -642,13 +661,42 @@ func addPodLifeCycleConfig(pod *corev1.Pod, app *v1beta2.SparkApplication) *patc
if lifeCycle == nil {
return nil
}

i := 0
// Find the driver container in the pod.
for ; i < len(pod.Spec.Containers); i++ {
if pod.Spec.Containers[i].Name == config.SparkDriverContainerName {
break
}
}
if i == len(pod.Spec.Containers) {
glog.Warningf("Spark driver container not found in pod %s", pod.Name)
return nil
}

path := fmt.Sprintf("/spec/containers/%d/lifecycle", i)
return &patchOperation{Op: "add", Path: path, Value: *lifeCycle}
}

func findContainer(pod *corev1.Pod) int {
var candidateContainerNames []string
if util.IsDriverPod(pod) {
candidateContainerNames = append(candidateContainerNames, config.SparkDriverContainerName)
} else if util.IsExecutorPod(pod) {
// Spark 3.x changed the default executor container name so we need to include both.
candidateContainerNames = append(candidateContainerNames, config.SparkExecutorContainerName, config.Spark3DefaultExecutorContainerName)
}

if len(candidateContainerNames) == 0 {
return -1
}

for i := 0; i < len(pod.Spec.Containers); i++ {
for _, name := range candidateContainerNames {
if pod.Spec.Containers[i].Name == name {
return i
}
}
}
return -1
}
8 changes: 4 additions & 4 deletions pkg/webhook/patch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func TestPatchSparkPod_Volumes_Subpath(t *testing.T) {
},
Spec: v1beta2.SparkApplicationSpec{
Volumes: []corev1.Volume{
corev1.Volume{
{
Name: "spark-pvc",
VolumeSource: corev1.VolumeSource{
PersistentVolumeClaim: &corev1.PersistentVolumeClaimVolumeSource{
Expand Down Expand Up @@ -219,15 +219,15 @@ func TestPatchSparkPod_Volumes(t *testing.T) {
},
Spec: v1beta2.SparkApplicationSpec{
Volumes: []corev1.Volume{
corev1.Volume{
{
Name: "spark",
VolumeSource: corev1.VolumeSource{
HostPath: &corev1.HostPathVolumeSource{
Path: "/spark",
},
},
},
corev1.Volume{
{
Name: "foo",
VolumeSource: corev1.VolumeSource{
EmptyDir: &corev1.EmptyDirVolumeSource{},
Expand Down Expand Up @@ -938,7 +938,7 @@ func TestPatchSparkPod_DNSConfig(t *testing.T) {
Nameservers: []string{"8.8.8.8", "4.4.4.4"},
Searches: []string{"svc.cluster.local", "cluster.local"},
Options: []corev1.PodDNSConfigOption{
corev1.PodDNSConfigOption{Name: "ndots", Value: &aVal},
{Name: "ndots", Value: &aVal},
},
}

Expand Down

0 comments on commit 92a6611

Please sign in to comment.