From e80137c286388139e586f5f1bad542d0d0655e59 Mon Sep 17 00:00:00 2001 From: Aldo Culquicondor <1299064+alculquicondor@users.noreply.github.com> Date: Thu, 15 Jul 2021 17:28:38 -0400 Subject: [PATCH] Consolidate validation and defaulting logic (#376) Validation happens in a single place, improving coverage --- v2/pkg/apis/kubeflow/v2/default.go | 37 +++-- v2/pkg/apis/kubeflow/v2/default_test.go | 103 ++++++++++++ v2/pkg/apis/kubeflow/validation/validation.go | 96 +++++++++++ .../kubeflow/validation/validation_test.go | 155 ++++++++++++++++++ v2/pkg/controller/mpi_job_controller.go | 91 +++++----- v2/test/integration/doc.go | 17 ++ 6 files changed, 438 insertions(+), 61 deletions(-) create mode 100644 v2/pkg/apis/kubeflow/v2/default_test.go create mode 100644 v2/pkg/apis/kubeflow/validation/validation.go create mode 100644 v2/pkg/apis/kubeflow/validation/validation_test.go create mode 100644 v2/test/integration/doc.go diff --git a/v2/pkg/apis/kubeflow/v2/default.go b/v2/pkg/apis/kubeflow/v2/default.go index 7a75ebcd..93ff58d9 100644 --- a/v2/pkg/apis/kubeflow/v2/default.go +++ b/v2/pkg/apis/kubeflow/v2/default.go @@ -19,35 +19,42 @@ import ( "k8s.io/apimachinery/pkg/runtime" ) -// Int32 is a helper routine that allocates a new int32 value -// to store v and returns a pointer to it. -func Int32(v int32) *int32 { - return &v -} - func addDefaultingFuncs(scheme *runtime.Scheme) error { return RegisterDefaults(scheme) } // setDefaultsTypeLauncher sets the default value to launcher. func setDefaultsTypeLauncher(spec *common.ReplicaSpec) { - if spec != nil && spec.RestartPolicy == "" { + if spec == nil { + return + } + if spec.RestartPolicy == "" { spec.RestartPolicy = DefaultRestartPolicy } + if spec.Replicas == nil { + spec.Replicas = newInt32(1) + } } // setDefaultsTypeWorker sets the default value to worker. func setDefaultsTypeWorker(spec *common.ReplicaSpec) { - if spec != nil && spec.RestartPolicy == "" { + if spec == nil { + return + } + if spec.RestartPolicy == "" { spec.RestartPolicy = DefaultRestartPolicy } + if spec.Replicas == nil { + spec.Replicas = newInt32(0) + } } func SetDefaults_MPIJob(mpiJob *MPIJob) { - // Set default cleanpod policy to None. if mpiJob.Spec.CleanPodPolicy == nil { - none := common.CleanPodPolicyNone - mpiJob.Spec.CleanPodPolicy = &none + mpiJob.Spec.CleanPodPolicy = newCleanPodPolicy(common.CleanPodPolicyNone) + } + if mpiJob.Spec.SlotsPerWorker == nil { + mpiJob.Spec.SlotsPerWorker = newInt32(1) } // set default to Launcher @@ -56,3 +63,11 @@ func SetDefaults_MPIJob(mpiJob *MPIJob) { // set default to Worker setDefaultsTypeWorker(mpiJob.Spec.MPIReplicaSpecs[MPIReplicaTypeWorker]) } + +func newInt32(v int32) *int32 { + return &v +} + +func newCleanPodPolicy(policy common.CleanPodPolicy) *common.CleanPodPolicy { + return &policy +} diff --git a/v2/pkg/apis/kubeflow/v2/default_test.go b/v2/pkg/apis/kubeflow/v2/default_test.go new file mode 100644 index 00000000..e29205d0 --- /dev/null +++ b/v2/pkg/apis/kubeflow/v2/default_test.go @@ -0,0 +1,103 @@ +// Copyright 2021 The Kubeflow Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +package v2 + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + common "github.com/kubeflow/common/pkg/apis/common/v1" +) + +func TestSetDefaults_MPIJob(t *testing.T) { + cases := map[string]struct { + job MPIJob + want MPIJob + }{ + "base defaults": { + want: MPIJob{ + Spec: MPIJobSpec{ + SlotsPerWorker: newInt32(1), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + }, + }, + }, + "base defaults overridden": { + job: MPIJob{ + Spec: MPIJobSpec{ + SlotsPerWorker: newInt32(10), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + }, + }, + want: MPIJob{ + Spec: MPIJobSpec{ + SlotsPerWorker: newInt32(10), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + }, + }, + }, + "launcher defaults": { + job: MPIJob{ + Spec: MPIJobSpec{ + MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaTypeLauncher: {}, + }, + }, + }, + want: MPIJob{ + Spec: MPIJobSpec{ + SlotsPerWorker: newInt32(1), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaTypeLauncher: { + Replicas: newInt32(1), + RestartPolicy: DefaultRestartPolicy, + }, + }, + }, + }, + }, + "worker defaults": { + job: MPIJob{ + Spec: MPIJobSpec{ + MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaTypeWorker: {}, + }, + }, + }, + want: MPIJob{ + Spec: MPIJobSpec{ + SlotsPerWorker: newInt32(1), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyNone), + MPIReplicaSpecs: map[MPIReplicaType]*common.ReplicaSpec{ + MPIReplicaTypeWorker: { + Replicas: newInt32(0), + RestartPolicy: DefaultRestartPolicy, + }, + }, + }, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := tc.job.DeepCopy() + SetDefaults_MPIJob(got) + if diff := cmp.Diff(tc.want, *got); diff != "" { + t.Errorf("Unexpected changes (-want,+got):\n%s", diff) + } + }) + } +} diff --git a/v2/pkg/apis/kubeflow/validation/validation.go b/v2/pkg/apis/kubeflow/validation/validation.go new file mode 100644 index 00000000..2be6ddd4 --- /dev/null +++ b/v2/pkg/apis/kubeflow/validation/validation.go @@ -0,0 +1,96 @@ +// Copyright 2021 The Kubeflow Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package validation + +import ( + "fmt" + + common "github.com/kubeflow/common/pkg/apis/common/v1" + v2 "github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/v2" + apivalidation "k8s.io/apimachinery/pkg/api/validation" + "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/validation/field" +) + +var validCleanPolicies = sets.NewString( + string(common.CleanPodPolicyNone), + string(common.CleanPodPolicyRunning), + string(common.CleanPodPolicyAll)) + +func ValidateMPIJob(job *v2.MPIJob) field.ErrorList { + return validateMPIJobSpec(&job.Spec, field.NewPath("spec")) +} + +func validateMPIJobSpec(spec *v2.MPIJobSpec, path *field.Path) field.ErrorList { + errs := validateMPIReplicaSpecs(spec.MPIReplicaSpecs, path.Child("mpiReplicaSpecs")) + if spec.SlotsPerWorker == nil { + errs = append(errs, field.Required(path.Child("slotsPerWorker"), "must have number of slots per worker")) + } else { + errs = append(errs, apivalidation.ValidateNonnegativeField(int64(*spec.SlotsPerWorker), path.Child("slotsPerWorker"))...) + } + if spec.CleanPodPolicy == nil { + errs = append(errs, field.Required(path.Child("cleanPodPolicy"), "must have clean Pod policy")) + } else if !validCleanPolicies.Has(string(*spec.CleanPodPolicy)) { + errs = append(errs, field.NotSupported(path.Child("cleanPodPolicy"), *spec.CleanPodPolicy, validCleanPolicies.List())) + } + return errs +} + +func validateMPIReplicaSpecs(replicaSpecs map[v2.MPIReplicaType]*common.ReplicaSpec, path *field.Path) field.ErrorList { + var errs field.ErrorList + if replicaSpecs == nil { + errs = append(errs, field.Required(path, "must have replica specs")) + return errs + } + errs = append(errs, validateLauncherReplicaSpec(replicaSpecs[v2.MPIReplicaTypeLauncher], path.Key(string(v2.MPIReplicaTypeLauncher)))...) + errs = append(errs, validateWorkerReplicaSpec(replicaSpecs[v2.MPIReplicaTypeWorker], path.Key(string(v2.MPIReplicaTypeWorker)))...) + return errs +} + +func validateLauncherReplicaSpec(spec *common.ReplicaSpec, path *field.Path) field.ErrorList { + var errs field.ErrorList + if spec == nil { + errs = append(errs, field.Required(path, fmt.Sprintf("must have %s replica spec", v2.MPIReplicaTypeLauncher))) + return errs + } + errs = append(errs, validateReplicaSpec(spec, path)...) + if spec.Replicas != nil && *spec.Replicas != 1 { + errs = append(errs, field.Invalid(path.Child("replicas"), *spec.Replicas, "must be 1")) + } + return errs +} + +func validateWorkerReplicaSpec(spec *common.ReplicaSpec, path *field.Path) field.ErrorList { + var errs field.ErrorList + if spec == nil { + return errs + } + errs = append(errs, validateReplicaSpec(spec, path)...) + if spec.Replicas != nil { + errs = append(errs, apivalidation.ValidateNonnegativeField(int64(*spec.Replicas), path.Child("replicas"))...) + } + return errs +} + +func validateReplicaSpec(spec *common.ReplicaSpec, path *field.Path) field.ErrorList { + var errs field.ErrorList + if spec.Replicas == nil { + errs = append(errs, field.Required(path.Child("replicas"), "must define number of replicas")) + } + if len(spec.Template.Spec.Containers) == 0 { + errs = append(errs, field.Required(path.Child("template", "spec", "containers"), "must define at least one container")) + } + return errs +} diff --git a/v2/pkg/apis/kubeflow/validation/validation_test.go b/v2/pkg/apis/kubeflow/validation/validation_test.go new file mode 100644 index 00000000..7a997d7e --- /dev/null +++ b/v2/pkg/apis/kubeflow/validation/validation_test.go @@ -0,0 +1,155 @@ +// Copyright 2021 The Kubeflow Authors. 
+// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package validation + +import ( + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + common "github.com/kubeflow/common/pkg/apis/common/v1" + "github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/v2" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/validation/field" +) + +func TestValidateMPIJob(t *testing.T) { + cases := map[string]struct { + job v2.MPIJob + wantErrs field.ErrorList + }{ + "valid": { + job: v2.MPIJob{ + Spec: v2.MPIJobSpec{ + SlotsPerWorker: newInt32(2), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + MPIReplicaSpecs: map[v2.MPIReplicaType]*common.ReplicaSpec{ + v2.MPIReplicaTypeLauncher: { + Replicas: newInt32(1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{}}, + }, + }, + }, + }, + }, + }, + }, + "valid with worker": { + job: v2.MPIJob{ + Spec: v2.MPIJobSpec{ + SlotsPerWorker: newInt32(2), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + MPIReplicaSpecs: map[v2.MPIReplicaType]*common.ReplicaSpec{ + v2.MPIReplicaTypeLauncher: { + Replicas: newInt32(1), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{}}, + }, + }, + }, + v2.MPIReplicaTypeWorker: { + Replicas: newInt32(3), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{{}}, + }, + }, + }, + }, + }, + }, + }, + "empty job": { + wantErrs: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeRequired, + Field: "spec.mpiReplicaSpecs", + }, + &field.Error{ + Type: field.ErrorTypeRequired, + Field: "spec.slotsPerWorker", + }, + &field.Error{ + Type: field.ErrorTypeRequired, + Field: "spec.cleanPodPolicy", + }, + }, + }, + "empty replica specs": { + job: v2.MPIJob{ + Spec: v2.MPIJobSpec{ + SlotsPerWorker: newInt32(2), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + MPIReplicaSpecs: map[v2.MPIReplicaType]*common.ReplicaSpec{}, + }, + }, + wantErrs: field.ErrorList{ + &field.Error{ + Type: field.ErrorTypeRequired, + Field: "spec.mpiReplicaSpecs[Launcher]", + }, + }, + }, + "missing replica spec fields": { + job: v2.MPIJob{ + Spec: v2.MPIJobSpec{ + SlotsPerWorker: newInt32(2), + CleanPodPolicy: newCleanPodPolicy(common.CleanPodPolicyRunning), + MPIReplicaSpecs: map[v2.MPIReplicaType]*common.ReplicaSpec{ + v2.MPIReplicaTypeLauncher: {}, + v2.MPIReplicaTypeWorker: {}, + }, + }, + }, + wantErrs: field.ErrorList{ + { + Type: field.ErrorTypeRequired, + Field: "spec.mpiReplicaSpecs[Launcher].replicas", + }, + { + Type: field.ErrorTypeRequired, + Field: "spec.mpiReplicaSpecs[Launcher].template.spec.containers", + }, + { + Type: field.ErrorTypeRequired, + Field: "spec.mpiReplicaSpecs[Worker].replicas", + }, + { + Type: field.ErrorTypeRequired, + Field: "spec.mpiReplicaSpecs[Worker].template.spec.containers", + }, + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + got := ValidateMPIJob(&tc.job) + if 
diff := cmp.Diff(tc.wantErrs, got, cmpopts.IgnoreFields(field.Error{}, "Detail", "BadValue")); diff != "" {
+				t.Errorf("Unexpected errors (-want,+got):\n%s", diff)
+			}
+		})
+	}
+}
+
+func newInt32(v int32) *int32 {
+	return &v
+}
+
+func newCleanPodPolicy(v common.CleanPodPolicy) *common.CleanPodPolicy {
+	return &v
+}
diff --git a/v2/pkg/controller/mpi_job_controller.go b/v2/pkg/controller/mpi_job_controller.go
index 5876a8f1..7ee02512 100644
--- a/v2/pkg/controller/mpi_job_controller.go
+++ b/v2/pkg/controller/mpi_job_controller.go
@@ -56,6 +56,7 @@ import (
 
 	common "github.com/kubeflow/common/pkg/apis/common/v1"
 	kubeflow "github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/v2"
+	"github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/validation"
 	clientset "github.com/kubeflow/mpi-operator/v2/pkg/client/clientset/versioned"
 	"github.com/kubeflow/mpi-operator/v2/pkg/client/clientset/versioned/scheme"
 	informers "github.com/kubeflow/mpi-operator/v2/pkg/client/informers/externalversions/kubeflow/v2"
@@ -100,9 +101,9 @@ const (
 	// fails to sync due to dependent resources already existing.
 	MessageResourceExists = "Resource %q of Kind %q already exists and is not managed by MPIJob"
 
-	// ErrResourceDoesNotExist is used as part of the Event 'reason' when some
-	// resource is missing in yaml
-	ErrResourceDoesNotExist = "ErrResourceDoesNotExist"
+	// ValidationError is used as part of the Event 'reason' when an MPIJob
+	// fails validation.
+	ValidationError = "ValidationError"
 
 	// MessageResourceDoesNotExist is used for Events when some
 	// resource is missing in yaml
@@ -110,7 +111,11 @@ const (
 
 	// podTemplateRestartPolicyReason is the warning reason when the restart
 	// policy is set in pod template.
-	podTemplateRestartPolicyReason = "SettedPodTemplateRestartPolicy"
+	podTemplateRestartPolicyReason = "SetPodTemplateRestartPolicy"
+
+	// eventMessageLimit is the maximum size of an Event's message.
+	// From: k8s.io/kubernetes/pkg/apis/core/validation/events.go
+	eventMessageLimit = 1024
 )
 
 var (
@@ -404,6 +409,13 @@ func (c *MPIJobController) syncHandler(key string) error {
 		return nil
 	}
 
+	if errs := validation.ValidateMPIJob(mpiJob); len(errs) != 0 {
+		msg := truncateMessage(fmt.Sprintf("Found validation errors: %v", errs.ToAggregate()))
+		c.recorder.Event(mpiJob, corev1.EventTypeWarning, ValidationError, msg)
+		// Do not requeue
+		return nil
+	}
+
 	// Whether the job is preempted, and requeue it
 	requeue := false
 	// If the MPIJob is terminated, delete its pods according to cleanPodPolicy.
@@ -491,12 +503,7 @@ func (c *MPIJobController) syncHandler(key string) error {
 		return err
 	}
 	if launcher == nil {
-		launcher = c.newLauncher(mpiJob, isGPULauncher)
-		if launcher == nil {
-			c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod spec is invalid")
-			return fmt.Errorf("launcher pod spec is invalid")
-		}
-		launcher, err = c.kubeClient.CoreV1().Pods(namespace).Create(context.TODO(), launcher, metav1.CreateOptions{})
+		launcher, err = c.kubeClient.CoreV1().Pods(namespace).Create(context.TODO(), c.newLauncher(mpiJob, isGPULauncher), metav1.CreateOptions{})
 		if err != nil {
 			c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "launcher pod created failed: %v", err)
 			return fmt.Errorf("creating launcher Pod: %w", err)
@@ -730,21 +737,14 @@ func keysFromData(data map[string][]byte) []string {
 // MPIJob, or creates one if it doesn't exist.
func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1.Pod, error) { var ( - workerPrefix = mpiJob.Name + workerSuffix - workerPods []*corev1.Pod - i int32 = 0 - workerReplicas *int32 + workerPrefix = mpiJob.Name + workerSuffix + workerPods []*corev1.Pod + i int32 = 0 ) - if worker, ok := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]; ok && worker != nil { - workerReplicas = worker.Replicas - } else { + worker := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker] + if worker == nil { return workerPods, nil } - if workerReplicas == nil { - msg := fmt.Sprintf(MessageResourceDoesNotExist, "Worker.replicas") - c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, msg) - return nil, fmt.Errorf(msg) - } // Remove Pods when replicas are scaled down selector, err := workerSelector(mpiJob.Name) @@ -755,7 +755,7 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1 if err != nil { return nil, err } - if len(podFullList) > int(*workerReplicas) { + if len(podFullList) > int(*worker.Replicas) { for _, pod := range podFullList { indexStr, ok := pod.Labels[common.ReplicaIndexLabel] if !ok { @@ -763,7 +763,7 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1 } index, err := strconv.Atoi(indexStr) if err == nil { - if index >= int(*workerReplicas) { + if index >= int(*worker.Replicas) { err = c.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, metav1.DeleteOptions{}) if err != nil { return nil, err @@ -773,19 +773,13 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1 } } - for ; i < *workerReplicas; i++ { + for ; i < *worker.Replicas; i++ { name := fmt.Sprintf("%s-%d", workerPrefix, i) pod, err := c.podLister.Pods(mpiJob.Namespace).Get(name) // If the worker Pod doesn't exist, we'll create it. if errors.IsNotFound(err) { worker := newWorker(mpiJob, name, c.gangSchedulerName) - if worker == nil { - msg := fmt.Sprintf(MessageResourceDoesNotExist, "Worker") - c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, msg) - err = fmt.Errorf(msg) - return nil, err - } // Insert ReplicaIndexLabel worker.Labels[common.ReplicaIndexLabel] = strconv.Itoa(int(i)) pod, err = c.kubeClient.CoreV1().Pods(mpiJob.Namespace).Create(context.TODO(), worker, metav1.CreateOptions{}) @@ -793,7 +787,7 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1 // If an error occurs during Get/Create, we'll requeue the item so we // can attempt processing again later. This could have been caused by a // temporary network failure, or any other transient reason. 
-			if err != nil && !errors.IsNotFound(err) {
+			if err != nil {
 				c.recorder.Eventf(mpiJob, corev1.EventTypeWarning, mpiJobFailedReason, "worker pod created failed: %v", err)
 				return nil, err
 			}
@@ -812,17 +806,15 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
 
 func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
 	var (
-		workerPrefix   string = mpiJob.Name + workerSuffix
-		i              int32  = 0
-		workerReplicas *int32
+		workerPrefix = mpiJob.Name + workerSuffix
+		i            int32 = 0
 	)
-	if worker, ok := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]; ok && worker != nil {
-		workerReplicas = worker.Replicas
-	} else {
+	worker := mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker]
+	if worker == nil {
 		return nil
 	}
 
-	for ; i < *workerReplicas; i++ {
+	for ; i < *worker.Replicas; i++ {
 		name := fmt.Sprintf("%s-%d", workerPrefix, i)
 		pod, err := c.podLister.Pods(mpiJob.Namespace).Get(name)
@@ -1212,10 +1204,6 @@ func newWorker(mpiJob *kubeflow.MPIJob, name, gangSchedulerName string) *corev1.
 	podTemplate.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name.
 	setRestartPolicy(podTemplate, mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeWorker])
 
-	if len(podTemplate.Spec.Containers) == 0 {
-		klog.Errorln("Worker pod does not have any containers in its spec")
-		return nil
-	}
 	container := &podTemplate.Spec.Containers[0]
 	if len(container.Command) == 0 && len(container.Args) == 0 {
 		container.Command = []string{"/usr/sbin/sshd", "-De"}
@@ -1288,12 +1276,6 @@ func (c *MPIJobController) newLauncher(mpiJob *kubeflow.MPIJob, isGPULauncher bo
 	}
 	podSpec.Spec.Hostname = launcherName
 	podSpec.Spec.Subdomain = mpiJob.Name + workerSuffix // Matches workers' Service name.
-	if len(podSpec.Spec.Containers) == 0 {
-		klog.Errorln("Launcher pod does not have any containers in its spec")
-		msg := fmt.Sprintf(MessageResourceDoesNotExist, "Launcher")
-		c.recorder.Event(mpiJob, corev1.EventTypeWarning, ErrResourceDoesNotExist, msg)
-		return nil
-	}
 	container := &podSpec.Spec.Containers[0]
 	container.Env = append(container.Env,
 		// Allows driver to reach workers through the Service.
@@ -1333,7 +1315,7 @@ func (c *MPIJobController) newLauncher(mpiJob *kubeflow.MPIJob, isGPULauncher bo
 	// Submit a warning event if the user specifies restart policy for
 	// the pod template. We recommend to set it from the replica level.
 	if podSpec.Spec.RestartPolicy != "" {
-		errMsg := "Restart policy in pod template will be overwritten by restart policy in replica spec"
+		errMsg := "Restart policy in pod template overridden by restart policy in replica spec"
 		klog.Warning(errMsg)
 		c.recorder.Event(mpiJob, corev1.EventTypeWarning, podTemplateRestartPolicyReason, errMsg)
 	}
@@ -1507,3 +1489,12 @@ func sshInitContainer(mounts []corev1.VolumeMount) corev1.Container {
 func newInt32(v int32) *int32 {
 	return &v
 }
+
+// truncateMessage truncates a message if it exceeds eventMessageLimit.
+func truncateMessage(message string) string {
+	if len(message) <= eventMessageLimit {
+		return message
+	}
+	suffix := "..."
+	return message[:eventMessageLimit-len(suffix)] + suffix
+}
diff --git a/v2/test/integration/doc.go b/v2/test/integration/doc.go
new file mode 100644
index 00000000..7ddd24bc
--- /dev/null
+++ b/v2/test/integration/doc.go
@@ -0,0 +1,17 @@
+// Copyright 2021 The Kubeflow Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package integration is an empty package +// https://github.com/golang/go/issues/27333 +package integration
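
Usage sketch (not part of the diff above): the program below shows how the consolidated entry points are meant to compose, with SetDefaults_MPIJob running before ValidateMPIJob so that validation can treat the defaulted fields (slotsPerWorker, cleanPodPolicy, replicas) as unconditionally required. Only those two functions and the import paths come from this patch; the sample job and the main harness are illustrative.

package main

import (
	"fmt"

	common "github.com/kubeflow/common/pkg/apis/common/v1"
	v2 "github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/v2"
	"github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/validation"
	corev1 "k8s.io/api/core/v1"
)

func main() {
	// A minimal job: one launcher with a single (empty) container.
	job := &v2.MPIJob{
		Spec: v2.MPIJobSpec{
			MPIReplicaSpecs: map[v2.MPIReplicaType]*common.ReplicaSpec{
				v2.MPIReplicaTypeLauncher: {
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{Containers: []corev1.Container{{}}},
					},
				},
			},
		},
	}
	// Defaulting fills slotsPerWorker, cleanPodPolicy, launcher replicas and
	// restartPolicy, so the validation below passes without further setup.
	v2.SetDefaults_MPIJob(job)
	if errs := validation.ValidateMPIJob(job); len(errs) != 0 {
		fmt.Println(errs.ToAggregate())
		return
	}
	fmt.Println("MPIJob is valid")
}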
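
A second sketch traces the new failure path in syncHandler: validation errors are aggregated, truncated to the Event message limit, and recorded as a Warning with reason ValidationError instead of being requeued. The FakeRecorder and the local copies of the unexported eventMessageLimit and truncateMessage are stand-ins used only to keep the example self-contained; the controller uses its own recorder and helper.

package main

import (
	"fmt"

	v2 "github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/v2"
	"github.com/kubeflow/mpi-operator/v2/pkg/apis/kubeflow/validation"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/record"
)

// eventMessageLimit and truncateMessage mirror the private definitions
// added to the controller in this patch.
const eventMessageLimit = 1024

func truncateMessage(message string) string {
	if len(message) <= eventMessageLimit {
		return message
	}
	suffix := "..."
	return message[:eventMessageLimit-len(suffix)] + suffix
}

func main() {
	// An empty MPIJob is invalid: no replica specs and, since defaulting has
	// not run, no slotsPerWorker or cleanPodPolicy either.
	job := &v2.MPIJob{}
	errs := validation.ValidateMPIJob(job)
	msg := truncateMessage(fmt.Sprintf("Found validation errors: %v", errs.ToAggregate()))

	// record.NewFakeRecorder captures recorded events on a channel.
	recorder := record.NewFakeRecorder(1)
	recorder.Event(job, corev1.EventTypeWarning, "ValidationError", msg)
	// Prints roughly: Warning ValidationError Found validation errors: [...]
	fmt.Println(<-recorder.Events)
}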