diff --git a/pkg/apis/controller/trials/v1beta1/util.go b/pkg/apis/controller/trials/v1beta1/util.go index 5688daec935..c3b8390fb52 100644 --- a/pkg/apis/controller/trials/v1beta1/util.go +++ b/pkg/apis/controller/trials/v1beta1/util.go @@ -83,6 +83,15 @@ func (trial *Trial) IsKilled() bool { return hasCondition(trial, TrialKilled) } +// IsMetricsUnavailable returns true if Trial metrics are not available +func (trial *Trial) IsMetricsUnavailable() bool { + cond := getCondition(trial, TrialSucceeded) + if cond != nil && cond.Status == v1.ConditionFalse { + return true + } + return false +} + func (trial *Trial) IsCompleted() bool { return trial.IsSucceeded() || trial.IsFailed() || trial.IsKilled() } diff --git a/pkg/controller.v1beta1/consts/const.go b/pkg/controller.v1beta1/consts/const.go index b94bafe2205..598ce9d8326 100644 --- a/pkg/controller.v1beta1/consts/const.go +++ b/pkg/controller.v1beta1/consts/const.go @@ -147,6 +147,7 @@ const ( TrialTemplateMetaKeyOfLabels = "Labels" // UnavailableMetricValue is the value when metric was not reported or metric value can't be converted to float64 + // This value is recorded in to DB when metrics collector can't parse objective metric from the training logs. UnavailableMetricValue = "unavailable" ) diff --git a/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient.go b/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient.go index bb79db8c21f..fc81865a258 100644 --- a/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient.go +++ b/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient.go @@ -199,6 +199,9 @@ func (g *General) ConvertExperiment(e *experimentsv1beta1.Experiment) *suggestio func (g *General) ConvertTrials(ts []trialsv1beta1.Trial) []*suggestionapi.Trial { trialsRes := make([]*suggestionapi.Trial, 0) for _, t := range ts { + if t.IsMetricsUnavailable() { + continue + } trial := &suggestionapi.Trial{ Name: t.Name, Spec: &suggestionapi.TrialSpec{ diff --git a/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient_test.go b/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient_test.go index 2b17ad89938..d46d6ea86fa 100644 --- a/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient_test.go +++ b/pkg/controller.v1beta1/suggestion/suggestionclient/suggestionclient_test.go @@ -23,6 +23,7 @@ import ( "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" "github.com/onsi/gomega" + corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -632,6 +633,21 @@ func newFakeTrials() []trialsv1beta1.Trial { Conditions: fakeConditions, }, }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "trial3-name", + Namespace: "namespace", + }, + Status: trialsv1beta1.TrialStatus{ + Conditions: []trialsv1beta1.TrialCondition{ + { + Type: trialsv1beta1.TrialSucceeded, + Status: corev1.ConditionFalse, + Message: "Metrics are not available", + }, + }, + }, + }, } } diff --git a/pkg/controller.v1beta1/trial/trial_controller.go b/pkg/controller.v1beta1/trial/trial_controller.go index 2b624fb6b63..c2a64770b8d 100644 --- a/pkg/controller.v1beta1/trial/trial_controller.go +++ b/pkg/controller.v1beta1/trial/trial_controller.go @@ -19,6 +19,7 @@ package trial import ( "context" "fmt" + "time" batchv1beta "k8s.io/api/batch/v1beta1" corev1 "k8s.io/api/core/v1" @@ -57,6 +58,8 @@ const ( var ( log = logf.Log.WithName(ControllerName) + // errMetricsNotReported is the error when Trial job is succeeded but metrics are not reported yet + errMetricsNotReported = fmt.Errorf("Metrics are not reported yet") ) // Add creates a new Trial Controller and adds it to the Manager with default RBAC. The Manager will set fields on the Controller @@ -213,6 +216,11 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, } else { err := r.reconcileTrial(instance) if err != nil { + if err == errMetricsNotReported { + return reconcile.Result{ + RequeueAfter: time.Second * 1, + }, nil + } logger.Error(err, "Reconcile trial error") r.recorder.Eventf(instance, corev1.EventTypeWarning, ReconcileFailedReason, @@ -225,7 +233,7 @@ func (r *ReconcileTrial) Reconcile(request reconcile.Request) (reconcile.Result, //assuming that only status change err = r.updateStatusHandler(instance) if err != nil { - logger.Info("Update trial instance status failed, reconciler requeued", "err", err) + logger.Info("Update trial instance status failed, reconcile requeued", "err", err) return reconcile.Result{ Requeue: true, }, nil @@ -254,7 +262,7 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error { // Job already exists // TODO Can desired Spec differ from deployedSpec? if deployedJob != nil { - if instance.Spec.SuccessCondition != "" && instance.Spec.FailureCondition != "" { + if instance.Spec.SuccessCondition != "" && instance.Spec.FailureCondition != "" && !instance.IsCompleted() { jobStatus, err := trialutil.GetDeployedJobStatus(instance, deployedJob) if err != nil { logger.Error(err, "GetDeployedJobStatus error") @@ -270,6 +278,11 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error { return err } } + // If observation is empty metrics collector doesn't finish + if jobStatus.Condition == trialutil.JobSucceeded && instance.Status.Observation == nil { + logger.Info("Trial job is succeeded but metrics are not reported, reconcile requeued") + return errMetricsNotReported + } // Update Trial job status only // if job has succeeded and if observation field is available. @@ -277,7 +290,7 @@ func (r *ReconcileTrial) reconcileTrial(instance *trialsv1beta1.Trial) error { // This will ensure that trial is set to be complete only if metric is collected at least once r.UpdateTrialStatusCondition(instance, deployedJob.GetName(), jobStatus) - } else { + } else if instance.Spec.SuccessCondition == "" && instance.Spec.FailureCondition == "" { // TODO (andreyvelich): This can be deleted after switch to custom CRD kind := deployedJob.GetKind() jobProvider, err := jobv1beta1.New(kind) @@ -319,6 +332,7 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1beta1.Trial, desiredJob gvk := schema.FromAPIVersionAndKind(apiVersion, kind) // Add annotation to desired Job to disable istio sidecar + // TODO (andreyvelich): Can be removed after custom CRD implementation err = util.TrainingJobAnnotations(desiredJob) if err != nil { logger.Error(err, "TrainingJobAnnotations error") @@ -333,15 +347,19 @@ func (r *ReconcileTrial) reconcileJob(instance *trialsv1beta1.Trial, desiredJob if instance.IsCompleted() { return nil, nil } - jobProvider, err := jobv1beta1.New(desiredJob.GetKind()) - if err != nil { - return nil, err - } - // mutate desiredJob according to provider - if err := jobProvider.MutateJob(instance, desiredJob); err != nil { - logger.Error(err, "Mutating desiredSpec of km.Training error") - return nil, err - } + + // TODO (andreyvelich): Mutate job needs to be refactored (ref: https://github.com/kubeflow/katib/issues/1320) + // Currently, commented since we don't do Mutate Job for SupportedJobList + // jobProvider, err := jobv1beta1.New(desiredJob.GetKind()) + // if err != nil { + // return nil, err + // } + // // mutate desiredJob according to provider + // if err := jobProvider.MutateJob(instance, desiredJob); err != nil { + // logger.Error(err, "Mutating desiredSpec of km.Training error") + // return nil, err + // } + logger.Info("Creating Job", "kind", kind, "name", desiredJob.GetName()) err = r.Create(context.TODO(), desiredJob) diff --git a/pkg/controller.v1beta1/trial/trial_controller_util.go b/pkg/controller.v1beta1/trial/trial_controller_util.go index c8da0f83576..08f1a3344ea 100644 --- a/pkg/controller.v1beta1/trial/trial_controller_util.go +++ b/pkg/controller.v1beta1/trial/trial_controller_util.go @@ -24,6 +24,7 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/reconcile" commonv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" @@ -40,11 +41,12 @@ const ( // UpdateTrialStatusCondition updates Trial status from current deployed Job status func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Trial, deployedJobName string, jobStatus *trialutil.TrialJobStatus) { + logger := log.WithValues("Trial", types.NamespacedName{Name: instance.GetName(), Namespace: instance.GetNamespace()}) timeNow := metav1.Now() if jobStatus.Condition == trialutil.JobSucceeded { - if isTrialObservationAvailable(instance) { + if isTrialObservationAvailable(instance) && !instance.IsSucceeded() { msg := "Trial has succeeded " reason := TrialSucceededReason @@ -56,14 +58,16 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria reason = fmt.Sprintf("%v. Job reason: %v", reason, jobStatus.Reason) } + logger.Info("Trial status changed to Succeeded") instance.MarkTrialStatusSucceeded(corev1.ConditionTrue, reason, msg) instance.Status.CompletionTime = &timeNow eventMsg := fmt.Sprintf("Job %v has succeeded", deployedJobName) r.recorder.Eventf(instance, corev1.EventTypeNormal, JobSucceededReason, eventMsg) r.collector.IncreaseTrialsSucceededCount(instance.Namespace) - } else { - // TODO (andreyvelich): Is is correct to mark succeeded status false when metrics are unavailable? + } else if !instance.IsMetricsUnavailable() { + // TODO (andreyvelich): Is it correct to mark succeeded status false when metrics are unavailable? + // Ref issue to add new condition: https://github.com/kubeflow/katib/issues/1343 msg := "Metrics are not available" reason := TrialMetricsUnavailableReason @@ -75,12 +79,13 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria reason = fmt.Sprintf("%v. Job reason: %v", reason, jobStatus.Reason) } + logger.Info("Trial status changed to Metrics Unavailable") instance.MarkTrialStatusSucceeded(corev1.ConditionFalse, reason, msg) eventMsg := fmt.Sprintf("Metrics are not available for Job %v", deployedJobName) r.recorder.Eventf(instance, corev1.EventTypeWarning, JobMetricsUnavailableReason, eventMsg) } - } else if jobStatus.Condition == trialutil.JobFailed { + } else if jobStatus.Condition == trialutil.JobFailed && !instance.IsFailed() { msg := "Trial has failed" reason := TrialFailedReason @@ -102,12 +107,14 @@ func (r *ReconcileTrial) UpdateTrialStatusCondition(instance *trialsv1beta1.Tria r.recorder.Eventf(instance, corev1.EventTypeNormal, JobFailedReason, eventMsg) r.collector.IncreaseTrialsFailedCount(instance.Namespace) - } else if jobStatus.Condition == trialutil.JobRunning { + logger.Info("Trial status changed to Failed") + } else if jobStatus.Condition == trialutil.JobRunning && !instance.IsRunning() { msg := "Trial is running" instance.MarkTrialStatusRunning(TrialRunningReason, msg) eventMsg := fmt.Sprintf("Job %v is running", deployedJobName) r.recorder.Eventf(instance, corev1.EventTypeNormal, JobRunningReason, eventMsg) + logger.Info("Trial status changed to Running") // TODO(gaocegege): Should we maintain a TrialsRunningCount? } // else nothing to do @@ -168,7 +175,7 @@ func (r *ReconcileTrial) UpdateTrialStatusObservation(instance *trialsv1beta1.Tr return err } metricStrategies := instance.Spec.Objective.MetricStrategies - if reply.ObservationLog != nil { + if len(reply.ObservationLog.MetricLogs) != 0 { observation, err := getMetrics(reply.ObservationLog.MetricLogs, metricStrategies) if err != nil { log.Error(err, "Get metrics from logs error") @@ -203,9 +210,6 @@ func (r *ReconcileTrial) updateFinalizers(instance *trialsv1beta1.Trial, finaliz } func isTrialObservationAvailable(instance *trialsv1beta1.Trial) bool { - if instance == nil { - return false - } objectiveMetricName := instance.Spec.Objective.ObjectiveMetricName if instance.Status.Observation != nil && instance.Status.Observation.Metrics != nil { for _, metric := range instance.Status.Observation.Metrics { diff --git a/pkg/controller.v1beta1/trial/util/job_util.go b/pkg/controller.v1beta1/trial/util/job_util.go index 565744ec9fe..ea6a875bcac 100644 --- a/pkg/controller.v1beta1/trial/util/job_util.go +++ b/pkg/controller.v1beta1/trial/util/job_util.go @@ -5,7 +5,6 @@ import ( "fmt" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/types" logf "sigs.k8s.io/controller-runtime/pkg/runtime/log" trialsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/trials/v1beta1" @@ -47,7 +46,6 @@ var ( // GetDeployedJobStatus returns internal representation for deployed Job status. func GetDeployedJobStatus(trial *trialsv1beta1.Trial, deployedJob *unstructured.Unstructured) (*TrialJobStatus, error) { - logger := log.WithValues("Trial", types.NamespacedName{Name: trial.GetName(), Namespace: trial.GetNamespace()}) trialJobStatus := &TrialJobStatus{} @@ -75,7 +73,6 @@ func GetDeployedJobStatus(trial *trialsv1beta1.Trial, deployedJob *unstructured. // Job condition is failed trialJobStatus.Condition = JobFailed - logger.Info("Deployed Job status is failed", "Job", deployedJob.GetName()) return trialJobStatus, nil } @@ -97,15 +94,13 @@ func GetDeployedJobStatus(trial *trialsv1beta1.Trial, deployedJob *unstructured. // Job condition is succeeded trialJobStatus.Condition = JobSucceeded - logger.Info("Deployed Job status is succeeded", "Job", deployedJob.GetName()) return trialJobStatus, nil } // Set default Job condition is running when Job name is generated. - // Check if Trial is not running and is not completed - if !trial.IsRunning() && deployedJob.GetName() != "" && !trial.IsCompleted() { + // Check if Trial is not running + if !trial.IsRunning() && deployedJob.GetName() != "" { trialJobStatus.Condition = JobRunning - logger.Info("Deployed Job status is running", "Job", deployedJob.GetName()) return trialJobStatus, nil } diff --git a/pkg/controller.v1beta1/util/annotations.go b/pkg/controller.v1beta1/util/annotations.go index 0c41b60e688..758d887086c 100644 --- a/pkg/controller.v1beta1/util/annotations.go +++ b/pkg/controller.v1beta1/util/annotations.go @@ -16,8 +16,6 @@ limitations under the License. package util import ( - "fmt" - batchv1 "k8s.io/api/batch/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" @@ -25,7 +23,6 @@ import ( suggestionsv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/suggestions/v1beta1" "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" - jobv1beta1 "github.com/kubeflow/katib/pkg/job/v1beta1" pytorchv1 "github.com/kubeflow/pytorch-operator/pkg/apis/pytorch/v1" tfv1 "github.com/kubeflow/tf-operator/pkg/apis/tensorflow/v1" ) @@ -102,13 +99,8 @@ func TrainingJobAnnotations(desiredJob *unstructured.Unstructured) error { } return nil default: - // Annotation appending of custom job can be done in Provider.MutateJob. - if _, ok := jobv1beta1.SupportedJobList[kind]; ok { - return nil - } - return fmt.Errorf("Invalid Katib Training Job kind %v", kind) + return nil } - } func appendAnnotation(annotations map[string]string, newAnnotationName string, newAnnotationValue string) map[string]string { diff --git a/pkg/db/v1beta1/mysql/mysql.go b/pkg/db/v1beta1/mysql/mysql.go index f84f79b5b15..a04ea734b15 100644 --- a/pkg/db/v1beta1/mysql/mysql.go +++ b/pkg/db/v1beta1/mysql/mysql.go @@ -92,10 +92,10 @@ func NewDBInterface() (common.KatibDBInterface, error) { } func (d *dbConn) RegisterObservationLog(trialName string, observationLog *v1beta1.ObservationLog) error { - var mname, mvalue string + sqlQuery := "INSERT INTO observation_logs (trial_name, time, metric_name, value) VALUES " + values := []interface{}{} + for _, mlog := range observationLog.MetricLogs { - mname = mlog.Metric.Name - mvalue = mlog.Metric.Value if mlog.TimeStamp == "" { continue } @@ -104,22 +104,24 @@ func (d *dbConn) RegisterObservationLog(trialName string, observationLog *v1beta return fmt.Errorf("Error parsing start time %s: %v", mlog.TimeStamp, err) } sqlTimeStr := t.UTC().Format(mysqlTimeFmt) - _, err = d.db.Exec( - `INSERT INTO observation_logs ( - trial_name, - time, - metric_name, - value - ) VALUES (?, ?, ?, ?)`, - trialName, - sqlTimeStr, - mname, - mvalue, - ) - if err != nil { - return err - } + + sqlQuery += "(?, ?, ?, ?)," + values = append(values, trialName, sqlTimeStr, mlog.Metric.Name, mlog.Metric.Value) } + sqlQuery = sqlQuery[0 : len(sqlQuery)-1] + + // Prepare the statement + stmt, err := d.db.Prepare(sqlQuery) + if err != nil { + return fmt.Errorf("Pepare SQL statement failed: %v", err) + } + + // Execute INSERT + _, err = stmt.Exec(values...) + if err != nil { + return fmt.Errorf("Execute SQL INSERT failed: %v", err) + } + return nil } diff --git a/pkg/db/v1beta1/mysql/mysql_test.go b/pkg/db/v1beta1/mysql/mysql_test.go index 5fc68600c01..bea0182655b 100644 --- a/pkg/db/v1beta1/mysql/mysql_test.go +++ b/pkg/db/v1beta1/mysql/mysql_test.go @@ -60,37 +60,22 @@ func TestRegisterObservationLog(t *testing.T) { Value: "0.5", }, }, - { - TimeStamp: "2016-12-31T20:02:05.123456Z", - Metric: &api_pb.Metric{ - Name: "precision", - Value: "88.7", - }, - }, - { - TimeStamp: "2016-12-31T20:02:05.123456Z", - Metric: &api_pb.Metric{ - Name: "recall", - Value: "89.2", - }, - }, }, } - for _, m := range obsLog.MetricLogs { - mock.ExpectExec( - `INSERT INTO observation_logs \( - trial_name, - time, - metric_name, - value - \)`, - ).WithArgs( - "test1_trial1", - "2016-12-31 20:02:05.123456", - m.Metric.Name, - m.Metric.Value, - ).WillReturnResult(sqlmock.NewResult(1, 1)) - } + mock.ExpectPrepare("INSERT") + mock.ExpectExec( + "INSERT", + ).WithArgs( + "test1_trial1", + "2016-12-31 20:02:05.123456", + "f1_score", + "88.95", + "test1_trial1", + "2016-12-31 20:02:05.123456", + "loss", + "0.5", + ).WillReturnResult(sqlmock.NewResult(1, 1)) + err := dbInterface.RegisterObservationLog("test1_trial1", obsLog) if err != nil { t.Errorf("RegisterExperiment failed: %v", err) diff --git a/pkg/metricscollector/v1beta1/common/const.py b/pkg/metricscollector/v1beta1/common/const.py index f64df59b0a9..fdcf4156dfb 100644 --- a/pkg/metricscollector/v1beta1/common/const.py +++ b/pkg/metricscollector/v1beta1/common/const.py @@ -8,3 +8,7 @@ DEFAULT_METRICS_FILE_DIR = "/log" # Job finished marker in $$$$.pid file when main process is completed TRAINING_COMPLETED = "completed" + +# UnavailableMetricValue is the value in the DB +# when metrics collector can't parse objective metric from the training logs. +UNAVAILABLE_METRIC_VALUE = "unavailable" diff --git a/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector.go b/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector.go index b3f2f0dc6a5..e98ee984d26 100644 --- a/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector.go +++ b/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector.go @@ -8,6 +8,7 @@ import ( "time" v1beta1 "github.com/kubeflow/katib/pkg/apis/manager/v1beta1" + "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common" "k8s.io/klog" ) @@ -31,14 +32,14 @@ func parseLogs(logs []string, metrics []string, filters []string) (*v1beta1.Obse for _, logline := range logs { // skip line which doesn't contain any metrics keywords, avoiding unnecessary pattern match - isObjLine := false + isMetricLine := false for _, m := range metrics { if strings.Contains(logline, m) { - isObjLine = true + isMetricLine = true break } } - if !isObjLine { + if !isMetricLine { continue } @@ -78,7 +79,31 @@ func parseLogs(logs []string, metrics []string, filters []string) (*v1beta1.Obse } } } - olog.MetricLogs = mlogs + // Metrics logs must contain at least one objective metric value + // Objective metric is located at first index + isObjectiveMetricReported := false + for _, mLog := range mlogs { + if mLog.Metric.Name == metrics[0] { + isObjectiveMetricReported = true + break + } + } + // If objective metrics were not reported, insert unavailable value in the DB + if !isObjectiveMetricReported { + olog.MetricLogs = []*v1beta1.MetricLog{ + { + TimeStamp: time.Time{}.UTC().Format(time.RFC3339), + Metric: &v1beta1.Metric{ + Name: metrics[0], + Value: consts.UnavailableMetricValue, + }, + }, + } + klog.Infof("Objective metric %v is not found in training logs, %v value is reported", metrics[0], consts.UnavailableMetricValue) + } else { + olog.MetricLogs = mlogs + } + return olog, nil } diff --git a/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py b/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py index 4549f1fe911..6453421d129 100644 --- a/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py +++ b/pkg/metricscollector/v1beta1/tfevent-metricscollector/tfevent_loader.py @@ -4,6 +4,7 @@ import rfc3339 import api_pb2 from logging import getLogger, StreamHandler, INFO +import const # TFEventFileParser parses tfevent files and returns an ObservationLog of the metrics specified. # When the event file is under a directory(e.g. test dir), please specify "{{dirname}}/{{metrics name}}" @@ -61,4 +62,26 @@ def parse_file(self, directory): except Exception as e: self.logger.warning("Unexpected error: " + str(e)) continue + + # Metrics logs must contain at least one objective metric value + # Objective metric is located at first index + is_objective_metric_reported = False + for ml in mls: + if ml.metric.name == self.metrics[0]: + is_objective_metric_reported = True + break + # If objective metrics were not reported, insert unavailable value in the DB + if not is_objective_metric_reported: + mls = [ + api_pb2.MetricLog( + time_stamp=rfc3339.rfc3339(datetime.now()), + metric=api_pb2.Metric( + name=self.metrics[0], + value=const.UNAVAILABLE_METRIC_VALUE + ) + ) + ] + self.logger.info("Objective metric {} is not found in training logs, {} value is reported".format( + self.metrics[0], const.UNAVAILABLE_METRIC_VALUE)) + return api_pb2.ObservationLog(metric_logs=mls) diff --git a/pkg/webhook/v1beta1/experiment/validator/validator.go b/pkg/webhook/v1beta1/experiment/validator/validator.go index 995a5ded6aa..f62ce6e7063 100644 --- a/pkg/webhook/v1beta1/experiment/validator/validator.go +++ b/pkg/webhook/v1beta1/experiment/validator/validator.go @@ -284,7 +284,7 @@ func (g *DefaultValidator) validateTrialTemplate(instance *experimentsv1beta1.Ex // Check if Job is supported // Check if Job can be converted to Batch Job/TFJob/PyTorchJob - // Not default CRDs can be omitted later + // Other jobs are not validated if err := g.validateSupportedJob(runSpec); err != nil { return fmt.Errorf("Invalid spec.trialTemplate: %v", err) } @@ -336,7 +336,7 @@ func (g *DefaultValidator) validateSupportedJob(runSpec *unstructured.Unstructur return nil } } - return fmt.Errorf("Job type %v not supported", gvk) + return nil } func validatePatchJob(runSpec *unstructured.Unstructured, job interface{}, jobType string) error { diff --git a/pkg/webhook/v1beta1/experiment/validator/validator_test.go b/pkg/webhook/v1beta1/experiment/validator/validator_test.go index ff4537cb8e8..9269ec49bec 100644 --- a/pkg/webhook/v1beta1/experiment/validator/validator_test.go +++ b/pkg/webhook/v1beta1/experiment/validator/validator_test.go @@ -355,9 +355,9 @@ spec: emptyAPIVersionJob.TypeMeta.APIVersion = "" emptyAPIVersionStr := convertBatchJobToString(emptyAPIVersionJob) - invalidJobType := newFakeBatchJob() - invalidJobType.TypeMeta.Kind = "InvalidKind" - invalidJobTypeStr := convertBatchJobToString(invalidJobType) + customJobType := newFakeBatchJob() + customJobType.TypeMeta.Kind = "CustomKind" + customJobTypeStr := convertBatchJobToString(customJobType) emptyConfigMap := p.EXPECT().GetTrialTemplate(gomock.Any()).Return("", errors.New(string(metav1.StatusReasonNotFound))) @@ -371,7 +371,7 @@ spec: invalidParameterTemplate := p.EXPECT().GetTrialTemplate(gomock.Any()).Return(invalidParameterJobStr, nil) notEmptyMetadataTemplate := p.EXPECT().GetTrialTemplate(gomock.Any()).Return(notEmptyMetadataStr, nil) emptyAPIVersionTemplate := p.EXPECT().GetTrialTemplate(gomock.Any()).Return(emptyAPIVersionStr, nil) - invalidJobTypeTemplate := p.EXPECT().GetTrialTemplate(gomock.Any()).Return(invalidJobTypeStr, nil) + customJobTypeTemplate := p.EXPECT().GetTrialTemplate(gomock.Any()).Return(customJobTypeStr, nil) gomock.InOrder( emptyConfigMap, @@ -384,7 +384,7 @@ spec: invalidParameterTemplate, notEmptyMetadataTemplate, emptyAPIVersionTemplate, - invalidJobTypeTemplate, + customJobTypeTemplate, ) tcs := []struct { @@ -550,15 +550,15 @@ spec: Err: true, testDescription: "Trial template doesn't contain APIVersion or Kind", }, - // Trial Template has invalid Kind - // invalidJobTypeTemplate case + // Trial Template has custom Kind + // customJobTypeTemplate case { Instance: func() *experimentsv1beta1.Experiment { i := newFakeInstance() return i }(), - Err: true, - testDescription: "Trial template has invalid Kind", + Err: false, + testDescription: "Trial template has custom Kind", }, } for _, tc := range tcs { diff --git a/pkg/webhook/v1beta1/pod/inject_webhook.go b/pkg/webhook/v1beta1/pod/inject_webhook.go index e4b86f2b7b5..f2519322131 100644 --- a/pkg/webhook/v1beta1/pod/inject_webhook.go +++ b/pkg/webhook/v1beta1/pod/inject_webhook.go @@ -169,7 +169,7 @@ func (s *sidecarInjector) Mutate(pod *v1.Pod, namespace string) (*v1.Pod, error) mountPath, pathKind := getMountPath(trial.Spec.MetricsCollector) if mountPath != "" { - if err = mutateVolume(mutatedPod, jobKind, mountPath, injectContainer.Name, pathKind); err != nil { + if err = mutateVolume(mutatedPod, jobKind, mountPath, injectContainer.Name, trial.Spec.PrimaryContainerName, pathKind); err != nil { return nil, err } } diff --git a/pkg/webhook/v1beta1/pod/inject_webhook_test.go b/pkg/webhook/v1beta1/pod/inject_webhook_test.go index e8de454682f..63a5077765c 100644 --- a/pkg/webhook/v1beta1/pod/inject_webhook_test.go +++ b/pkg/webhook/v1beta1/pod/inject_webhook_test.go @@ -398,6 +398,7 @@ func TestMutateVolume(t *testing.T) { JobKind string MountPath string SidecarContainerName string + PrimaryContainerName string PathKind common.FileSystemKind Err bool }{ @@ -433,6 +434,12 @@ func TestMutateVolume(t *testing.T) { }, { Name: "metrics-collector", + VolumeMounts: []v1.VolumeMount{ + { + Name: common.MetricsVolume, + MountPath: filepath.Dir(common.DefaultFilePath), + }, + }, }, }, Volumes: []v1.Volume{ @@ -447,7 +454,8 @@ func TestMutateVolume(t *testing.T) { }, JobKind: "Job", MountPath: common.DefaultFilePath, - SidecarContainerName: "train-job", + SidecarContainerName: "metrics-collector", + PrimaryContainerName: "train-job", PathKind: common.FileKind, } @@ -456,6 +464,7 @@ func TestMutateVolume(t *testing.T) { tc.JobKind, tc.MountPath, tc.SidecarContainerName, + tc.PrimaryContainerName, tc.PathKind) if err != nil { t.Errorf("mutateVolume failed: %v", err) diff --git a/pkg/webhook/v1beta1/pod/utils.go b/pkg/webhook/v1beta1/pod/utils.go index 32feb7d3348..202a4711f36 100644 --- a/pkg/webhook/v1beta1/pod/utils.go +++ b/pkg/webhook/v1beta1/pod/utils.go @@ -236,7 +236,7 @@ func getMarkCompletedCommand(mountPath string, pathKind common.FileSystemKind) s return fmt.Sprintf("echo %s > %s", mccommon.TrainingCompleted, pidFile) } -func mutateVolume(pod *v1.Pod, jobKind, mountPath, sidecarContainerName string, pathKind common.FileSystemKind) error { +func mutateVolume(pod *v1.Pod, jobKind, mountPath, sidecarContainerName, primaryContainerName string, pathKind common.FileSystemKind) error { metricsVol := v1.Volume{ Name: common.MetricsVolume, VolumeSource: v1.VolumeSource{ @@ -257,11 +257,16 @@ func mutateVolume(pod *v1.Pod, jobKind, mountPath, sidecarContainerName string, if c.Name == sidecarContainerName { shouldMount = true } else { - jobProvider, err := jobv1beta1.New(jobKind) - if err != nil { - return err + if primaryContainerName != "" && c.Name == primaryContainerName { + shouldMount = true + // TODO (andreyvelich): This can be deleted after switch to custom CRD + } else if primaryContainerName == "" { + jobProvider, err := jobv1beta1.New(jobKind) + if err != nil { + return err + } + shouldMount = jobProvider.IsTrainingContainer(i, c) } - shouldMount = jobProvider.IsTrainingContainer(i, c) } if shouldMount { indexList = append(indexList, i) diff --git a/test/e2e/v1beta1/invalid-experiment.yaml b/test/e2e/v1beta1/invalid-experiment.yaml index 0ca2fb29625..fadfdfb4bd2 100644 --- a/test/e2e/v1beta1/invalid-experiment.yaml +++ b/test/e2e/v1beta1/invalid-experiment.yaml @@ -13,7 +13,7 @@ spec: additionalMetricNames: - Train-accuracy algorithm: - algorithmName: random + algorithmName: invalid-algorithm # Invalid Algorithm to check that validation webhook is working parameters: - name: lr parameterType: double @@ -45,7 +45,7 @@ spec: reference: optimizer trialSpec: apiVersion: batch/v1 - kind: InvalidKind # Invalid Kind to check validation webhook + kind: Job spec: template: spec: