diff --git a/.gitignore b/.gitignore index 6241a98bb08..3e41b2b726a 100644 --- a/.gitignore +++ b/.gitignore @@ -22,6 +22,7 @@ bin *.dll *.so *.dylib +pkg/metricscollector/v1beta1/file-metricscollector/testdata ## Test binary, build with `go test -c` *.test diff --git a/cmd/metricscollector/v1beta1/file-metricscollector/main.go b/cmd/metricscollector/v1beta1/file-metricscollector/main.go index 49476bd4db7..e3e0ae86c4f 100644 --- a/cmd/metricscollector/v1beta1/file-metricscollector/main.go +++ b/cmd/metricscollector/v1beta1/file-metricscollector/main.go @@ -39,11 +39,13 @@ package main import ( "context" + "encoding/json" "flag" "fmt" "io/ioutil" "os" "path/filepath" + "regexp" "strconv" "strings" "time" @@ -102,6 +104,7 @@ var ( earlyStopServiceAddr = flag.String("s-earlystop", "", "Katib Early Stopping service endpoint") trialName = flag.String("t", "", "Trial Name") metricsFilePath = flag.String("path", "", "Metrics File Path") + metricsFileFormat = flag.String("format", "", "Metrics File Format") metricNames = flag.String("m", "", "Metric names") objectiveType = flag.String("o-type", "", "Objective type") metricFilters = flag.String("f", "", "Metric filters") @@ -137,7 +140,7 @@ func printMetricsFile(mFile string) { } } -func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) { +func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string, fileFormat commonv1beta1.FileFormat) { // metricStartStep is the dict where key = metric name, value = start step. // We should apply early stopping rule only if metric is reported at least "start_step" times. @@ -148,9 +151,6 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) { } } - // First metric is objective in metricNames array. - objMetric := strings.Split(*metricNames, ";")[0] - objType := commonv1beta1.ObjectiveType(*objectiveType) // For objective metric we calculate best optimal value from the recorded metrics. 
// This is workaround for Median Stop algorithm. // TODO (andreyvelich): Think about it, maybe define latest, max or min strategy type in stop-rule as well ? @@ -169,9 +169,6 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) { klog.Fatalf("Failed to create new Process from pid %v, error: %v", mainProcPid, err) } - // Get list of regural expressions from filters. - metricRegList := filemc.GetFilterRegexpList(filters) - // Start watch log lines. t, _ := tail.TailFile(mFile, tail.Config{Follow: true}) for line := range t.Lines { @@ -179,78 +176,82 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) { // Print log line klog.Info(logText) - // Check if log line contains metric from stop rules. - isRuleLine := false - for _, rule := range stopRules { - if strings.Contains(logText, rule.Name) { - isRuleLine = true - break - } - } - // If log line doesn't contain appropriate metric, continue track file. - if !isRuleLine { - continue - } - - // If log line contains appropriate metric, find all submatches from metric filters. - for _, metricReg := range metricRegList { - matchStrings := metricReg.FindAllStringSubmatch(logText, -1) - for _, subMatchList := range matchStrings { - if len(subMatchList) < 3 { - continue - } - // Submatch must have metric name and float value - metricName := strings.TrimSpace(subMatchList[1]) - metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64) - if err != nil { - klog.Fatalf("Unable to parse value %v to float for metric %v", metricValue, metricName) + switch fileFormat { + case commonv1beta1.TextFormat: + // Get list of regural expressions from filters. + var metricRegList []*regexp.Regexp + metricRegList = filemc.GetFilterRegexpList(filters) + + // Check if log line contains metric from stop rules. 
+ isRuleLine := false + for _, rule := range stopRules { + if strings.Contains(logText, rule.Name) { + isRuleLine = true + break } + } + // If log line doesn't contain appropriate metric, continue track file. + if !isRuleLine { + continue + } - // stopRules contains array of EarlyStoppingRules that has not been reached yet. - // After rule is reached we delete appropriate element from the array. - for idx, rule := range stopRules { - if metricName != rule.Name { + // If log line contains appropriate metric, find all submatches from metric filters. + for _, metricReg := range metricRegList { + matchStrings := metricReg.FindAllStringSubmatch(logText, -1) + for _, subMatchList := range matchStrings { + if len(subMatchList) < 3 { continue } - - // Calculate optimalObjValue. - if metricName == objMetric { - if optimalObjValue == nil { - optimalObjValue = &metricValue - } else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue { - optimalObjValue = &metricValue - } else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue { - optimalObjValue = &metricValue - } - // Assign best optimal value to metric value. - metricValue = *optimalObjValue + // Submatch must have metric name and float value + metricName := strings.TrimSpace(subMatchList[1]) + metricValue, err := strconv.ParseFloat(strings.TrimSpace(subMatchList[2]), 64) + if err != nil { // metricValue is 0 when parsing fails, so log the raw matched text instead + klog.Fatalf("Unable to parse value %v to float for metric %v", subMatchList[2], metricName) } - // Reduce steps if appropriate metric is reported. - // Once rest steps are empty we apply early stopping rule. - if _, ok := metricStartStep[metricName]; ok { - metricStartStep[metricName]-- - if metricStartStep[metricName] != 0 { + // stopRules contains array of EarlyStoppingRules that has not been reached yet. + // After rule is reached we delete appropriate element from the array. 
+ for idx, rule := range stopRules { + if metricName != rule.Name { continue } + stopRules, optimalObjValue = updateStopRules(stopRules, optimalObjValue, metricValue, metricStartStep, rule, idx) } + } + } + case commonv1beta1.JsonFormat: + var logJsonObj map[string]interface{} + if err = json.Unmarshal([]byte(logText), &logJsonObj); err != nil { + klog.Fatalf("Failed to unmarshal logs in %v format, log: %s, error: %v", commonv1beta1.JsonFormat, logText, err) + } + // Check if log line contains metric from stop rules. + isRuleLine := false + for _, rule := range stopRules { + if _, exist := logJsonObj[rule.Name]; exist { + isRuleLine = true + break + } + } + // If log line doesn't contain appropriate metric, continue track file. + if !isRuleLine { + continue + } - ruleValue, err := strconv.ParseFloat(rule.Value, 64) - if err != nil { - klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name) - } - - // Metric value can be equal, less or greater than stop rule. - // Deleting suitable stop rule from the array. - if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue { - stopRules = deleteStopRule(stopRules, idx) - } else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue { - stopRules = deleteStopRule(stopRules, idx) - } else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue { - stopRules = deleteStopRule(stopRules, idx) - } + // stopRules contains array of EarlyStoppingRules that has not been reached yet. + // After rule is reached we delete appropriate element from the array. 
+ for idx, rule := range stopRules { + value, exist := logJsonObj[rule.Name].(string) + if !exist { + continue + } + metricValue, err := strconv.ParseFloat(strings.TrimSpace(value), 64) + if err != nil { // metricValue is 0 when parsing fails, so log the raw JSON value instead + klog.Fatalf("Unable to parse value %v to float for metric %v", value, rule.Name) } + stopRules, optimalObjValue = updateStopRules(stopRules, optimalObjValue, metricValue, metricStartStep, rule, idx) } + default: + klog.Fatalf("Format must be set to %v or %v", commonv1beta1.TextFormat, commonv1beta1.JsonFormat) } // If stopRules array is empty, Trial is early stopped. @@ -289,7 +290,7 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) { } // Report metrics to DB. - reportMetrics(filters) + reportMetrics(filters, fileFormat) // Wait until main process is completed. timeout := 60 * time.Second @@ -326,6 +327,58 @@ func watchMetricsFile(mFile string, stopRules stopRulesFlag, filters []string) { } } +func updateStopRules( + stopRules []commonv1beta1.EarlyStoppingRule, + optimalObjValue *float64, + metricValue float64, + metricStartStep map[string]int, + rule commonv1beta1.EarlyStoppingRule, + ruleIdx int, +) ([]commonv1beta1.EarlyStoppingRule, *float64) { + + // First metric is objective in metricNames array. + objMetric := strings.Split(*metricNames, ";")[0] + objType := commonv1beta1.ObjectiveType(*objectiveType) + + // Calculate optimalObjValue. + if rule.Name == objMetric { + if optimalObjValue == nil { + optimalObjValue = &metricValue + } else if objType == commonv1beta1.ObjectiveTypeMaximize && metricValue > *optimalObjValue { + optimalObjValue = &metricValue + } else if objType == commonv1beta1.ObjectiveTypeMinimize && metricValue < *optimalObjValue { + optimalObjValue = &metricValue + } + // Assign best optimal value to metric value. + metricValue = *optimalObjValue + } + + // Reduce steps if appropriate metric is reported. + // Once rest steps are empty we apply early stopping rule. 
+ if _, ok := metricStartStep[rule.Name]; ok { + metricStartStep[rule.Name]-- + if metricStartStep[rule.Name] != 0 { + return stopRules, optimalObjValue + } + } + + ruleValue, err := strconv.ParseFloat(rule.Value, 64) + if err != nil { + klog.Fatalf("Unable to parse value %v to float for rule metric %v", rule.Value, rule.Name) + } + + // Metric value can be equal, less or greater than stop rule. + // Deleting suitable stop rule from the array. + if rule.Comparison == commonv1beta1.ComparisonTypeEqual && metricValue == ruleValue { + return deleteStopRule(stopRules, ruleIdx), optimalObjValue + } else if rule.Comparison == commonv1beta1.ComparisonTypeLess && metricValue < ruleValue { + return deleteStopRule(stopRules, ruleIdx), optimalObjValue + } else if rule.Comparison == commonv1beta1.ComparisonTypeGreater && metricValue > ruleValue { + return deleteStopRule(stopRules, ruleIdx), optimalObjValue + } + return stopRules, optimalObjValue +} + func deleteStopRule(stopRules []commonv1beta1.EarlyStoppingRule, idx int) []commonv1beta1.EarlyStoppingRule { if idx >= len(stopRules) { klog.Fatalf("Index %v out of range stopRules: %v", idx, stopRules) @@ -345,9 +398,11 @@ func main() { filters = strings.Split(*metricFilters, ";") } + fileFormat := commonv1beta1.FileFormat(*metricsFileFormat) + // If stop rule is set we need to parse metrics during run. if len(stopRules) != 0 { - go watchMetricsFile(*metricsFilePath, stopRules, filters) + go watchMetricsFile(*metricsFilePath, stopRules, filters, fileFormat) } else { go printMetricsFile(*metricsFilePath) } @@ -366,11 +421,11 @@ func main() { // If training was not early stopped, report the metrics. 
if !isEarlyStopped { - reportMetrics(filters) + reportMetrics(filters, fileFormat) } } -func reportMetrics(filters []string) { +func reportMetrics(filters []string, fileFormat commonv1beta1.FileFormat) { conn, err := grpc.Dial(*dbManagerServiceAddr, grpc.WithInsecure()) if err != nil { @@ -383,7 +438,7 @@ func reportMetrics(filters []string) { if len(*metricNames) != 0 { metricList = strings.Split(*metricNames, ";") } - olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters) + olog, err := filemc.CollectObservationLog(*metricsFilePath, metricList, filters, fileFormat) if err != nil { klog.Fatalf("Failed to collect logs: %v", err) } diff --git a/examples/v1beta1/early-stopping/median-stop-with-json-format.yaml b/examples/v1beta1/early-stopping/median-stop-with-json-format.yaml new file mode 100644 index 00000000000..c2b44118bbd --- /dev/null +++ b/examples/v1beta1/early-stopping/median-stop-with-json-format.yaml @@ -0,0 +1,72 @@ +# This is example with median stopping early stopping rule with logs in JSON format. +# It has bad feasible space for learning rate to show more early stopped Trials. 
+apiVersion: kubeflow.org/v1beta1 +kind: Experiment +metadata: + namespace: kubeflow + name: median-stop-with-json-format +spec: + objective: + type: maximize + goal: 0.99 + objectiveMetricName: accuracy + additionalMetricNames: + - loss + metricsCollectorSpec: + source: + fileSystemPath: + path: "/katib/mnist.json" + kind: File + format: JSON + collector: + kind: File + algorithm: + algorithmName: random + earlyStopping: + algorithmName: medianstop + algorithmSettings: + - name: min_trials_required + value: "1" + - name: start_step + value: "2" + parallelTrialCount: 2 + maxTrialCount: 15 + maxFailedTrialCount: 3 + parameters: + - name: lr + parameterType: double + feasibleSpace: + min: "0.01" + max: "0.5" + - name: num-epochs + parameterType: int + feasibleSpace: + min: "3" + max: "4" + trialTemplate: + retain: true + primaryContainerName: training-container + trialParameters: + - name: learningRate + description: Learning rate for the training model + reference: lr + - name: numberEpochs + description: Number of epochs to train the model + reference: num-epochs + trialSpec: + apiVersion: batch/v1 + kind: Job + spec: + template: + spec: + containers: + - name: training-container + image: docker.io/kubeflowkatib/pytorch-mnist:latest + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=${trialParameters.numberEpochs}" + - "--log-path=/katib/mnist.json" + - "--lr=${trialParameters.learningRate}" + - "--logger=hypertune" + restartPolicy: Never diff --git a/examples/v1beta1/metrics-collector/file-metrics-collector-with-json-format.yaml b/examples/v1beta1/metrics-collector/file-metrics-collector-with-json-format.yaml new file mode 100644 index 00000000000..789c7ddcd48 --- /dev/null +++ b/examples/v1beta1/metrics-collector/file-metrics-collector-with-json-format.yaml @@ -0,0 +1,63 @@ +apiVersion: kubeflow.org/v1beta1 +kind: Experiment +metadata: + namespace: kubeflow + name: file-metrics-collector-with-json-format +spec: + objective: + type: maximize 
+ goal: 0.99 + objectiveMetricName: accuracy + additionalMetricNames: + - loss + metricsCollectorSpec: + source: + fileSystemPath: + path: "/katib/mnist.json" + kind: File + format: JSON + collector: + kind: File + algorithm: + algorithmName: random + parallelTrialCount: 3 + maxTrialCount: 12 + maxFailedTrialCount: 3 + parameters: + - name: lr + parameterType: double + feasibleSpace: + min: "0.01" + max: "0.03" + - name: momentum + parameterType: double + feasibleSpace: + min: "0.3" + max: "0.7" + trialTemplate: + primaryContainerName: training-container + trialParameters: + - name: learningRate + description: Learning rate for the training model + reference: lr + - name: momentum + description: Momentum for the training model + reference: momentum + trialSpec: + apiVersion: batch/v1 + kind: Job + spec: + template: + spec: + containers: + - name: training-container + image: docker.io/kubeflowkatib/pytorch-mnist:latest + command: + - "python3" + - "/opt/pytorch-mnist/mnist.py" + - "--epochs=2" + - "--log-path=/katib/mnist.json" + - "--lr=${trialParameters.learningRate}" + - "--momentum=${trialParameters.momentum}" + - "--logger=hypertune" + restartPolicy: Never diff --git a/examples/v1beta1/trial-images/pytorch-mnist/Dockerfile b/examples/v1beta1/trial-images/pytorch-mnist/Dockerfile index 81213e2cd13..0a6138112bf 100644 --- a/examples/v1beta1/trial-images/pytorch-mnist/Dockerfile +++ b/examples/v1beta1/trial-images/pytorch-mnist/Dockerfile @@ -5,6 +5,7 @@ WORKDIR /opt/pytorch-mnist # Add folder for the logs. 
RUN mkdir /katib +RUN pip install cloudml-hypertune==0.1.0.dev6 RUN chgrp -R 0 /opt/pytorch-mnist \ && chmod -R g+rwX /opt/pytorch-mnist \ diff --git a/examples/v1beta1/trial-images/pytorch-mnist/README.md b/examples/v1beta1/trial-images/pytorch-mnist/README.md index cbb10e08b7a..3326bc74b9c 100644 --- a/examples/v1beta1/trial-images/pytorch-mnist/README.md +++ b/examples/v1beta1/trial-images/pytorch-mnist/README.md @@ -5,5 +5,7 @@ to the file or printing to the StdOut. It uses convolutional neural network to train the model. Katib uses this training container in some Experiments, for instance in the -[file Metrics Collector example](../../metrics-collector/file-metrics-collector.yaml#L55-L64) -or in the [PyTorchJob example](../../kubeflow-training-operator/pytorchjob-mnist.yaml#L47-L54). +[file Metrics Collector example](../../metrics-collector/file-metrics-collector.yaml#L55-L64), +the [file Metrics Collector with logs in JSON format example](../../metrics-collector/file-metrics-collector-with-json-format.yaml#L52-L62), +the [median stopping early stopping rule with logs in JSON format example](../../early-stopping/median-stop-with-json-format.yaml#L62-L71) +and the [PyTorchJob example](../../kubeflow-training-operator/pytorchjob-mnist.yaml#L47-L54). 
diff --git a/examples/v1beta1/trial-images/pytorch-mnist/mnist.py b/examples/v1beta1/trial-images/pytorch-mnist/mnist.py index b1c32de35bb..fade3d0e6cd 100644 --- a/examples/v1beta1/trial-images/pytorch-mnist/mnist.py +++ b/examples/v1beta1/trial-images/pytorch-mnist/mnist.py @@ -4,6 +4,7 @@ import logging import os +import hypertune from torchvision import datasets, transforms import torch import torch.distributed as dist @@ -50,7 +51,7 @@ def train(args, model, device, train_loader, optimizer, epoch): niter = epoch * len(train_loader) + batch_idx -def test(args, model, device, test_loader, epoch): +def test(args, model, device, test_loader, epoch, hpt): model.eval() test_loss = 0 correct = 0 @@ -63,8 +64,19 @@ def test(args, model, device, test_loader, epoch): correct += pred.eq(target.view_as(pred)).sum().item() test_loss /= len(test_loader.dataset) + test_accuracy = float(correct) / len(test_loader.dataset) logging.info("{{metricName: accuracy, metricValue: {:.4f}}};{{metricName: loss, metricValue: {:.4f}}}\n".format( - float(correct) / len(test_loader.dataset), test_loss)) + test_accuracy, test_loss)) + + if args.logger == "hypertune": + hpt.report_hyperparameter_tuning_metric( + hyperparameter_metric_tag='loss', + metric_value=test_loss, + global_step=epoch) + hpt.report_hyperparameter_tuning_metric( + hyperparameter_metric_tag='accuracy', + metric_value=test_accuracy, + global_step=epoch) def should_distribute(): @@ -98,6 +110,8 @@ def main(): help="Path to save logs. Print to StdOut if log-path is not set") parser.add_argument("--save-model", action="store_true", default=False, help="For Saving the current Model") + parser.add_argument("--logger", type=str, choices=["standard", "hypertune"], + help="Logger", default="standard") if dist.is_available(): parser.add_argument("--backend", type=str, help="Distributed backend", @@ -107,7 +121,7 @@ def main(): # Use this format (%Y-%m-%dT%H:%M:%SZ) to record timestamp of the metrics. 
# If log_path is empty print log to StdOut, otherwise print log to the file. - if args.log_path == "": + if args.log_path == "" or args.logger == "hypertune": logging.basicConfig( format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%dT%H:%M:%SZ", @@ -119,6 +133,12 @@ def main(): level=logging.DEBUG, filename=args.log_path) + if args.logger == "hypertune" and args.log_path != "": + os.environ['CLOUD_ML_HP_METRIC_FILE'] = args.log_path + + # For JSON logging + hpt = hypertune.HyperTune() + use_cuda = not args.no_cuda and torch.cuda.is_available() if use_cuda: print("Using CUDA") @@ -161,7 +181,7 @@ def main(): for epoch in range(1, args.epochs + 1): train(args, model, device, train_loader, optimizer, epoch) - test(args, model, device, test_loader, epoch) + test(args, model, device, test_loader, epoch, hpt) if (args.save_model): torch.save(model.state_dict(), "mnist_cnn.pt") diff --git a/pkg/apis/controller/common/v1beta1/common_types.go b/pkg/apis/controller/common/v1beta1/common_types.go index a13bfe8a35c..8d237fc2360 100644 --- a/pkg/apis/controller/common/v1beta1/common_types.go +++ b/pkg/apis/controller/common/v1beta1/common_types.go @@ -188,10 +188,18 @@ const ( InvalidKind FileSystemKind = "Invalid" ) +type FileFormat string + +const ( + TextFormat FileFormat = "TEXT" + JsonFormat FileFormat = "JSON" +) + // +k8s:deepcopy-gen=true type FileSystemPath struct { - Path string `json:"path,omitempty"` - Kind FileSystemKind `json:"kind,omitempty"` + Path string `json:"path,omitempty"` + Kind FileSystemKind `json:"kind,omitempty"` + Format FileFormat `json:"format,omitempty"` } type CollectorKind string diff --git a/pkg/apis/controller/experiments/v1beta1/experiment_defaults.go b/pkg/apis/controller/experiments/v1beta1/experiment_defaults.go index a5602ab8072..8f31278a967 100644 --- a/pkg/apis/controller/experiments/v1beta1/experiment_defaults.go +++ b/pkg/apis/controller/experiments/v1beta1/experiment_defaults.go @@ -158,6 +158,9 @@ func (e *Experiment) 
setDefaultMetricsCollector() { if e.Spec.MetricsCollectorSpec.Source.FileSystemPath.Path == "" { e.Spec.MetricsCollectorSpec.Source.FileSystemPath.Path = common.DefaultFilePath } + if e.Spec.MetricsCollectorSpec.Source.FileSystemPath.Format == "" { + e.Spec.MetricsCollectorSpec.Source.FileSystemPath.Format = common.TextFormat + } case common.TfEventCollector: if e.Spec.MetricsCollectorSpec.Source == nil { e.Spec.MetricsCollectorSpec.Source = &common.SourceSpec{} diff --git a/pkg/apis/v1beta1/openapi_generated.go b/pkg/apis/v1beta1/openapi_generated.go index 1f2aa283d8c..523d9bcacd7 100644 --- a/pkg/apis/v1beta1/openapi_generated.go +++ b/pkg/apis/v1beta1/openapi_generated.go @@ -285,6 +285,12 @@ func schema_apis_controller_common_v1beta1_FileSystemPath(ref common.ReferenceCa Format: "", }, }, + "format": { + SchemaProps: spec.SchemaProps{ + Type: []string{"string"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/apis/v1beta1/swagger.json b/pkg/apis/v1beta1/swagger.json index 62183654c6d..21926b928fd 100644 --- a/pkg/apis/v1beta1/swagger.json +++ b/pkg/apis/v1beta1/swagger.json @@ -779,6 +779,9 @@ "v1beta1.FileSystemPath": { "type": "object", "properties": { + "format": { + "type": "string" + }, "kind": { "type": "string" }, diff --git a/pkg/metricscollector/v1beta1/common/const.go b/pkg/metricscollector/v1beta1/common/const.go index 56e2b57d9e5..4edbcd79d65 100644 --- a/pkg/metricscollector/v1beta1/common/const.go +++ b/pkg/metricscollector/v1beta1/common/const.go @@ -46,6 +46,8 @@ const ( // Score=1.23E10 DefaultFilter = `([\w|-]+)\s*=\s*([+-]?\d*(\.\d+)?([Ee][+-]?\d+)?)` + TimeStampJsonKey = "timestamp" + // TODO (andreyvelich): Do we need to maintain 2 names? Should we leave only 1? 
MetricCollectorContainerName = "metrics-collector" MetricLoggerCollectorContainerName = "metrics-logger-and-collector" diff --git a/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector.go b/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector.go index 727b352ecba..9708c3f2e20 100644 --- a/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector.go +++ b/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector.go @@ -17,19 +17,24 @@ limitations under the License. package sidecarmetricscollector import ( + "encoding/json" + "fmt" "io/ioutil" "os" "regexp" + "strconv" "strings" "time" + "k8s.io/klog" + + commonv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" v1beta1 "github.com/kubeflow/katib/pkg/apis/manager/v1beta1" "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" "github.com/kubeflow/katib/pkg/metricscollector/v1beta1/common" - "k8s.io/klog" ) -func CollectObservationLog(fileName string, metrics []string, filters []string) (*v1beta1.ObservationLog, error) { +func CollectObservationLog(fileName string, metrics []string, filters []string, fileFormat commonv1beta1.FileFormat) (*v1beta1.ObservationLog, error) { file, err := os.Open(fileName) if err != nil { return nil, err @@ -40,12 +45,17 @@ func CollectObservationLog(fileName string, metrics []string, filters []string) return nil, err } logs := string(content) - olog, err := parseLogs(strings.Split(logs, "\n"), metrics, filters) - return olog, err + + switch fileFormat { + case commonv1beta1.TextFormat: + return parseLogsInTextFormat(strings.Split(logs, "\n"), metrics, filters) + case commonv1beta1.JsonFormat: + return parseLogsInJsonFormat(strings.Split(logs, "\n"), metrics) + } + return nil, fmt.Errorf("format must be set %v or %v", commonv1beta1.TextFormat, commonv1beta1.JsonFormat) } -func parseLogs(logs []string, metrics []string, filters []string) (*v1beta1.ObservationLog, error) { - olog := 
&v1beta1.ObservationLog{} +func parseLogsInTextFormat(logs []string, metrics []string, filters []string) (*v1beta1.ObservationLog, error) { metricRegList := GetFilterRegexpList(filters) mlogs := make([]*v1beta1.MetricLog, 0, len(logs)) @@ -98,6 +108,51 @@ func parseLogs(logs []string, metrics []string, filters []string) (*v1beta1.Obse } } } + return newObservationLog(mlogs, metrics), nil +} + +func parseLogsInJsonFormat(logs []string, metrics []string) (*v1beta1.ObservationLog, error) { + mlogs := make([]*v1beta1.MetricLog, 0, len(logs)) + + for _, logline := range logs { + if len(logline) == 0 { + continue + } + var jsonObj map[string]interface{} + if err := json.Unmarshal([]byte(logline), &jsonObj); err != nil { + return nil, err + } + + timestamp := time.Time{}.UTC().Format(time.RFC3339) + timestampJsonValue, exist := jsonObj[common.TimeStampJsonKey] + if !exist { + klog.Warningf("Metrics will not have timestamp since %s doesn't have the key timestamp", logline) + } else { + if parsedTimestamp := parseTimestamp(timestampJsonValue); parsedTimestamp == "" { + klog.Warningf("Metrics will not have timestamp since error parsing time %v", timestampJsonValue) + } else { + timestamp = parsedTimestamp + } + } + + for _, m := range metrics { + value, exist := jsonObj[m].(string) + if !exist { + continue + } + mlogs = append(mlogs, &v1beta1.MetricLog{ + TimeStamp: timestamp, + Metric: &v1beta1.Metric{ + Name: m, + Value: value, + }, + }) + } + } + return newObservationLog(mlogs, metrics), nil +} + +func newObservationLog(mlogs []*v1beta1.MetricLog, metrics []string) *v1beta1.ObservationLog { // Metrics logs must contain at least one objective metric value // Objective metric is located at first index isObjectiveMetricReported := false @@ -109,21 +164,64 @@ func parseLogs(logs []string, metrics []string, filters []string) (*v1beta1.Obse } // If objective metrics were not reported, insert unavailable value in the DB if !isObjectiveMetricReported { - olog.MetricLogs = 
[]*v1beta1.MetricLog{ - { - TimeStamp: time.Time{}.UTC().Format(time.RFC3339), - Metric: &v1beta1.Metric{ - Name: metrics[0], - Value: consts.UnavailableMetricValue, + klog.Infof("Objective metric %v is not found in training logs, %v value is reported", metrics[0], consts.UnavailableMetricValue) + return &v1beta1.ObservationLog{ + MetricLogs: []*v1beta1.MetricLog{ + { + TimeStamp: time.Time{}.UTC().Format(time.RFC3339), + Metric: &v1beta1.Metric{ + Name: metrics[0], + Value: consts.UnavailableMetricValue, + }, }, }, } - klog.Infof("Objective metric %v is not found in training logs, %v value is reported", metrics[0], consts.UnavailableMetricValue) - } else { - olog.MetricLogs = mlogs } + return &v1beta1.ObservationLog{ + MetricLogs: mlogs, + } +} + +func parseTimestamp(timestamp interface{}) string { + if stringTimestamp, ok := timestamp.(string); ok { + + if stringTimestamp == "" { + klog.Warningln("Timestamp is empty") + return "" + } else if _, err := time.Parse(time.RFC3339Nano, stringTimestamp); err != nil { + klog.Warningf("Failed to parse timestamp since %s is not RFC3339Nano format", stringTimestamp) + return "" + } + return stringTimestamp + + } else { + + floatTimestamp, ok := timestamp.(float64) + if !ok { + klog.Warningf("Failed to parse timestamp since the type of %v is neither string nor float64", timestamp) + return "" + } + + stringTimestamp = strconv.FormatFloat(floatTimestamp, 'f', -1, 64) + t := strings.Split(stringTimestamp, ".") - return olog, nil + sec, err := strconv.ParseInt(t[0], 10, 64) + if err != nil { + klog.Warningf("Failed to parse timestamp; %v", err) + return "" + } + + var nanoSec int64 = 0 + if len(t) == 2 { + nanoSec, err = strconv.ParseInt((t[1] + "000000000")[:9], 10, 64) // t[1] holds decimal digits of a second: right-pad/truncate to 9 digits to get nanoseconds + if err != nil { + klog.Warningf("Failed to parse timestamp; %v", err) + return "" + } + } + + return time.Unix(sec, nanoSec).UTC().Format(time.RFC3339Nano) + } } // GetFilterRegexpList returns Regexp array from filters string array diff --git 
a/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector_test.go b/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector_test.go new file mode 100644 index 00000000000..b502e4362fa --- /dev/null +++ b/pkg/metricscollector/v1beta1/file-metricscollector/file-metricscollector_test.go @@ -0,0 +1,217 @@ +/* +Copyright 2021 The Kubeflow Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package sidecarmetricscollector + +import ( + "os" + "path/filepath" + "reflect" + "testing" + "time" + + commonv1beta1 "github.com/kubeflow/katib/pkg/apis/controller/common/v1beta1" + v1beta1 "github.com/kubeflow/katib/pkg/apis/manager/v1beta1" + "github.com/kubeflow/katib/pkg/controller.v1beta1/consts" +) + +var testJsonDataPath = filepath.Join("testdata", "JSON") + +func TestCollectObservationLog(t *testing.T) { + + if err := generateTestFiles(); err != nil { + t.Fatal(err) + } + defer os.RemoveAll(filepath.Dir(testJsonDataPath)) + + // TODO (tenzen-y): We should add tests for logs in TEXT format. 
+ // Ref: https://github.com/kubeflow/katib/issues/1756 + testCases := []struct { + description string + filePath string + metrics []string + filters []string + fileFormat commonv1beta1.FileFormat + err bool + expected *v1beta1.ObservationLog + }{ + { + description: "Positive case for logs in JSON format", + filePath: filepath.Join(testJsonDataPath, "good.json"), + metrics: []string{"acc", "loss"}, + fileFormat: commonv1beta1.JsonFormat, + expected: &v1beta1.ObservationLog{ + MetricLogs: []*v1beta1.MetricLog{ + { + TimeStamp: "2021-12-02T05:27:27.28721Z", + Metric: &v1beta1.Metric{ + Name: "loss", + Value: "0.22082142531871796", + }, + }, + { + TimeStamp: "2021-12-02T05:27:27.287801Z", + Metric: &v1beta1.Metric{ + Name: "acc", + Value: "0.9349666833877563", + }, + }, + { + TimeStamp: "2021-12-02T14:27:50.000035161Z", + Metric: &v1beta1.Metric{ + Name: "loss", + Value: "0.1414974331855774", + }, + }, + { + TimeStamp: "2021-12-02T14:27:50.000037459Z", + Metric: &v1beta1.Metric{ + Name: "acc", + Value: "0.9586416482925415", + }, + }, + { + TimeStamp: time.Time{}.UTC().Format(time.RFC3339), + Metric: &v1beta1.Metric{ + Name: "loss", + Value: "0.10683439671993256", + }, + }, + }, + }, + }, + { + description: "Invalid file name", + filePath: "invalid", + err: true, + }, + { + description: "Invalid file format", + filePath: filepath.Join(testJsonDataPath, "good.json"), + fileFormat: "invalid", + err: true, + }, + { + description: "Invalid formatted file for logs in JSON format", + filePath: filepath.Join(testJsonDataPath, "invalid-format.json"), + fileFormat: commonv1beta1.JsonFormat, + err: true, + }, + { + description: "Invalid timestamp for logs in JSON format", + filePath: filepath.Join(testJsonDataPath, "invalid-timestamp.json"), + fileFormat: commonv1beta1.JsonFormat, + metrics: []string{"acc", "loss"}, + expected: &v1beta1.ObservationLog{ + MetricLogs: []*v1beta1.MetricLog{ + { + TimeStamp: time.Time{}.UTC().Format(time.RFC3339), + Metric: &v1beta1.Metric{ + 
Name: "loss", + Value: "0.22082142531871796", + }, + }, + { + TimeStamp: "2021-12-02T05:27:27Z", + Metric: &v1beta1.Metric{ + Name: "acc", + Value: "0.9349666833877563", + }, + }, + }, + }, + }, + { + description: "Missing objective metric in training logs", + filePath: filepath.Join(testJsonDataPath, "missing-objective-metric.json"), + fileFormat: commonv1beta1.JsonFormat, + metrics: []string{"acc", "loss"}, + expected: &v1beta1.ObservationLog{ + MetricLogs: []*v1beta1.MetricLog{ + { + TimeStamp: time.Time{}.UTC().Format(time.RFC3339), + Metric: &v1beta1.Metric{ + Name: "acc", + Value: consts.UnavailableMetricValue, + }, + }, + }, + }, + }, + } + + for _, test := range testCases { + t.Run(test.description, func(t *testing.T) { + actual, err := CollectObservationLog(test.filePath, test.metrics, test.filters, test.fileFormat) + if (err != nil) != test.err { + t.Errorf("\nGOT: \n%v\nWANT: %v\n", err, test.err) + } else { + if !reflect.DeepEqual(actual, test.expected) { + t.Errorf("Expected %v\n got %v", test.expected, actual) + } + } + }) + } +} + +func generateTestFiles() error { + if _, err := os.Stat(testJsonDataPath); err != nil { + if err = os.MkdirAll(testJsonDataPath, 0700); err != nil { + return err + } + } + + testData := []struct { + fileName string + data string + }{ + { + fileName: "good.json", + data: `{"checkpoint_path": "", "global_step": "0", "loss": "0.22082142531871796", "timestamp": 1638422847.28721, "trial": "0"} +{"acc": "0.9349666833877563", "checkpoint_path": "", "global_step": "0", "timestamp": 1638422847.287801, "trial": "0"} +{"checkpoint_path": "", "global_step": "1", "loss": "0.1414974331855774", "timestamp": "2021-12-02T14:27:50.000035161Z", "trial": "0"} +{"acc": "0.9586416482925415", "checkpoint_path": "", "global_step": "1", "timestamp": "2021-12-02T14:27:50.000037459Z", "trial": "0"} +{"checkpoint_path": "", "global_step": "2", "loss": "0.10683439671993256", "trial": "0"} +`, + }, + { + fileName: "invalid-format.json", + data: 
`"checkpoint_path": "", "global_step": "0", "loss": "0.22082142531871796", "timestamp": 1638422847.28721, "trial": "0" +{"acc": "0.9349666833877563", "checkpoint_path": "", "global_step": "0", "timestamp": 1638422847.287801, "trial": "0 +`, + }, + { + fileName: "invalid-timestamp.json", + data: `{"checkpoint_path": "", "global_step": "0", "loss": "0.22082142531871796", "timestamp": "invalid", "trial": "0"} +{"acc": "0.9349666833877563", "checkpoint_path": "", "global_step": "0", "timestamp": 1638422847, "trial": "0"} +`, + }, { + fileName: "missing-objective-metric.json", + data: `{"checkpoint_path": "", "global_step": "0", "loss": "0.22082142531871796", "timestamp": 1638422847.28721, "trial": "0"} +{"checkpoint_path": "", "global_step": "1", "loss": "0.1414974331855774", "timestamp": "2021-12-02T14:27:50.000035161+09:00", "trial": "0"} +{"checkpoint_path": "", "global_step": "2", "loss": "0.10683439671993256", "trial": "0"}`, + }, + } + + for _, td := range testData { + filePath := filepath.Join(testJsonDataPath, td.fileName) + if err := os.WriteFile(filePath, []byte(td.data), 0600); err != nil { + return err + } + } + + return nil +} diff --git a/pkg/webhook/v1beta1/experiment/validator/validator.go b/pkg/webhook/v1beta1/experiment/validator/validator.go index 0649b3e17e1..c8990080833 100644 --- a/pkg/webhook/v1beta1/experiment/validator/validator.go +++ b/pkg/webhook/v1beta1/experiment/validator/validator.go @@ -428,11 +428,22 @@ func (g *DefaultValidator) validateMetricsCollector(inst *experimentsv1beta1.Exp mcSpec.Source.FileSystemPath.Kind != commonapiv1beta1.FileKind || !filepath.IsAbs(mcSpec.Source.FileSystemPath.Path) { return fmt.Errorf("file path where metrics file exists is required by .spec.metricsCollectorSpec.source.fileSystemPath.path") } + // Format + fileFormat := mcSpec.Source.FileSystemPath.Format + if fileFormat != commonapiv1beta1.TextFormat && fileFormat != commonapiv1beta1.JsonFormat { + return fmt.Errorf("format of metrics file is required 
by .spec.metricsCollectorSpec.source.fileSystemPath.format") + } + if fileFormat == commonapiv1beta1.JsonFormat && mcSpec.Source.Filter != nil { + return fmt.Errorf(".spec.metricsCollectorSpec.source.filter must be nil when format of metrics file is %v", commonapiv1beta1.JsonFormat) + } case commonapiv1beta1.TfEventCollector: if mcSpec.Source == nil || mcSpec.Source.FileSystemPath == nil || mcSpec.Source.FileSystemPath.Kind != commonapiv1beta1.DirectoryKind || !filepath.IsAbs(mcSpec.Source.FileSystemPath.Path) { return fmt.Errorf("directory path where tensorflow event files exist is required by .spec.metricsCollectorSpec.source.fileSystemPath.path") } + if mcSpec.Source.FileSystemPath.Format != "" { + return fmt.Errorf(".spec.metricsCollectorSpec.source.fileSystemPath.format must be empty") + } case commonapiv1beta1.PrometheusMetricCollector: i, err := strconv.Atoi(mcSpec.Source.HttpGet.Port.String()) if err != nil || i <= 0 { diff --git a/pkg/webhook/v1beta1/experiment/validator/validator_test.go b/pkg/webhook/v1beta1/experiment/validator/validator_test.go index 923e79bfc8a..1f56088df15 100644 --- a/pkg/webhook/v1beta1/experiment/validator/validator_test.go +++ b/pkg/webhook/v1beta1/experiment/validator/validator_test.go @@ -798,7 +798,8 @@ func TestValidateMetricsCollector(t *testing.T) { }, Source: &commonv1beta1.SourceSpec{ FileSystemPath: &commonv1beta1.FileSystemPath{ - Path: "not/absolute/path", + Path: "not/absolute/path", + Format: commonv1beta1.TextFormat, }, }, } @@ -826,6 +827,27 @@ func TestValidateMetricsCollector(t *testing.T) { Err: true, testDescription: "Invalid path for TF event metrics collector", }, + // TfEventCollector invalid file format + { + Instance: func() *experimentsv1beta1.Experiment { + i := newFakeInstance() + i.Spec.MetricsCollectorSpec = &commonv1beta1.MetricsCollectorSpec{ + Collector: &commonv1beta1.CollectorSpec{ + Kind: commonv1beta1.TfEventCollector, + }, + Source: &commonv1beta1.SourceSpec{ + FileSystemPath: 
&commonv1beta1.FileSystemPath{ + Path: "/absolute/path", + Format: commonv1beta1.JsonFormat, + Kind: commonv1beta1.DirectoryKind, + }, + }, + } + return i + }(), + Err: true, + testDescription: "Invalid file format for TF event metrics collector", + }, // PrometheusMetricCollector invalid Port { Instance: func() *experimentsv1beta1.Experiment { @@ -920,8 +942,9 @@ func TestValidateMetricsCollector(t *testing.T) { }, }, FileSystemPath: &commonv1beta1.FileSystemPath{ - Path: "/absolute/path", - Kind: commonv1beta1.FileKind, + Path: "/absolute/path", + Kind: commonv1beta1.FileKind, + Format: commonv1beta1.TextFormat, }, }, } @@ -945,8 +968,9 @@ func TestValidateMetricsCollector(t *testing.T) { }, }, FileSystemPath: &commonv1beta1.FileSystemPath{ - Path: "/absolute/path", - Kind: commonv1beta1.FileKind, + Path: "/absolute/path", + Kind: commonv1beta1.FileKind, + Format: commonv1beta1.TextFormat, }, }, } @@ -955,6 +979,49 @@ func TestValidateMetricsCollector(t *testing.T) { Err: true, testDescription: "One subexpression in metrics format", }, + // FileMetricCollector invalid file format + { + Instance: func() *experimentsv1beta1.Experiment { + i := newFakeInstance() + i.Spec.MetricsCollectorSpec = &commonv1beta1.MetricsCollectorSpec{ + Collector: &commonv1beta1.CollectorSpec{ + Kind: commonv1beta1.FileCollector, + }, + Source: &commonv1beta1.SourceSpec{ + FileSystemPath: &commonv1beta1.FileSystemPath{ + Path: "/absolute/path", + Kind: commonv1beta1.FileKind, + Format: "invalid", + }, + }, + } + return i + }(), + Err: true, + testDescription: "Invalid file format for File metrics collector", + }, + // FileMetricCollector invalid metrics filter + { + Instance: func() *experimentsv1beta1.Experiment { + i := newFakeInstance() + i.Spec.MetricsCollectorSpec = &commonv1beta1.MetricsCollectorSpec{ + Collector: &commonv1beta1.CollectorSpec{ + Kind: commonv1beta1.FileCollector, + }, + Source: &commonv1beta1.SourceSpec{ + Filter: &commonv1beta1.FilterSpec{}, + FileSystemPath: 
&commonv1beta1.FileSystemPath{ + Path: "/absolute/path", + Kind: commonv1beta1.FileKind, + Format: commonv1beta1.JsonFormat, + }, + }, + } + return i + }(), + Err: true, + testDescription: "Invalid metrics filer for File metrics collector when file format is `JSON`", + }, // Valid FileMetricCollector { Instance: func() *experimentsv1beta1.Experiment { @@ -965,8 +1032,9 @@ func TestValidateMetricsCollector(t *testing.T) { }, Source: &commonv1beta1.SourceSpec{ FileSystemPath: &commonv1beta1.FileSystemPath{ - Path: "/absolute/path", - Kind: commonv1beta1.FileKind, + Path: "/absolute/path", + Kind: commonv1beta1.FileKind, + Format: commonv1beta1.JsonFormat, }, }, } diff --git a/pkg/webhook/v1beta1/pod/inject_webhook.go b/pkg/webhook/v1beta1/pod/inject_webhook.go index d4cb97471ae..cb90ffd6918 100644 --- a/pkg/webhook/v1beta1/pod/inject_webhook.go +++ b/pkg/webhook/v1beta1/pod/inject_webhook.go @@ -294,6 +294,14 @@ func (s *SidecarInjector) getMetricsCollectorArgs(trial *trialsv1beta1.Trial, me if mc.Source != nil && mc.Source.Filter != nil && len(mc.Source.Filter.MetricsFormat) > 0 { args = append(args, "-f", strings.Join(mc.Source.Filter.MetricsFormat, ";")) } + if mc.Collector.Kind == common.FileCollector && mc.Source != nil { + if mc.Source.FileSystemPath != nil { + args = append(args, "-format", string(mc.Source.FileSystemPath.Format)) + } + } + if mc.Collector.Kind == common.StdOutCollector { + args = append(args, "-format", string(common.TextFormat)) + } if metricsCollectorConfigData.WaitAllProcesses != nil { args = append(args, "-w", strconv.FormatBool(*metricsCollectorConfigData.WaitAllProcesses)) } diff --git a/pkg/webhook/v1beta1/pod/inject_webhook_test.go b/pkg/webhook/v1beta1/pod/inject_webhook_test.go index 0f3a3e4a6da..44a55629b81 100644 --- a/pkg/webhook/v1beta1/pod/inject_webhook_test.go +++ b/pkg/webhook/v1beta1/pod/inject_webhook_test.go @@ -343,6 +343,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) { "-o-type", string(testObjective), "-s-db", 
katibDBAddress, "-path", common.DefaultFilePath, + "-format", string(common.TextFormat), "-w", "false", }, Name: "StdOut MC", @@ -356,7 +357,8 @@ func TestGetMetricsCollectorArgs(t *testing.T) { }, Source: &common.SourceSpec{ FileSystemPath: &common.FileSystemPath{ - Path: testPath, + Path: testPath, + Format: common.TextFormat, }, Filter: &common.FilterSpec{ MetricsFormat: []string{ @@ -374,9 +376,35 @@ func TestGetMetricsCollectorArgs(t *testing.T) { "-s-db", katibDBAddress, "-path", testPath, "-f", "{mn1: ([a-b]), mv1: [0-9]};{mn2: ([a-b]), mv2: ([0-9])}", + "-format", string(common.TextFormat), }, Name: "File MC with Filter", }, + { + Trial: testTrial, + MetricNames: testMetricName, + MCSpec: common.MetricsCollectorSpec{ + Collector: &common.CollectorSpec{ + Kind: common.FileCollector, + }, + Source: &common.SourceSpec{ + FileSystemPath: &common.FileSystemPath{ + Path: testPath, + Format: common.JsonFormat, + }, + }, + }, + KatibConfig: katibconfig.MetricsCollectorConfig{}, + ExpectedArgs: []string{ + "-t", testTrialName, + "-m", testMetricName, + "-o-type", string(testObjective), + "-s-db", katibDBAddress, + "-path", testPath, + "-format", string(common.JsonFormat), + }, + Name: "File MC with Json Format", + }, { Trial: testTrial, MetricNames: testMetricName, @@ -473,6 +501,7 @@ func TestGetMetricsCollectorArgs(t *testing.T) { "-o-type", string(testObjective), "-s-db", katibDBAddress, "-path", common.DefaultFilePath, + "-format", string(common.TextFormat), "-stop-rule", earlyStoppingRules[0], "-stop-rule", earlyStoppingRules[1], "-s-earlystop", katibEarlyStopAddress, diff --git a/sdk/python/v1beta1/docs/V1beta1FileSystemPath.md b/sdk/python/v1beta1/docs/V1beta1FileSystemPath.md index 08daa66d584..49056919a17 100644 --- a/sdk/python/v1beta1/docs/V1beta1FileSystemPath.md +++ b/sdk/python/v1beta1/docs/V1beta1FileSystemPath.md @@ -3,6 +3,7 @@ ## Properties Name | Type | Description | Notes ------------ | ------------- | ------------- | ------------- +**format** | 
**str** | | [optional] **kind** | **str** | | [optional] **path** | **str** | | [optional] diff --git a/sdk/python/v1beta1/kubeflow/katib/models/v1beta1_file_system_path.py b/sdk/python/v1beta1/kubeflow/katib/models/v1beta1_file_system_path.py index addc29a73f4..95d12423868 100644 --- a/sdk/python/v1beta1/kubeflow/katib/models/v1beta1_file_system_path.py +++ b/sdk/python/v1beta1/kubeflow/katib/models/v1beta1_file_system_path.py @@ -33,30 +33,56 @@ class V1beta1FileSystemPath(object): and the value is json key in definition. """ openapi_types = { + 'format': 'str', 'kind': 'str', 'path': 'str' } attribute_map = { + 'format': 'format', 'kind': 'kind', 'path': 'path' } - def __init__(self, kind=None, path=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, format=None, kind=None, path=None, local_vars_configuration=None): # noqa: E501 """V1beta1FileSystemPath - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration() self.local_vars_configuration = local_vars_configuration + self._format = None self._kind = None self._path = None self.discriminator = None + if format is not None: + self.format = format if kind is not None: self.kind = kind if path is not None: self.path = path + @property + def format(self): + """Gets the format of this V1beta1FileSystemPath. # noqa: E501 + + + :return: The format of this V1beta1FileSystemPath. # noqa: E501 + :rtype: str + """ + return self._format + + @format.setter + def format(self, format): + """Sets the format of this V1beta1FileSystemPath. + + + :param format: The format of this V1beta1FileSystemPath. # noqa: E501 + :type: str + """ + + self._format = format + @property def kind(self): """Gets the kind of this V1beta1FileSystemPath. 
# noqa: E501 diff --git a/test/e2e/v1beta1/argo_workflow.py b/test/e2e/v1beta1/argo_workflow.py index 01d7e6124fa..f29225c23be 100644 --- a/test/e2e/v1beta1/argo_workflow.py +++ b/test/e2e/v1beta1/argo_workflow.py @@ -68,21 +68,23 @@ # Dict with Katib Experiments to run during the test. # Key - image name, Value - dockerfile location. KATIB_EXPERIMENTS = { - "random": "examples/v1beta1/hp-tuning/random.yaml", - "grid": "examples/v1beta1/hp-tuning/grid.yaml", - "bayesianoptimization": "examples/v1beta1/hp-tuning/bayesian-optimization.yaml", - "tpe": "examples/v1beta1/hp-tuning/tpe.yaml", - "multivariate-tpe": "examples/v1beta1/hp-tuning/multivariate-tpe.yaml", - "cmaes": "examples/v1beta1/hp-tuning/cma-es.yaml", - "hyperband": "examples/v1beta1/hp-tuning/hyperband.yaml", - "enas": "examples/v1beta1/nas/enas-cpu.yaml", - "darts": "examples/v1beta1/nas/darts-cpu.yaml", - "pytorchjob": "examples/v1beta1/kubeflow-training-operator/pytorchjob-mnist.yaml", - "tfjob": "examples/v1beta1/kubeflow-training-operator/tfjob-mnist-with-summaries.yaml", - "file-metricscollector": "examples/v1beta1/metrics-collector/file-metrics-collector.yaml", - "never-resume": "examples/v1beta1/resume-experiment/never-resume.yaml", - "from-volume-resume": "examples/v1beta1/resume-experiment/from-volume-resume.yaml", - "median-stop": "examples/v1beta1/early-stopping/median-stop.yaml" + "random": "examples/v1beta1/hp-tuning/random.yaml", + "grid": "examples/v1beta1/hp-tuning/grid.yaml", + "bayesianoptimization": "examples/v1beta1/hp-tuning/bayesian-optimization.yaml", + "tpe": "examples/v1beta1/hp-tuning/tpe.yaml", + "multivariate-tpe": "examples/v1beta1/hp-tuning/multivariate-tpe.yaml", + "cmaes": "examples/v1beta1/hp-tuning/cma-es.yaml", + "hyperband": "examples/v1beta1/hp-tuning/hyperband.yaml", + "enas": "examples/v1beta1/nas/enas-cpu.yaml", + "darts": "examples/v1beta1/nas/darts-cpu.yaml", + "pytorchjob": "examples/v1beta1/kubeflow-training-operator/pytorchjob-mnist.yaml", + "tfjob": 
"examples/v1beta1/kubeflow-training-operator/tfjob-mnist-with-summaries.yaml", + "file-metricscollector": "examples/v1beta1/metrics-collector/file-metrics-collector.yaml", + "file-metricscollector-with-json-format": "examples/v1beta1/metrics-collector/file-metrics-collector-with-json-format.yaml", + "never-resume": "examples/v1beta1/resume-experiment/never-resume.yaml", + "from-volume-resume": "examples/v1beta1/resume-experiment/from-volume-resume.yaml", + "median-stop": "examples/v1beta1/early-stopping/median-stop.yaml", + "median-stop-with-json-format": "examples/v1beta1/early-stopping/median-stop-with-json-format.yaml", } # How many Experiments are running in parallel. PARALLEL_EXECUTION = 5 diff --git a/test/e2e/v1beta1/run-e2e-experiment.go b/test/e2e/v1beta1/run-e2e-experiment.go index fe004ceec51..258abd4953e 100644 --- a/test/e2e/v1beta1/run-e2e-experiment.go +++ b/test/e2e/v1beta1/run-e2e-experiment.go @@ -207,8 +207,8 @@ func waitExperimentFinish(kclient katibclient.Client, exp *experimentsv1beta1.Ex } log.Printf("Waiting for Experiment %s to finish", exp.Name) - log.Printf(`Experiment is running: %v Trials, %v Pending Trials, %v Running Trials, %v Succeeded Trials, %v Failed Trials`, - exp.Status.Trials, exp.Status.TrialsPending, exp.Status.TrialsRunning, exp.Status.TrialsSucceeded, exp.Status.TrialsFailed) + log.Printf(`Experiment is running: %v Trials, %v Pending Trials, %v Running Trials, %v Succeeded Trials, %v Failed Trials, %v EarlyStopped Trials`, + exp.Status.Trials, exp.Status.TrialsPending, exp.Status.TrialsRunning, exp.Status.TrialsSucceeded, exp.Status.TrialsFailed, exp.Status.TrialsEarlyStopped) log.Printf("Current optimal Trial: %v", exp.Status.CurrentOptimalTrial) log.Printf("Experiment conditions: %v\n\n\n", exp.Status.Conditions) @@ -270,8 +270,13 @@ func verifyExperimentResults(kclient katibclient.Client, exp *experimentsv1beta1 *exp.Spec.MaxTrialCount, exp.Status.TrialsSucceeded) } + trialsCompleted := exp.Status.TrialsSucceeded + if 
exp.Spec.EarlyStopping != nil { + trialsCompleted += exp.Status.TrialsEarlyStopped + } + // Otherwise, Goal should be reached. - if exp.Status.TrialsSucceeded != *exp.Spec.MaxTrialCount && + if trialsCompleted != *exp.Spec.MaxTrialCount && ((objectiveType == commonv1beta1.ObjectiveTypeMinimize && minMetric > *goal) || (objectiveType == commonv1beta1.ObjectiveTypeMaximize && maxMetric < *goal)) { return fmt.Errorf(`Objective Goal is not reached and Succeeded Trials: %v != %v MaxTrialCount.