diff --git a/pkg/apis/controller/experiments/v1alpha3/constants.go b/pkg/apis/controller/experiments/v1alpha3/constants.go index 00ebfb99013..9adace8dd6c 100644 --- a/pkg/apis/controller/experiments/v1alpha3/constants.go +++ b/pkg/apis/controller/experiments/v1alpha3/constants.go @@ -24,4 +24,7 @@ const ( // Default value of Spec.TemplatePath DefaultTrialTemplatePath = "defaultTrialTemplate.yaml" + + // Default value of Spec.ResumePolicy + DefaultResumePolicy = LongRunning ) diff --git a/pkg/apis/controller/experiments/v1alpha3/experiment_defaults.go b/pkg/apis/controller/experiments/v1alpha3/experiment_defaults.go index e85f7176086..14bd238c7b9 100644 --- a/pkg/apis/controller/experiments/v1alpha3/experiment_defaults.go +++ b/pkg/apis/controller/experiments/v1alpha3/experiment_defaults.go @@ -29,6 +29,7 @@ import ( func (e *Experiment) SetDefault() { e.setDefaultParallelTrialCount() + e.setDefaultResumePolicy() e.setDefaultTrialTemplate() e.setDefaultMetricsCollector() } @@ -40,6 +41,12 @@ func (e *Experiment) setDefaultParallelTrialCount() { } } +func (e *Experiment) setDefaultResumePolicy() { + if e.Spec.ResumePolicy == "" { + e.Spec.ResumePolicy = DefaultResumePolicy + } +} + func (e *Experiment) setDefaultTrialTemplate() { t := e.Spec.TrialTemplate if t == nil { diff --git a/pkg/apis/controller/experiments/v1alpha3/experiment_types.go b/pkg/apis/controller/experiments/v1alpha3/experiment_types.go index b2c4b545419..aa94091ba90 100644 --- a/pkg/apis/controller/experiments/v1alpha3/experiment_types.go +++ b/pkg/apis/controller/experiments/v1alpha3/experiment_types.go @@ -49,9 +49,11 @@ type ExperimentSpec struct { NasConfig *NasConfig `json:"nasConfig,omitempty"` + // Describes resuming policy which usually take effect after experiment terminated. + ResumePolicy ResumePolicyType `json:"resumePolicy,omitempty"` + // TODO - Other fields, exact format is TBD. Will add these back during implementation. 
// - Early stopping - // - Resume experiment } type ExperimentStatus struct { @@ -154,6 +156,13 @@ const ( ExperimentFailed ExperimentConditionType = "Failed" ) +type ResumePolicyType string + +const ( + NeverResume ResumePolicyType = "Never" + LongRunning ResumePolicyType = "LongRunning" +) + type ParameterSpec struct { Name string `json:"name,omitempty"` ParameterType ParameterType `json:"parameterType,omitempty"` diff --git a/pkg/apis/v1alpha3/openapi_generated.go b/pkg/apis/v1alpha3/openapi_generated.go index 68f22b686ef..5c88d96b1bf 100644 --- a/pkg/apis/v1alpha3/openapi_generated.go +++ b/pkg/apis/v1alpha3/openapi_generated.go @@ -557,6 +557,13 @@ func GetOpenAPIDefinitions(ref common.ReferenceCallback) map[string]common.OpenA Ref: ref("github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3.NasConfig"), }, }, + "resumePolicy": { + SchemaProps: spec.SchemaProps{ + Description: "Describes resuming policy which usually take effect after experiment terminated.", + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/apis/v1alpha3/swagger.json b/pkg/apis/v1alpha3/swagger.json index f26033cec5a..eee94f9b3ec 100644 --- a/pkg/apis/v1alpha3/swagger.json +++ b/pkg/apis/v1alpha3/swagger.json @@ -485,6 +485,10 @@ "$ref": "#/definitions/v1alpha3.ParameterSpec" } }, + "resumePolicy": { + "description": "Describes resuming policy which usually take effect after experiment terminated.", + "type": "string" + }, "trialTemplate": { "description": "Template for each run of the trial.", "$ref": "#/definitions/v1alpha3.TrialTemplate" diff --git a/pkg/controller.v1alpha3/experiment/experiment_controller.go b/pkg/controller.v1alpha3/experiment/experiment_controller.go index 845b9dfed20..fe888ad04ab 100644 --- a/pkg/controller.v1alpha3/experiment/experiment_controller.go +++ b/pkg/controller.v1alpha3/experiment/experiment_controller.go @@ -202,6 +202,9 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re 
instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg) } } else { + if instance.Spec.ResumePolicy != experimentsv1alpha3.LongRunning { + return r.terminateSuggestion(instance) + } // If experiment is completed with no running trials, stop reconcile if !instance.HasRunningTrials() { return reconcile.Result{}, nil @@ -263,6 +266,7 @@ func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha3. if reconcileRequired { r.ReconcileTrials(instance, trials.Items) } + return nil } diff --git a/pkg/controller.v1alpha3/experiment/experiment_util.go b/pkg/controller.v1alpha3/experiment/experiment_util.go index b095060c8d6..2b80a7eda5a 100644 --- a/pkg/controller.v1alpha3/experiment/experiment_util.go +++ b/pkg/controller.v1alpha3/experiment/experiment_util.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "fmt" + "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/types" @@ -14,6 +15,7 @@ import ( experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3" suggestionsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3" trialsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3" + suggestionController "github.com/kubeflow/katib/pkg/controller.v1alpha3/suggestion" "github.com/kubeflow/katib/pkg/controller.v1alpha3/util" ) @@ -109,3 +111,32 @@ func (r *ReconcileExperiment) updateFinalizers(instance *experimentsv1alpha3.Exp return reconcile.Result{Requeue: true}, err } } + +// terminateSuggestion marks the Suggestion that shares the experiment's name and namespace as succeeded +// and requests a requeue; a Suggestion that is not found or already completed is left untouched. +func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1alpha3.Experiment) (reconcile.Result, error) { + log.Info("Start terminating original...") + original := &suggestionsv1alpha3.Suggestion{} + err := r.Get(context.TODO(), types.NamespacedName{ + Namespace: instance.Namespace, + Name: instance.Name, + }, original) + if err != nil { + if errors.IsNotFound(err) { + return reconcile.Result{}, nil + } + return 
reconcile.Result{}, err + } + if original.IsCompleted() { + return reconcile.Result{}, nil + } + suggestion := original.DeepCopy() + msg := "Suggestion is succeeded" + suggestion.MarkSuggestionStatusSucceeded(suggestionController.SuggestionSucceededReason, msg) + log.Info("Mark suggestion succeeded...") + + if err := r.UpdateSuggestion(suggestion); err != nil { + return reconcile.Result{}, err + } + return reconcile.Result{Requeue: true}, nil +} diff --git a/pkg/controller.v1alpha3/suggestion/suggestion_controller.go b/pkg/controller.v1alpha3/suggestion/suggestion_controller.go index 6194d0fd24a..9eaa9e27f70 100644 --- a/pkg/controller.v1alpha3/suggestion/suggestion_controller.go +++ b/pkg/controller.v1alpha3/suggestion/suggestion_controller.go @@ -134,7 +134,16 @@ func (r *ReconcileSuggestion) Reconcile(request reconcile.Request) (reconcile.Re return reconcile.Result{}, err } instance := oldS.DeepCopy() - if instance.IsCompleted() { + // If ResumePolicyType is LongRunning, suggestion status will never be succeeded. 
+ if instance.IsSucceeded() { + err = r.deleteDeployment(instance) + if err != nil { + return reconcile.Result{}, err + } + err = r.deleteService(instance) + if err != nil { + return reconcile.Result{}, err + } return reconcile.Result{}, nil } if !instance.IsCreated() { diff --git a/pkg/controller.v1alpha3/suggestion/suggestion_controller_util.go b/pkg/controller.v1alpha3/suggestion/suggestion_controller_util.go index 4a4a33d6618..03a4cce6e1e 100644 --- a/pkg/controller.v1alpha3/suggestion/suggestion_controller_util.go +++ b/pkg/controller.v1alpha3/suggestion/suggestion_controller_util.go @@ -2,6 +2,7 @@ package suggestion import ( "context" + "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -34,3 +35,51 @@ func (r *ReconcileSuggestion) reconcileService(service *corev1.Service) (*corev1 } return foundService, nil } + +// deleteDeployment deletes the Deployment that DesiredDeployment computes for the suggestion. +// A Deployment that is not found is not treated as an error. +func (r *ReconcileSuggestion) deleteDeployment(instance *v1alpha3.Suggestion) error { + deploy, err := r.DesiredDeployment(instance) + if err != nil { + return err + } + realDeploy := &appsv1.Deployment{} + err = r.Get(context.TODO(), types.NamespacedName{Name: deploy.Name, Namespace: deploy.Namespace}, realDeploy) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + err = r.Delete(context.TODO(), realDeploy) + if err != nil { + return err + } + log.Info("suggestion deployment has been deleted", "name", realDeploy.Name) + + return nil +} + +// deleteService deletes the Service that DesiredService computes for the suggestion. +// A Service that is not found is not treated as an error. +func (r *ReconcileSuggestion) deleteService(instance *v1alpha3.Suggestion) error { + service, err := r.DesiredService(instance) + if err != nil { + return err + } + realService := &corev1.Service{} + err = r.Get(context.TODO(), types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, realService) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + err = r.Delete(context.TODO(), realService) + if err != nil { + return err + } + log.Info("suggestion service has 
been deleted", "name", realService.Name) + + return nil +} diff --git a/pkg/webhook/v1alpha3/experiment/validator/validator.go b/pkg/webhook/v1alpha3/experiment/validator/validator.go index 5f54604513f..6c50075be5b 100644 --- a/pkg/webhook/v1alpha3/experiment/validator/validator.go +++ b/pkg/webhook/v1alpha3/experiment/validator/validator.go @@ -68,6 +68,9 @@ func (g *DefaultValidator) ValidateExperiment(instance, oldInst *experimentsv1al if err := g.validateAlgorithm(instance.Spec.Algorithm); err != nil { return err } + if err := g.validateResumePolicy(instance.Spec.ResumePolicy); err != nil { + return err + } if err := g.validateTrialTemplate(instance); err != nil { return err @@ -115,6 +118,18 @@ func (g *DefaultValidator) validateAlgorithm(ag *commonapiv1alpha3.AlgorithmSpec return nil } +func (g *DefaultValidator) validateResumePolicy(resume experimentsv1alpha3.ResumePolicyType) error { + validTypes := map[experimentsv1alpha3.ResumePolicyType]string{ + "": "", + experimentsv1alpha3.NeverResume: "", + experimentsv1alpha3.LongRunning: "", + } + if _, ok := validTypes[resume]; !ok { + return fmt.Errorf("invalid ResumePolicyType %s", resume) + } + return nil +} + func (g *DefaultValidator) validateTrialTemplate(instance *experimentsv1alpha3.Experiment) error { trialName := fmt.Sprintf("%s-trial", instance.GetName()) runSpec, err := g.GetRunSpec(instance, instance.GetName(), trialName, instance.GetNamespace()) diff --git a/pkg/webhook/v1alpha3/experiment/validator/validator_test.go b/pkg/webhook/v1alpha3/experiment/validator/validator_test.go index 6497e1672ca..82774135f0a 100644 --- a/pkg/webhook/v1alpha3/experiment/validator/validator_test.go +++ b/pkg/webhook/v1alpha3/experiment/validator/validator_test.go @@ -251,6 +251,15 @@ metadata: return i }(), }, + { + Instance: newFakeInstance(), + Err: true, + oldInstance: func() *experimentsv1alpha3.Experiment { + i := newFakeInstance() + i.Spec.ResumePolicy = "invalid-policy" + return i + }(), + }, { Instance: func() 
*experimentsv1alpha3.Experiment { i := newFakeInstance() diff --git a/test/e2e/v1alpha3/resume-e2e-experiment.go b/test/e2e/v1alpha3/resume-e2e-experiment.go index f03caf6858c..562ae36d337 100644 --- a/test/e2e/v1alpha3/resume-e2e-experiment.go +++ b/test/e2e/v1alpha3/resume-e2e-experiment.go @@ -2,18 +2,24 @@ package main import ( "bytes" + "context" "fmt" "io/ioutil" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "log" "os" "time" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" k8syaml "k8s.io/apimachinery/pkg/util/yaml" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "sigs.k8s.io/controller-runtime/pkg/client" commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3" experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3" + controllerUtil "github.com/kubeflow/katib/pkg/controller.v1alpha3/util" "github.com/kubeflow/katib/pkg/util/v1alpha3/katibclient" ) @@ -146,5 +152,34 @@ func main() { log.Fatal("All trials are not successful ", exp.Status.TrialsSucceeded, *exp.Spec.MaxTrialCount) } } + + sug, err := kclient.GetSuggestion(exp.Name, exp.Namespace) + // Stop here on a failed lookup so sug is never used after an error. + if err != nil { + log.Fatal("GetSuggestion from k8s client failed: ", err) + } + if exp.Spec.ResumePolicy == experimentsv1alpha3.LongRunning { + if sug.IsSucceeded() { + log.Fatal("Suggestion is terminated while ResumePolicy = LongRunning") + } + } + if exp.Spec.ResumePolicy == experimentsv1alpha3.NeverResume { + if sug.IsRunning() { + log.Fatal("Suggestion is still running while ResumePolicy = NeverResume") + } + namespacedName := types.NamespacedName{Name: controllerUtil.GetAlgorithmServiceName(sug), Namespace: sug.Namespace} + service := &corev1.Service{} + err := kclient.GetClient().Get(context.TODO(), namespacedName, service) + if err == nil || !errors.IsNotFound(err) { + log.Fatal("Suggestion service is still alive while ResumePolicy = NeverResume") + } + namespacedName = types.NamespacedName{Name: controllerUtil.GetAlgorithmDeploymentName(sug), Namespace: sug.Namespace} + deployment := 
&appsv1.Deployment{} + err = kclient.GetClient().Get(context.TODO(), namespacedName, deployment) + if err == nil || !errors.IsNotFound(err) { + log.Fatal("Suggestion deployment is still alive while ResumePolicy = NeverResume") + } + } + log.Printf("Experiment has recorded best current Optimal Trial %v", exp.Status.CurrentOptimalTrial) } diff --git a/test/e2e/v1alpha3/run-e2e-experiment.go b/test/e2e/v1alpha3/run-e2e-experiment.go index 3903d04e491..e58d1549544 100644 --- a/test/e2e/v1alpha3/run-e2e-experiment.go +++ b/test/e2e/v1alpha3/run-e2e-experiment.go @@ -2,18 +2,24 @@ package main import ( "bytes" + "context" "fmt" "io/ioutil" + "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/types" "log" "os" "time" + appsv1 "k8s.io/api/apps/v1" + corev1 "k8s.io/api/core/v1" k8syaml "k8s.io/apimachinery/pkg/util/yaml" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" "sigs.k8s.io/controller-runtime/pkg/client" commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3" experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3" + controllerUtil "github.com/kubeflow/katib/pkg/controller.v1alpha3/util" "github.com/kubeflow/katib/pkg/util/v1alpha3/katibclient" ) @@ -129,5 +135,34 @@ func main() { log.Fatal("All trials are not successful ", exp.Status.TrialsSucceeded, *exp.Spec.MaxTrialCount) } } + + sug, err := kclient.GetSuggestion(exp.Name, exp.Namespace) + // Stop here on a failed lookup so sug is never used after an error. + if err != nil { + log.Fatal("GetSuggestion from k8s client failed: ", err) + } + if exp.Spec.ResumePolicy == experimentsv1alpha3.LongRunning { + if sug.IsSucceeded() { + log.Fatal("Suggestion is terminated while ResumePolicy = LongRunning") + } + } + if exp.Spec.ResumePolicy == experimentsv1alpha3.NeverResume { + if sug.IsRunning() { + log.Fatal("Suggestion is still running while ResumePolicy = NeverResume") + } + namespacedName := types.NamespacedName{Name: controllerUtil.GetAlgorithmServiceName(sug), Namespace: sug.Namespace} + service := &corev1.Service{} + err := kclient.GetClient().Get(context.TODO(), namespacedName, service) + if err 
== nil || !errors.IsNotFound(err) { + log.Fatal("Suggestion service is still alive while ResumePolicy = NeverResume") + } + namespacedName = types.NamespacedName{Name: controllerUtil.GetAlgorithmDeploymentName(sug), Namespace: sug.Namespace} + deployment := &appsv1.Deployment{} + err = kclient.GetClient().Get(context.TODO(), namespacedName, deployment) + if err == nil || !errors.IsNotFound(err) { + log.Fatal("Suggestion deployment is still alive while ResumePolicy = NeverResume") + } + } + log.Printf("Experiment has recorded best current Optimal Trial %v", exp.Status.CurrentOptimalTrial) }