Skip to content

Commit

Permalink
Delete Suggestion deployment after Experiment is finished (kubeflow#1150
Browse files Browse the repository at this point in the history
)

* Delete Suggestion deployment after Experiment is finished

* fix

* update

* update openapi

* fix validator test

* refine

* add debug info

* try to fix

* try to fix

* try to fix

* keep API style consistent

* rebase master
  • Loading branch information
sperlingxx committed May 19, 2020
1 parent b7f1750 commit b186502
Show file tree
Hide file tree
Showing 13 changed files with 206 additions and 2 deletions.
3 changes: 3 additions & 0 deletions pkg/apis/controller/experiments/v1alpha3/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ const (

// Default value of Spec.TemplatePath
DefaultTrialTemplatePath = "defaultTrialTemplate.yaml"

// Default value of Spec.DefaultResumePolicy
DefaultResumePolicy = LongRunning
)

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 10 additions & 1 deletion pkg/apis/controller/experiments/v1alpha3/experiment_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,11 @@ type ExperimentSpec struct {

NasConfig *NasConfig `json:"nasConfig,omitempty"`

// Describes resuming policy which usually take effect after experiment terminated.
ResumePolicy ResumePolicyType `json:"resumePolicy,omitempty"`

// TODO - Other fields, exact format is TBD. Will add these back during implementation.
// - Early stopping
// - Resume experiment
}

