diff --git a/pkg/controller/add_studyjobcontroller.go b/pkg/controller/add_studyjobcontroller.go index a6cb7061884..fc2c4bffd42 100644 --- a/pkg/controller/add_studyjobcontroller.go +++ b/pkg/controller/add_studyjobcontroller.go @@ -16,10 +16,10 @@ limitations under the License. package controller import ( - "github.com/kubeflow/katib/pkg/controller/studyjobcontroller" + "github.com/kubeflow/katib/pkg/controller/studyjob" ) func init() { // AddToManagerFuncs is a list of functions to create controllers and add them to a manager. - AddToManagerFuncs = append(AddToManagerFuncs, studyjobcontroller.Add) + AddToManagerFuncs = append(AddToManagerFuncs, studyjob.Add) } diff --git a/pkg/controller/studyjob/katib_api_util.go b/pkg/controller/studyjob/katib_api_util.go new file mode 100644 index 00000000000..6b2143f1a3a --- /dev/null +++ b/pkg/controller/studyjob/katib_api_util.go @@ -0,0 +1,213 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package studyjob + +import ( + "context" + "log" + + "github.com/kubeflow/katib/pkg" + katibapi "github.com/kubeflow/katib/pkg/api" + katibv1alpha1 "github.com/kubeflow/katib/pkg/api/operators/apis/studyjob/v1alpha1" + "google.golang.org/grpc" +) + +func initializeStudy(instance *katibv1alpha1.StudyJob, ns string) error { + if instance.Spec.SuggestionSpec == nil { + instance.Status.Condition = katibv1alpha1.ConditionFailed + return nil + } + if instance.Spec.SuggestionSpec.SuggestionAlgorithm == "" { + instance.Spec.SuggestionSpec.SuggestionAlgorithm = "random" + } + instance.Status.Condition = katibv1alpha1.ConditionRunning + + conn, err := grpc.Dial(pkg.ManagerAddr, grpc.WithInsecure()) + if err != nil { + log.Printf("Connect katib manager error %v", err) + instance.Status.Condition = katibv1alpha1.ConditionFailed + return nil + } + defer conn.Close() + c := katibapi.NewManagerClient(conn) + + studyConfig, err := getStudyConf(instance) + if err != nil { + return err + } + + log.Printf("Create Study %s", studyConfig.Name) + //CreateStudy + studyID, err := createStudy(c, studyConfig) + if err != nil { + return err + } + instance.Status.StudyID = studyID + log.Printf("Study: %s Suggestion Spec %v", studyID, instance.Spec.SuggestionSpec) + var sspec *katibv1alpha1.SuggestionSpec + if instance.Spec.SuggestionSpec != nil { + sspec = instance.Spec.SuggestionSpec + } else { + sspec = &katibv1alpha1.SuggestionSpec{} + } + sspec.SuggestionParameters = append(sspec.SuggestionParameters, + katibapi.SuggestionParameter{ + Name: "SuggestionCount", + Value: "0", + }) + sPID, err := setSuggestionParam(c, studyID, sspec) + if err != nil { + return err + } + instance.Status.SuggestionParameterID = sPID + instance.Status.SuggestionCount += 1 + instance.Status.Condition = katibv1alpha1.ConditionRunning + return nil +} + +func getStudyConf(instance *katibv1alpha1.StudyJob) (*katibapi.StudyConfig, error) { + sconf := &katibapi.StudyConfig{ + Metrics: []string{}, + ParameterConfigs: &katibapi.StudyConfig_ParameterConfigs{ + Configs: []*katibapi.ParameterConfig{}, + }, + } + sconf.Name = instance.Spec.StudyName + sconf.Owner = instance.Spec.Owner + if instance.Spec.OptimizationGoal != nil { + sconf.OptimizationGoal = *instance.Spec.OptimizationGoal + } + sconf.ObjectiveValueName = instance.Spec.ObjectiveValueName + switch instance.Spec.OptimizationType { + case katibv1alpha1.OptimizationTypeMinimize: + sconf.OptimizationType = katibapi.OptimizationType_MINIMIZE + case katibv1alpha1.OptimizationTypeMaximize: + sconf.OptimizationType = katibapi.OptimizationType_MAXIMIZE + default: + sconf.OptimizationType = katibapi.OptimizationType_UNKNOWN_OPTIMIZATION + } + for _, m := range instance.Spec.MetricsNames { + sconf.Metrics = append(sconf.Metrics, m) + } + for _, pc := range instance.Spec.ParameterConfigs { + p := &katibapi.ParameterConfig{ + Feasible: &katibapi.FeasibleSpace{}, + } + p.Name = pc.Name + p.Feasible.Min = pc.Feasible.Min + p.Feasible.Max = pc.Feasible.Max + p.Feasible.List = pc.Feasible.List + switch pc.ParameterType { + case katibv1alpha1.ParameterTypeUnknown: + p.ParameterType = katibapi.ParameterType_UNKNOWN_TYPE + case katibv1alpha1.ParameterTypeDouble: + p.ParameterType = katibapi.ParameterType_DOUBLE + case katibv1alpha1.ParameterTypeInt: + p.ParameterType = katibapi.ParameterType_INT + case katibv1alpha1.ParameterTypeDiscrete: + p.ParameterType = katibapi.ParameterType_DISCRETE + case katibv1alpha1.ParameterTypeCategorical: + p.ParameterType = katibapi.ParameterType_CATEGORICAL + } + sconf.ParameterConfigs.Configs = append(sconf.ParameterConfigs.Configs, p) + } + sconf.JobId = string(instance.UID) + return sconf, nil +} + +func createStudy(c katibapi.ManagerClient, studyConfig *katibapi.StudyConfig) (string, error) { + ctx := context.Background() + createStudyreq := &katibapi.CreateStudyRequest{ + StudyConfig: studyConfig, + } + createStudyreply, err := c.CreateStudy(ctx, createStudyreq) + if err != nil { + log.Printf("CreateStudy Error %v", err) + return "", err + } + studyID := createStudyreply.StudyId + log.Printf("Study ID %s", studyID) + getStudyreq := &katibapi.GetStudyRequest{ + StudyId: studyID, + } + getStudyReply, err := c.GetStudy(ctx, getStudyreq) + if err != nil { + log.Printf("Study: %s GetConfig Error %v", studyID, err) + return "", err + } + log.Printf("Study ID %s StudyConf%v", studyID, getStudyReply.StudyConfig) + return studyID, nil +} + +func setSuggestionParam(c katibapi.ManagerClient, studyID string, suggestionSpec *katibv1alpha1.SuggestionSpec) (string, error) { + ctx := context.Background() + pid := "" + if suggestionSpec.SuggestionParameters != nil { + sspr := &katibapi.SetSuggestionParametersRequest{ + StudyId: studyID, + SuggestionAlgorithm: suggestionSpec.SuggestionAlgorithm, + } + for _, p := range suggestionSpec.SuggestionParameters { + sspr.SuggestionParameters = append( + sspr.SuggestionParameters, + &katibapi.SuggestionParameter{ + Name: p.Name, + Value: p.Value, + }, + ) + } + setSuggesitonParameterReply, err := c.SetSuggestionParameters(ctx, sspr) + if err != nil { + log.Printf("Study %s SetConfig Error %v", studyID, err) + return "", err + } + log.Printf("Study: %s setSuggesitonParameterReply %v", studyID, setSuggesitonParameterReply) + pid = setSuggesitonParameterReply.ParamId + } + return pid, nil +} + +func getSuggestionParam(c katibapi.ManagerClient, paramID string) ([]*katibapi.SuggestionParameter, error) { + ctx := context.Background() + gsreq := &katibapi.GetSuggestionParametersRequest{ + ParamId: paramID, + } + gsrep, err := c.GetSuggestionParameters(ctx, gsreq) + if err != nil { + return nil, err + } + return gsrep.SuggestionParameters, err +} + +func getSuggestion(c katibapi.ManagerClient, studyID string, suggestionSpec *katibv1alpha1.SuggestionSpec, sParamID string) (*katibapi.GetSuggestionsReply, error) { + ctx := context.Background() + getSuggestRequest := &katibapi.GetSuggestionsRequest{ + StudyId: studyID, + SuggestionAlgorithm: suggestionSpec.SuggestionAlgorithm, + RequestNumber: int32(suggestionSpec.RequestNumber), + //RequestNumber=0 means get all grids. + ParamId: sParamID, + } + getSuggestReply, err := c.GetSuggestions(ctx, getSuggestRequest) + if err != nil { + log.Printf("Study: %s GetSuggestion Error %v", studyID, err) + return nil, err + } + log.Printf("Study: %s CreatedTrials :", studyID) + for _, t := range getSuggestReply.Trials { + log.Printf("\t%v", t) + } + return getSuggestReply, nil +} diff --git a/pkg/controller/studyjob/manifest_parser.go b/pkg/controller/studyjob/manifest_parser.go new file mode 100644 index 00000000000..bea20099eee --- /dev/null +++ b/pkg/controller/studyjob/manifest_parser.go @@ -0,0 +1,151 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package studyjob + +import ( + "bytes" + "context" + "fmt" + "log" + "text/template" + + katibapi "github.com/kubeflow/katib/pkg/api" + katibv1alpha1 "github.com/kubeflow/katib/pkg/api/operators/apis/studyjob/v1alpha1" + k8syaml "k8s.io/apimachinery/pkg/util/yaml" +) + +func getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) { + var typeChecker interface{} + BUFSIZE := 1024 + _, m, err := getWorkerManifest( + nil, + "validation", + &katibapi.Trial{ + TrialId: "validation", + ParameterSet: []*katibapi.Parameter{}, + }, + workerSpec, + "", + true, + ) + if err != nil { + return "", err + } + if err := k8syaml.NewYAMLOrJSONDecoder(m, BUFSIZE).Decode(&typeChecker); err != nil { + log.Printf("Yaml decode validation error %v", err) + return "", err + } + tcMap, ok := typeChecker.(map[string]interface{}) + if !ok { + return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker) + } + wkind, ok := tcMap["kind"] + if !ok { + return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker) + } + wkindS, ok := wkind.(string) + if !ok { + return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker) + } + return wkindS, nil +} + +func getWorkerManifest(c katibapi.ManagerClient, studyID string, trial *katibapi.Trial, workerSpec *katibv1alpha1.WorkerSpec, kind string, dryrun bool) (string, *bytes.Buffer, error) { + var wtp *template.Template = nil + var err error + if workerSpec != nil { + if workerSpec.GoTemplate.RawTemplate != "" { + wtp, err = template.New("Worker").Parse(workerSpec.GoTemplate.RawTemplate) + } else if workerSpec.GoTemplate.TemplatePath != "" { + wtp, err = template.ParseFiles(workerSpec.GoTemplate.TemplatePath) + } + if err != nil { + return "", nil, err + } + } + if wtp == nil { + wtp, err = template.ParseFiles("/worker-template/defaultWorkerTemplate.yaml") + if err != nil { + return "", nil, err + } + } + var wid string + if dryrun { + wid = "validation" + } else { + cwreq := &katibapi.RegisterWorkerRequest{ + Worker: &katibapi.Worker{ + StudyId: studyID, + TrialId: trial.TrialId, + Status: katibapi.State_PENDING, + Type: kind, + }, + } + cwrep, err := c.RegisterWorker(context.Background(), cwreq) + if err != nil { + return "", nil, err + } + wid = cwrep.WorkerId + } + + wi := WorkerInstance{ + StudyID: studyID, + TrialID: trial.TrialId, + WorkerID: wid, + } + var b bytes.Buffer + for _, p := range trial.ParameterSet { + wi.HyperParameters = append(wi.HyperParameters, p) + } + err = wtp.Execute(&b, wi) + if err != nil { + return "", nil, err + } + return wid, &b, nil +} + +func getMetricsCollectorManifest(studyID string, trialID string, workerID string, namespace string, mcs *katibv1alpha1.MetricsCollectorSpec) (*bytes.Buffer, error) { + var mtp *template.Template = nil + var err error + tmpValues := map[string]string{ + "StudyID": studyID, + "TrialID": trialID, + "WorkerID": workerID, + "NameSpace": namespace, + } + if mcs != nil { + if mcs.GoTemplate.RawTemplate != "" { + mtp, err = template.New("MetricsCollector").Parse(mcs.GoTemplate.RawTemplate) + } else if mcs.GoTemplate.TemplatePath != "" { + mtp, err = template.ParseFiles(mcs.GoTemplate.TemplatePath) + } else { + } + if err != nil { + return nil, err + } + } + if mtp == nil { + mtp, err = template.ParseFiles("/metricscollector-template/defaultMetricsCollectorTemplate.yaml") + if err != nil { + return nil, err + } + } + var b bytes.Buffer + err = mtp.Execute(&b, tmpValues) + if err != nil { + return nil, err + } + return &b, nil +} diff --git a/pkg/controller/studyjob/model_util.go b/pkg/controller/studyjob/model_util.go new file mode 100644 index 00000000000..2cf15c695ee --- /dev/null +++ b/pkg/controller/studyjob/model_util.go @@ -0,0 +1,94 @@ +// Copyright 2018 The Kubeflow Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package studyjob + +import ( + "context" + "fmt" + + katibapi "github.com/kubeflow/katib/pkg/api" +) + +func saveModel(c katibapi.ManagerClient, studyID string, trialID string, workerID string) error { + ctx := context.Background() + // Disable ModelDB + //getStudyreq := &katibapi.GetStudyRequest{ + // StudyId: studyId, + //} + //getStudyReply, err := c.GetStudy(ctx, getStudyreq) + //if err != nil { + // return err + //} + //sc := getStudyReply.StudyConfig + getMetricsRequest := &katibapi.GetMetricsRequest{ + StudyId: studyID, + WorkerIds: []string{workerID}, + } + getMetricsReply, err := c.GetMetrics(ctx, getMetricsRequest) + if err != nil { + return err + } + for _, mls := range getMetricsReply.MetricsLogSets { + mets := []*katibapi.Metrics{} + var trial *katibapi.Trial = nil + gtret, err := c.GetTrials( + ctx, + &katibapi.GetTrialsRequest{ + StudyId: studyID, + }) + if err != nil { + return err + } + for _, t := range gtret.Trials { + if t.TrialId == trialID { + trial = t + } + } + for _, ml := range mls.MetricsLogs { + if ml != nil { + if len(ml.Values) > 0 { + mets = append(mets, &katibapi.Metrics{ + Name: ml.Name, + Value: ml.Values[len(ml.Values)-1].Value, + }) + } + } + } + if trial == nil { + return fmt.Errorf("Trial %s not found", trialID) + } + // Disable ModelDB + // if len(mets) > 0 { + // smr := &katibapi.SaveModelRequest{ + // Model: &katibapi.ModelInfo{ + // StudyName: sc.Name, + // WorkerId: mls.WorkerId, + // Parameters: trial.ParameterSet, + // Metrics: mets, + // ModelPath: sc.Name, + // }, + // DataSet: &katibapi.DataSetInfo{ + // Name: sc.Name, + // Path: sc.Name, + // }, + // } + // _, err = c.SaveModel(ctx, smr) + // if err != nil { + // return err + // } + // } + } + return nil +} diff --git a/pkg/controller/studyjobcontroller/studyjob_controller.go b/pkg/controller/studyjob/studyjob_controller.go similarity index 59% rename from pkg/controller/studyjobcontroller/studyjob_controller.go rename to pkg/controller/studyjob/studyjob_controller.go index e0161e3ff43..ef6991ac98d 100644 --- a/pkg/controller/studyjobcontroller/studyjob_controller.go +++ b/pkg/controller/studyjob/studyjob_controller.go @@ -13,16 +13,13 @@ See the License for the specific language governing permissions and limitations under the License. */ -package studyjobcontroller +package studyjob import ( - "bytes" "context" - "fmt" "log" "strconv" "sync" - "text/template" "github.com/kubeflow/katib/pkg" katibapi "github.com/kubeflow/katib/pkg/api" @@ -56,7 +53,7 @@ const maxMsgSize = 1<<31 - 1 // Add creates a new StudyJobController Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller // and Start it when the Manager is Started. -// USER ACTION REQUIRED: update cmd/manager/main.go to call this studyjobcontroller.Add(mgr) to install this Controller +// USER ACTION REQUIRED: update cmd/manager/main.go to call this studyjob.Add(mgr) to install this Controller func Add(mgr manager.Manager) error { return add(mgr, newReconciler(mgr)) } @@ -151,7 +148,7 @@ func (r *ReconcileStudyJobController) Reconcile(request reconcile.Request) (reco case katibv1alpha1.ConditionRunning: update, err = r.checkStatus(instance, request.Namespace) default: - err = r.initializeStudy(instance, request.Namespace) + err = initializeStudy(instance, request.Namespace) if err != nil { r.Update(context.TODO(), instance) log.Printf("Fail to initialize %v", err) @@ -174,56 +171,6 @@ func (r *ReconcileStudyJobController) Reconcile(request reconcile.Request) (reco return reconcile.Result{}, nil } -func (r *ReconcileStudyJobController) getStudyConf(instance *katibv1alpha1.StudyJob) (*katibapi.StudyConfig, error) { - sconf := &katibapi.StudyConfig{ - Metrics: []string{}, - ParameterConfigs: &katibapi.StudyConfig_ParameterConfigs{ - Configs: []*katibapi.ParameterConfig{}, - }, - } - sconf.Name = instance.Spec.StudyName - sconf.Owner = instance.Spec.Owner - if instance.Spec.OptimizationGoal != nil { - sconf.OptimizationGoal = *instance.Spec.OptimizationGoal - } - sconf.ObjectiveValueName = instance.Spec.ObjectiveValueName - switch instance.Spec.OptimizationType { - case katibv1alpha1.OptimizationTypeMinimize: - sconf.OptimizationType = katibapi.OptimizationType_MINIMIZE - case katibv1alpha1.OptimizationTypeMaximize: - sconf.OptimizationType = katibapi.OptimizationType_MAXIMIZE - default: - sconf.OptimizationType = katibapi.OptimizationType_UNKNOWN_OPTIMIZATION - } - for _, m := range instance.Spec.MetricsNames { - sconf.Metrics = append(sconf.Metrics, m) - } - for _, pc := range instance.Spec.ParameterConfigs { - p := &katibapi.ParameterConfig{ - Feasible: &katibapi.FeasibleSpace{}, - } - p.Name = pc.Name - p.Feasible.Min = pc.Feasible.Min - p.Feasible.Max = pc.Feasible.Max - p.Feasible.List = pc.Feasible.List - switch pc.ParameterType { - case katibv1alpha1.ParameterTypeUnknown: - p.ParameterType = katibapi.ParameterType_UNKNOWN_TYPE - case katibv1alpha1.ParameterTypeDouble: - p.ParameterType = katibapi.ParameterType_DOUBLE - case katibv1alpha1.ParameterTypeInt: - p.ParameterType = katibapi.ParameterType_INT - case katibv1alpha1.ParameterTypeDiscrete: - p.ParameterType = katibapi.ParameterType_DISCRETE - case katibv1alpha1.ParameterTypeCategorical: - p.ParameterType = katibapi.ParameterType_CATEGORICAL - } - sconf.ParameterConfigs.Configs = append(sconf.ParameterConfigs.Configs, p) - } - sconf.JobId = string(instance.UID) - return sconf, nil -} - func (r *ReconcileStudyJobController) checkGoal(instance *katibv1alpha1.StudyJob, c katibapi.ManagerClient, wids []string) (bool, error) { if instance.Spec.OptimizationGoal == nil { return false, nil @@ -306,59 +253,6 @@ func (r *ReconcileStudyJobController) checkGoal(instance *katibv1alpha1.StudyJob return goal, nil } -func (r *ReconcileStudyJobController) initializeStudy(instance *katibv1alpha1.StudyJob, ns string) error { - if instance.Spec.SuggestionSpec == nil { - instance.Status.Condition = katibv1alpha1.ConditionFailed - return nil - } - if instance.Spec.SuggestionSpec.SuggestionAlgorithm == "" { - instance.Spec.SuggestionSpec.SuggestionAlgorithm = "random" - } - instance.Status.Condition = katibv1alpha1.ConditionRunning - - conn, err := grpc.Dial(pkg.ManagerAddr, grpc.WithInsecure()) - if err != nil { - log.Printf("Connect katib manager error %v", err) - instance.Status.Condition = katibv1alpha1.ConditionFailed - return nil - } - defer conn.Close() - c := katibapi.NewManagerClient(conn) - - studyConfig, err := r.getStudyConf(instance) - if err != nil { - return err - } - - log.Printf("Create Study %s", studyConfig.Name) - //CreateStudy - studyID, err := r.createStudy(c, studyConfig) - if err != nil { - return err - } - instance.Status.StudyID = studyID - log.Printf("Study: %s Suggestion Spec %v", studyID, instance.Spec.SuggestionSpec) - var sspec *katibv1alpha1.SuggestionSpec - if instance.Spec.SuggestionSpec != nil { - sspec = instance.Spec.SuggestionSpec - } else { - sspec = &katibv1alpha1.SuggestionSpec{} - } - sspec.SuggestionParameters = append(sspec.SuggestionParameters, - katibapi.SuggestionParameter{ - Name: "SuggestionCount", - Value: "0", - }) - sPID, err := r.setSuggestionParam(c, studyID, sspec) - if err != nil { - return err - } - instance.Status.SuggestionParameterID = sPID - instance.Status.SuggestionCount += 1 - instance.Status.Condition = katibv1alpha1.ConditionRunning - return nil -} - func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJob, ns string) (bool, error) { nextSuggestionSchedule := true var cwids []string @@ -431,7 +325,7 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ if cjoberr == nil { if ctime != nil && cjob.Status.LastScheduleTime != nil { if ctime.Before(cjob.Status.LastScheduleTime) && len(cjob.Status.Active) == 0 { - r.saveModel(c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, instance.Status.Trials[i].WorkerList[j].WorkerID) + saveModel(c, instance.Status.StudyID, instance.Status.Trials[i].TrialID, instance.Status.Trials[i].WorkerList[j].WorkerID) instance.Status.Trials[i].WorkerList[j].Condition = katibv1alpha1.ConditionCompleted instance.Status.Trials[i].WorkerList[j].CompletionTime = metav1.Now() update = true @@ -498,7 +392,7 @@ func (r *ReconcileStudyJobController) checkStatus(instance *katibv1alpha1.StudyJ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha1.StudyJob, c katibapi.ManagerClient, ns string) (bool, error) { //Check Suggestion Count - sps, err := r.getSuggestionParam(c, instance.Status.SuggestionParameterID) + sps, err := getSuggestionParam(c, instance.Status.SuggestionParameterID) if err != nil { return false, err } @@ -513,7 +407,7 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha } } //GetSuggestion - getSuggestReply, err := r.getSuggestion( + getSuggestReply, err := getSuggestion( c, instance.Status.StudyID, instance.Spec.SuggestionSpec, @@ -529,7 +423,7 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha return true, nil } log.Printf("Study: %s Suggestions %v", instance.Status.StudyID, getSuggestReply) - wkind, err := r.getWorkerKind(instance.Spec.WorkerSpec) + wkind, err := getWorkerKind(instance.Spec.WorkerSpec) if err != nil { log.Printf("getWorkerKind error %v", err) instance.Status.Condition = katibv1alpha1.ConditionFailed @@ -573,161 +467,6 @@ func (r *ReconcileStudyJobController) getAndRunSuggestion(instance *katibv1alpha return true, nil } -func (r *ReconcileStudyJobController) createStudy(c katibapi.ManagerClient, studyConfig *katibapi.StudyConfig) (string, error) { - ctx := context.Background() - createStudyreq := &katibapi.CreateStudyRequest{ - StudyConfig: studyConfig, - } - createStudyreply, err := c.CreateStudy(ctx, createStudyreq) - if err != nil { - log.Printf("CreateStudy Error %v", err) - return "", err - } - studyID := createStudyreply.StudyId - log.Printf("Study ID %s", studyID) - getStudyreq := &katibapi.GetStudyRequest{ - StudyId: studyID, - } - getStudyReply, err := c.GetStudy(ctx, getStudyreq) - if err != nil { - log.Printf("Study: %s GetConfig Error %v", studyID, err) - return "", err - } - log.Printf("Study ID %s StudyConf%v", studyID, getStudyReply.StudyConfig) - return studyID, nil -} - -func (r *ReconcileStudyJobController) setSuggestionParam(c katibapi.ManagerClient, studyID string, suggestionSpec *katibv1alpha1.SuggestionSpec) (string, error) { - ctx := context.Background() - pid := "" - if suggestionSpec.SuggestionParameters != nil { - sspr := &katibapi.SetSuggestionParametersRequest{ - StudyId: studyID, - SuggestionAlgorithm: suggestionSpec.SuggestionAlgorithm, - } - for _, p := range suggestionSpec.SuggestionParameters { - sspr.SuggestionParameters = append( - sspr.SuggestionParameters, - &katibapi.SuggestionParameter{ - Name: p.Name, - Value: p.Value, - }, - ) - } - setSuggesitonParameterReply, err := c.SetSuggestionParameters(ctx, sspr) - if err != nil { - log.Printf("Study %s SetConfig Error %v", studyID, err) - return "", err - } - log.Printf("Study: %s setSuggesitonParameterReply %v", studyID, setSuggesitonParameterReply) - pid = setSuggesitonParameterReply.ParamId - } - return pid, nil -} - -func (r *ReconcileStudyJobController) getSuggestionParam(c katibapi.ManagerClient, paramID string) ([]*katibapi.SuggestionParameter, error) { - ctx := context.Background() - gsreq := &katibapi.GetSuggestionParametersRequest{ - ParamId: paramID, - } - gsrep, err := c.GetSuggestionParameters(ctx, gsreq) - if err != nil { - return nil, err - } - return gsrep.SuggestionParameters, err -} -func (r *ReconcileStudyJobController) getSuggestion(c katibapi.ManagerClient, studyID string, suggestionSpec *katibv1alpha1.SuggestionSpec, sParamID string) (*katibapi.GetSuggestionsReply, error) { - ctx := context.Background() - getSuggestRequest := &katibapi.GetSuggestionsRequest{ - StudyId: studyID, - SuggestionAlgorithm: suggestionSpec.SuggestionAlgorithm, - RequestNumber: int32(suggestionSpec.RequestNumber), - //RequestNumber=0 means get all grids. - ParamId: sParamID, - } - getSuggestReply, err := c.GetSuggestions(ctx, getSuggestRequest) - if err != nil { - log.Printf("Study: %s GetSuggestion Error %v", studyID, err) - return nil, err - } - log.Printf("Study: %s CreatedTrials :", studyID) - for _, t := range getSuggestReply.Trials { - log.Printf("\t%v", t) - } - return getSuggestReply, nil -} -func (r *ReconcileStudyJobController) saveModel(c katibapi.ManagerClient, studyID string, trialID string, workerID string) error { - ctx := context.Background() - // Disable ModelDB - //getStudyreq := &katibapi.GetStudyRequest{ - // StudyId: studyId, - //} - //getStudyReply, err := c.GetStudy(ctx, getStudyreq) - //if err != nil { - // return err - //} - //sc := getStudyReply.StudyConfig - getMetricsRequest := &katibapi.GetMetricsRequest{ - StudyId: studyID, - WorkerIds: []string{workerID}, - } - getMetricsReply, err := c.GetMetrics(ctx, getMetricsRequest) - if err != nil { - return err - } - for _, mls := range getMetricsReply.MetricsLogSets { - mets := []*katibapi.Metrics{} - var trial *katibapi.Trial = nil - gtret, err := c.GetTrials( - ctx, - &katibapi.GetTrialsRequest{ - StudyId: studyID, - }) - if err != nil { - return err - } - for _, t := range gtret.Trials { - if t.TrialId == trialID { - trial = t - } - } - for _, ml := range mls.MetricsLogs { - if ml != nil { - if len(ml.Values) > 0 { - mets = append(mets, &katibapi.Metrics{ - Name: ml.Name, - Value: ml.Values[len(ml.Values)-1].Value, - }) - } - } - } - if trial == nil { - return fmt.Errorf("Trial %s not found", trialID) - } - // Disable ModelDB - // if len(mets) > 0 { - // smr := &katibapi.SaveModelRequest{ - // Model: &katibapi.ModelInfo{ - // StudyName: sc.Name, - // WorkerId: mls.WorkerId, - // Parameters: trial.ParameterSet, - // Metrics: mets, - // ModelPath: sc.Name, - // }, - // DataSet: &katibapi.DataSetInfo{ - // Name: sc.Name, - // Path: sc.Name, - // }, - // } - // _, err = c.SaveModel(ctx, smr) - // if err != nil { - // return err - // } - // } - } - return nil -} - type WorkerInstance struct { StudyID string TrialID string @@ -735,44 +474,8 @@ type WorkerInstance struct { HyperParameters []*katibapi.Parameter } -func (r *ReconcileStudyJobController) getWorkerKind(workerSpec *katibv1alpha1.WorkerSpec) (string, error) { - var typeChecker interface{} - BUFSIZE := 1024 - _, m, err := r.getWorkerManifest( - nil, - "validation", - &katibapi.Trial{ - TrialId: "validation", - ParameterSet: []*katibapi.Parameter{}, - }, - workerSpec, - "", - true, - ) - if err != nil { - return "", err - } - if err := k8syaml.NewYAMLOrJSONDecoder(m, BUFSIZE).Decode(&typeChecker); err != nil { - log.Printf("Yaml decode validation error %v", err) - return "", err - } - tcMap, ok := typeChecker.(map[string]interface{}) - if !ok { - return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker) - } - wkind, ok := tcMap["kind"] - if !ok { - return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker) - } - wkindS, ok := wkind.(string) - if !ok { - return "", fmt.Errorf("Cannot get kind of worker %v", typeChecker) - } - return wkindS, nil -} - func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJob, c katibapi.ManagerClient, studyID string, trial *katibapi.Trial, workerSpec *katibv1alpha1.WorkerSpec, wkind string, dryrun bool) (string, error) { - wid, wm, err := r.getWorkerManifest(c, studyID, trial, workerSpec, wkind, false) + wid, wm, err := getWorkerManifest(c, studyID, trial, workerSpec, wkind, false) if err != nil { return "", err } @@ -799,64 +502,10 @@ func (r *ReconcileStudyJobController) spawnWorker(instance *katibv1alpha1.StudyJ return wid, nil } -func (r *ReconcileStudyJobController) getWorkerManifest(c katibapi.ManagerClient, studyID string, trial *katibapi.Trial, workerSpec *katibv1alpha1.WorkerSpec, kind string, dryrun bool) (string, *bytes.Buffer, error) { - var wtp *template.Template = nil - var err error - if workerSpec != nil { - if workerSpec.GoTemplate.RawTemplate != "" { - wtp, err = template.New("Worker").Parse(workerSpec.GoTemplate.RawTemplate) - } else if workerSpec.GoTemplate.TemplatePath != "" { - wtp, err = template.ParseFiles(workerSpec.GoTemplate.TemplatePath) - } - if err != nil { - return "", nil, err - } - } - if wtp == nil { - wtp, err = template.ParseFiles("/worker-template/defaultWorkerTemplate.yaml") - if err != nil { - return "", nil, err - } - } - var wid string - if dryrun { - wid = "validation" - } else { - cwreq := &katibapi.RegisterWorkerRequest{ - Worker: &katibapi.Worker{ - StudyId: studyID, - TrialId: trial.TrialId, - Status: katibapi.State_PENDING, - Type: kind, - }, - } - cwrep, err := c.RegisterWorker(context.Background(), cwreq) - if err != nil { - return "", nil, err - } - wid = cwrep.WorkerId - } - - wi := WorkerInstance{ - StudyID: studyID, - TrialID: trial.TrialId, - WorkerID: wid, - } - var b bytes.Buffer - for _, p := range trial.ParameterSet { - wi.HyperParameters = append(wi.HyperParameters, p) - } - err = wtp.Execute(&b, wi) - if err != nil { - return "", nil, err - } - return wid, &b, nil -} - func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alpha1.StudyJob, c katibapi.ManagerClient, studyID string, trialID string, workerID string, namespace string, mcs *katibv1alpha1.MetricsCollectorSpec) error { var mcjob batchv1beta.CronJob BUFSIZE := 1024 - mcm, err := r.getMetricsCollectorManifest(studyID, trialID, workerID, namespace, mcs) + mcm, err := getMetricsCollectorManifest(studyID, trialID, workerID, namespace, mcs) if err != nil { log.Printf("getMetricsCollectorManifest error %v", err) return err @@ -881,37 +530,3 @@ func (r *ReconcileStudyJobController) spawnMetricsCollector(instance *katibv1alp } return nil } - -func (r *ReconcileStudyJobController) getMetricsCollectorManifest(studyID string, trialID string, workerID string, namespace string, mcs *katibv1alpha1.MetricsCollectorSpec) (*bytes.Buffer, error) { - var mtp *template.Template = nil - var err error - tmpValues := map[string]string{ - "StudyID": studyID, - "TrialID": trialID, - "WorkerID": workerID, - "NameSpace": namespace, - } - if mcs != nil { - if mcs.GoTemplate.RawTemplate != "" { - mtp, err = template.New("MetricsCollector").Parse(mcs.GoTemplate.RawTemplate) - } else if mcs.GoTemplate.TemplatePath != "" { - mtp, err = template.ParseFiles(mcs.GoTemplate.TemplatePath) - } else { - } - if err != nil { - return nil, err - } - } - if mtp == nil { - mtp, err = template.ParseFiles("/metricscollector-template/defaultMetricsCollectorTemplate.yaml") - if err != nil { - return nil, err - } - } - var b bytes.Buffer - err = mtp.Execute(&b, tmpValues) - if err != nil { - return nil, err - } - return &b, nil -}