type ExperimentStatus struct {
Expand Down Expand Up @@ -154,6 +156,13 @@ const (
ExperimentFailed ExperimentConditionType = "Failed"
)

type ResumePolicyType string

const (
NeverResume ResumePolicyType = "Never"
LongRunning ResumePolicyType = "LongRunning"
)

type ParameterSpec struct {
Name string `json:"name,omitempty"`
ParameterType ParameterType `json:"parameterType,omitempty"`
Expand Down
7 changes: 7 additions & 0 deletions pkg/apis/v1alpha3/openapi_generated.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/apis/v1alpha3/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,10 @@
"$ref": "#/definitions/v1alpha3.ParameterSpec"
}
},
"resumePolicy": {
"description": "Describes resuming policy which usually take effect after experiment terminated.",
"type": "string"
},
"trialTemplate": {
"description": "Template for each run of the trial.",
"$ref": "#/definitions/v1alpha3.TrialTemplate"
Expand Down
4 changes: 4 additions & 0 deletions pkg/controller.v1alpha3/experiment/experiment_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ func (r *ReconcileExperiment) Reconcile(request reconcile.Request) (reconcile.Re
instance.MarkExperimentStatusRestarting(util.ExperimentRestartingReason, msg)
}
} else {
if instance.Spec.ResumePolicy != experimentsv1alpha3.LongRunning {
return r.terminateSuggestion(instance)
}
// If experiment is completed with no running trials, stop reconcile
if !instance.HasRunningTrials() {
return reconcile.Result{}, nil
Expand Down Expand Up @@ -263,6 +266,7 @@ func (r *ReconcileExperiment) ReconcileExperiment(instance *experimentsv1alpha3.
if reconcileRequired {
r.ReconcileTrials(instance, trials.Items)
}

return nil
}

Expand Down
30 changes: 30 additions & 0 deletions pkg/controller.v1alpha3/experiment/experiment_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"k8s.io/apimachinery/pkg/api/errors"

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
Expand All @@ -14,6 +15,7 @@ import (
experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
suggestionsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3"
trialsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1alpha3"
suggestionController "github.com/kubeflow/katib/pkg/controller.v1alpha3/suggestion"
"github.com/kubeflow/katib/pkg/controller.v1alpha3/util"
)

Expand Down Expand Up @@ -109,3 +111,31 @@ func (r *ReconcileExperiment) updateFinalizers(instance *experimentsv1alpha3.Exp
return reconcile.Result{Requeue: true}, err
}
}

func (r *ReconcileExperiment) terminateSuggestion(instance *experimentsv1alpha3.Experiment) (reconcile.Result, error) {
log.Info("Start terminating original...")
original := &suggestionsv1alpha3.Suggestion{}
err := r.Get(context.TODO(), types.NamespacedName{
Namespace: instance.Namespace,
Name: instance.Name,
}, original)
if err != nil {
if errors.IsNotFound(err) {
return reconcile.Result{}, nil
}
return reconcile.Result{}, err
}
if original.IsCompleted() {
return reconcile.Result{}, nil
}
suggestion := original.DeepCopy()
msg := "Suggestion is succeeded"
suggestion.MarkSuggestionStatusSucceeded(suggestionController.SuggestionSucceededReason, msg)
log.Info("Mark suggestion succeeded...")

if err := r.UpdateSuggestion(suggestion); err != nil {
return reconcile.Result{}, err
} else {
return reconcile.Result{Requeue: true}, nil
}
}
11 changes: 10 additions & 1 deletion pkg/controller.v1alpha3/suggestion/suggestion_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,16 @@ func (r *ReconcileSuggestion) Reconcile(request reconcile.Request) (reconcile.Re
return reconcile.Result{}, err
}
instance := oldS.DeepCopy()
if instance.IsCompleted() {
// If ResumePolicyType is LongRunning, suggestion status will never be succeeded.
if instance.IsSucceeded() {
err = r.deleteDeployment(instance)
if err != nil {
return reconcile.Result{}, err
}
err = r.deleteService(instance)
if err != nil {
return reconcile.Result{}, err
}
return reconcile.Result{}, nil
}
if !instance.IsCreated() {
Expand Down
45 changes: 45 additions & 0 deletions pkg/controller.v1alpha3/suggestion/suggestion_controller_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package suggestion

import (
"context"
"github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1alpha3"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
Expand Down Expand Up @@ -34,3 +35,47 @@ func (r *ReconcileSuggestion) reconcileService(service *corev1.Service) (*corev1
}
return foundService, nil
}

func (r *ReconcileSuggestion) deleteDeployment(instance *v1alpha3.Suggestion) error {
deploy, err := r.DesiredDeployment(instance)
if err != nil {
return err
}
realDeploy := &appsv1.Deployment{}
err = r.Get(context.TODO(), types.NamespacedName{Name: deploy.Name, Namespace: deploy.Namespace}, realDeploy)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
err = r.Delete(context.TODO(), realDeploy)
if err != nil {
return err
}
log.Info("suggestion deployment %s has been deleted", realDeploy.Name)

return nil
}

func (r *ReconcileSuggestion) deleteService(instance *v1alpha3.Suggestion) error {
service, err := r.DesiredService(instance)
if err != nil {
return err
}
realService := &corev1.Service{}
err = r.Get(context.TODO(), types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, realService)
if err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
err = r.Delete(context.TODO(), realService)
if err != nil {
return err
}
log.Info("suggestion service %s has been deleted", realService.Name)

return nil
}
15 changes: 15 additions & 0 deletions pkg/webhook/v1alpha3/experiment/validator/validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ func (g *DefaultValidator) ValidateExperiment(instance, oldInst *experimentsv1al
if err := g.validateAlgorithm(instance.Spec.Algorithm); err != nil {
return err
}
if err := g.validateResumePolicy(instance.Spec.ResumePolicy); err != nil {
return err
}

if err := g.validateTrialTemplate(instance); err != nil {
return err
Expand Down Expand Up @@ -115,6 +118,18 @@ func (g *DefaultValidator) validateAlgorithm(ag *commonapiv1alpha3.AlgorithmSpec
return nil
}

func (g *DefaultValidator) validateResumePolicy(resume experimentsv1alpha3.ResumePolicyType) error {
validTypes := map[experimentsv1alpha3.ResumePolicyType]string{
"": "",
experimentsv1alpha3.NeverResume: "",
experimentsv1alpha3.LongRunning: "",
}
if _, ok := validTypes[resume]; !ok {
return fmt.Errorf("invalid ResumePolicyType %s", resume)
}
return nil
}

func (g *DefaultValidator) validateTrialTemplate(instance *experimentsv1alpha3.Experiment) error {
trialName := fmt.Sprintf("%s-trial", instance.GetName())
runSpec, err := g.GetRunSpec(instance, instance.GetName(), trialName, instance.GetNamespace())
Expand Down
9 changes: 9 additions & 0 deletions pkg/webhook/v1alpha3/experiment/validator/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,15 @@ metadata:
return i
}(),
},
{
Instance: newFakeInstance(),
Err: true,
oldInstance: func() *experimentsv1alpha3.Experiment {
i := newFakeInstance()
i.Spec.ResumePolicy = "invalid-policy"
return i
}(),
},
{
Instance: func() *experimentsv1alpha3.Experiment {
i := newFakeInstance()
Expand Down
31 changes: 31 additions & 0 deletions test/e2e/v1alpha3/resume-e2e-experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ package main

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"log"
"os"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client"

commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
controllerUtil "github.com/kubeflow/katib/pkg/controller.v1alpha3/util"
"github.com/kubeflow/katib/pkg/util/v1alpha3/katibclient"
)

Expand Down Expand Up @@ -146,5 +152,30 @@ func main() {
log.Fatal("All trials are not successful ", exp.Status.TrialsSucceeded, *exp.Spec.MaxTrialCount)
}
}

sug, err := kclient.GetSuggestion(exp.Name, exp.Namespace)
if exp.Spec.ResumePolicy == experimentsv1alpha3.LongRunning {
if sug.IsSucceeded() {
log.Fatal("Suggestion is terminated while ResumePolicy = LongRunning")
}
}
if exp.Spec.ResumePolicy == experimentsv1alpha3.NeverResume {
if sug.IsRunning() {
log.Fatal("Suggestion is still running while ResumePolicy = NeverResume")
}
namespacedName := types.NamespacedName{Name: controllerUtil.GetAlgorithmServiceName(sug), Namespace: sug.Namespace}
service := &corev1.Service{}
err := kclient.GetClient().Get(context.TODO(), namespacedName, service)
if err == nil || !errors.IsNotFound(err) {
log.Fatal("Suggestion service is still alive while ResumePolicy = NeverResume")
}
namespacedName = types.NamespacedName{Name: controllerUtil.GetAlgorithmDeploymentName(sug), Namespace: sug.Namespace}
deployment := &appsv1.Deployment{}
err = kclient.GetClient().Get(context.TODO(), namespacedName, deployment)
if err == nil || !errors.IsNotFound(err) {
log.Fatal("Suggestion deployment is still alive while ResumePolicy = NeverResume")
}
}

log.Printf("Experiment has recorded best current Optimal Trial %v", exp.Status.CurrentOptimalTrial)
}
31 changes: 31 additions & 0 deletions test/e2e/v1alpha3/run-e2e-experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,24 @@ package main

import (
"bytes"
"context"
"fmt"
"io/ioutil"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"log"
"os"
"time"

appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
k8syaml "k8s.io/apimachinery/pkg/util/yaml"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"sigs.k8s.io/controller-runtime/pkg/client"

commonv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/common/v1alpha3"
experimentsv1alpha3 "github.com/kubeflow/katib/pkg/apis/controller/experiments/v1alpha3"
controllerUtil "github.com/kubeflow/katib/pkg/controller.v1alpha3/util"
"github.com/kubeflow/katib/pkg/util/v1alpha3/katibclient"
)

Expand Down Expand Up @@ -129,5 +135,30 @@ func main() {
log.Fatal("All trials are not successful ", exp.Status.TrialsSucceeded, *exp.Spec.MaxTrialCount)
}
}

sug, err := kclient.GetSuggestion(exp.Name, exp.Namespace)
if exp.Spec.ResumePolicy == experimentsv1alpha3.LongRunning {
if sug.IsSucceeded() {
log.Fatal("Suggestion is terminated while ResumePolicy = LongRunning")
}
}
if exp.Spec.ResumePolicy == experimentsv1alpha3.NeverResume {
if sug.IsRunning() {
log.Fatal("Suggestion is still running while ResumePolicy = NeverResume")
}
namespacedName := types.NamespacedName{Name: controllerUtil.GetAlgorithmServiceName(sug), Namespace: sug.Namespace}
service := &corev1.Service{}
err := kclient.GetClient().Get(context.TODO(), namespacedName, service)
if err == nil || !errors.IsNotFound(err) {
log.Fatal("Suggestion service is still alive while ResumePolicy = NeverResume")
}
namespacedName = types.NamespacedName{Name: controllerUtil.GetAlgorithmDeploymentName(sug), Namespace: sug.Namespace}
deployment := &appsv1.Deployment{}
err = kclient.GetClient().Get(context.TODO(), namespacedName, deployment)
if err == nil || !errors.IsNotFound(err) {
log.Fatal("Suggestion deployment is still alive while ResumePolicy = NeverResume")
}
}

log.Printf("Experiment has recorded best current Optimal Trial %v", exp.Status.CurrentOptimalTrial)
}

0 comments on commit b186502

Please sign in to comment.