Merge pull request #1 from Linchin/ling-exit-handler
Ling exit handler
Linchin authored Apr 22, 2022
2 parents fa0258b + 7f01604 commit bd08f64
Showing 33 changed files with 530 additions and 184 deletions.
23 changes: 23 additions & 0 deletions api/v2alpha1/pipeline_spec.proto
@@ -484,6 +484,29 @@ message PipelineTaskSpec {
// Iterator to iterate over a parameter input.
ParameterIteratorSpec parameter_iterator = 10;
}

// User-configured task-level retry.
message RetryPolicy {
// Number of retries before considering a task as failed. Set to 0 or
// unspecified to disallow retry.
int32 max_retry_count = 1;

// The time interval between retries. Defaults to zero (an immediate retry).
google.protobuf.Duration backoff_duration = 2;

// The exponential backoff factor applied to backoff_duration. If
// unspecified, will default to 2.
double backoff_factor = 3;

// The maximum duration during which the task will be retried according to
// the backoff strategy. Max allowed is 1 hour - higher value will be capped
// to this limit. If unspecified, will set to 1 hour.
google.protobuf.Duration backoff_max_duration = 4;
}

// User-configured task-level retry.
// Applicable only to component tasks.
RetryPolicy retry_policy = 11;
}

// The spec of an artifact iterator. It supports fanning out a workflow from a list
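For illustration, a minimal sketch of populating the new RetryPolicy from Python, assuming pipeline_spec_pb2 bindings regenerated to include this message: three attempts, a 10-second initial backoff that doubles per retry, capped at one hour.

from google.protobuf import duration_pb2
from kfp.pipeline_spec import pipeline_spec_pb2

# Retry up to 3 times; back off 10s, 20s, 40s, ... for at most 1 hour total.
retry_policy = pipeline_spec_pb2.PipelineTaskSpec.RetryPolicy(
    max_retry_count=3,
    backoff_duration=duration_pb2.Duration(seconds=10),
    backoff_factor=2.0,
    backoff_max_duration=duration_pb2.Duration(seconds=3600),
)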
3 changes: 3 additions & 0 deletions backend/src/apiserver/template/v2_template.go
@@ -7,6 +7,7 @@ import (
structpb "github.com/golang/protobuf/ptypes/struct"

"github.com/ghodss/yaml"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/common/util"
@@ -36,6 +37,7 @@ func (t *V2Spec) ScheduledWorkflow(apiJob *api.Job) (*scheduledworkflow.Schedule
}
job.RuntimeConfig = jobRuntimeConfig
wf, err := argocompiler.Compile(job, nil)
glog.Infof("wf value: %+v", wf)
if err != nil {
return nil, util.Wrap(err, "Failed to compile job")
}
@@ -147,6 +149,7 @@ func (t *V2Spec) RunWorkflow(apiRun *api.Run, options RunWorkflowOptions) (*util
}
job.RuntimeConfig = jobRuntimeConfig
wf, err := argocompiler.Compile(job, nil)
glog.Infof("wf value: %+v", wf)
if err != nil {
return nil, util.Wrap(err, "Failed to compile job")
}
4 changes: 4 additions & 0 deletions backend/src/v2/cmd/compiler/main.go
@@ -42,6 +42,9 @@ func main() {
flag.Parse()
noSpec := specPath == nil || *specPath == ""
noJob := jobPath == nil || *jobPath == ""
glog.Info("Hiiiiiiiiiiiiiiii!!!!!!!!!!!!!!!!!!!!!!!!!!!")
noSpec = true
noJob = true
if noSpec && noJob {
glog.Exitf("spec or job must be specified")
}
@@ -70,6 +73,7 @@ func compile(job *pipelinespec.PipelineJob) error {
LauncherImage: *launcher,
PipelineRoot: *pipelineRoot,
})
glog.Infof("wf value: %+v", wf)
if err != nil {
return err
}
15 changes: 14 additions & 1 deletion backend/src/v2/compiler/argocompiler/argo.go
@@ -19,6 +19,7 @@ import (
"strings"

wfapi "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
"github.com/golang/glog"
"github.com/kubeflow/pipelines/api/v2alpha1/go/pipelinespec"
"github.com/kubeflow/pipelines/backend/src/v2/compiler"
"google.golang.org/protobuf/proto"
@@ -73,6 +74,16 @@ func Compile(jobArg *pipelinespec.PipelineJob, opts *Options) (*wfapi.Workflow,
}
}
}
// Detect the pipeline's exit handler: the SDK marks an exit handler task
// with trigger strategy ALL_UPSTREAM_TASKS_COMPLETED so that it runs
// whether its upstream tasks succeed or fail.
var exitTask string
allRootTasks := spec.GetRoot().GetDag().GetTasks()
for taskName, taskSpec := range allRootTasks {
glog.Infof("task name, task spec: %s, %+v", taskName, taskSpec)
if taskSpec.GetTriggerPolicy().GetStrategy().String() == "ALL_UPSTREAM_TASKS_COMPLETED" {
exitTask = taskName
glog.Infof("exit task: %s", exitTask)
}
}
// initialization
wf := &wfapi.Workflow{
TypeMeta: k8smeta.TypeMeta{
Expand Down Expand Up @@ -101,13 +112,15 @@ func Compile(jobArg *pipelinespec.PipelineJob, opts *Options) (*wfapi.Workflow,
},
ServiceAccountName: "pipeline-runner",
Entrypoint: tmplEntrypoint,
OnExit: exitTask,
},
}
c := &workflowCompiler{
wf: wf,
templates: make(map[string]*wfapi.Template),
// TODO(chensun): release process and update the images.
driverImage: "gcr.io/ml-pipeline-test/dev/kfp-driver:latest",
// driverImage: "gcr.io/ml-pipeline-test/dev/kfp-driver:latest",
driverImage: "gcr.io/ling-kfp/dev/kfp-driver:latest",
launcherImage: "gcr.io/ml-pipeline-test/dev/kfp-launcher-v2:latest",
job: job,
spec: spec,
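For context, the ALL_UPSTREAM_TASKS_COMPLETED strategy matched above is what the SDK emits for an exit handler task. A hedged sketch of a pipeline that would produce one (cleanup_op and train_op are hypothetical components):

from kfp import dsl

@dsl.pipeline(name='exit-handler-demo')
def my_pipeline():
    cleanup = cleanup_op()  # hypothetical component; runs even if training fails
    with dsl.ExitHandler(cleanup):
        train_op()  # hypothetical component guarded by the exit handler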
42 changes: 38 additions & 4 deletions backend/src/v2/driver/driver.go
@@ -122,6 +122,7 @@ func RootDAG(ctx context.Context, opts Options, mlmd *metadata.Client) (executio
}
// TODO(Bobgy): fill in run resource.
pipeline, err := mlmd.GetPipeline(ctx, opts.PipelineName, opts.RunID, opts.Namespace, "run-resource", pipelineRoot)
glog.Infof("metadata pipeline: %+v", pipeline)
if err != nil {
return nil, err
}
@@ -414,6 +415,7 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E
executorInput := &pipelinespec.ExecutorInput{
Inputs: inputs,
}
glog.Infof("executorInput value: %+v", executorInput)
execution = &Execution{ExecutorInput: executorInput}
condition := opts.Task.GetTriggerPolicy().GetCondition()
if condition != "" {
@@ -436,14 +438,37 @@ func DAG(ctx context.Context, opts Options, mlmd *metadata.Client) (execution *E
return execution, fmt.Errorf("ArtifactIterator is not implemented")
}
isIterator := opts.Task.GetParameterIterator() != nil && opts.IterationIndex < 0
// Fan out iterations
if execution.WillTrigger() && isIterator {
iterator := opts.Task.GetParameterIterator()
value, ok := executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()]
report := func(err error) error {
return fmt.Errorf("iterating on item input %q failed: %w", iterator.GetItemInput(), err)
}
if !ok {
return execution, report(fmt.Errorf("cannot find input parameter"))
// The items of a ParameterIterator can come either from an upstream
// input parameter ("inputParameter") or from a raw JSON literal ("raw").
var value *structpb.Value
var ok bool
switch iterator.GetItems().GetKind().(type) {
case *pipelinespec.ParameterIteratorSpec_ItemsSpec_InputParameter:
value, ok = executorInput.GetInputs().GetParameterValues()[iterator.GetItems().GetInputParameter()]
if !ok {
return execution, report(fmt.Errorf("cannot find input parameter"))
}
case *pipelinespec.ParameterIteratorSpec_ItemsSpec_Raw:
valueRaw := iterator.GetItems().GetRaw()
var unmarshalledRaw interface{}
err = json.Unmarshal([]byte(valueRaw), &unmarshalledRaw)
if err != nil {
return execution, fmt.Errorf("error unmarshalling raw items string: %w", err)
}
value, err = structpb.NewValue(unmarshalledRaw)
if err != nil {
return execution, fmt.Errorf("error converting unmarshalled raw items into protobuf Value: %w", err)
}
// Expose the raw items through the executor input so iterations can read them.
execution.ExecutorInput.Inputs.ParameterValues[iterator.GetItemInput()] = value
default:
return execution, fmt.Errorf("cannot find parameter iterator items")
}
items, err := getItems(value)
if err != nil {
@@ -724,7 +749,16 @@ func resolveInputs(ctx context.Context, dag *metadata.DAG, iterationIndex *int,
case task.GetArtifactIterator() != nil:
return nil, fmt.Errorf("artifact iterator not implemented yet")
case task.GetParameterIterator() != nil:
itemsInput := task.GetParameterIterator().GetItems().GetInputParameter()
var itemsInput string
if task.GetParameterIterator().GetItems().GetInputParameter() != "" {
// input comes from outside the component
itemsInput = task.GetParameterIterator().GetItems().GetInputParameter()
} else if task.GetParameterIterator().GetItemInput() != "" {
// input comes from static input
itemsInput = task.GetParameterIterator().GetItemInput()
} else {
return nil, fmt.Errorf("cannot retrieve parameter iterator.")
}
items, err := getItems(inputs.ParameterValues[itemsInput])
if err != nil {
return nil, err
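For context, the raw-items branch added above corresponds to dsl.ParallelFor over a literal list, which the compiler serializes into ParameterIteratorSpec.items.raw as a JSON string. A hedged sketch (process_op is a hypothetical component):

from kfp import dsl

@dsl.pipeline(name='parallel-for-raw-items')
def my_pipeline():
    # The literal list below ends up as the raw JSON items the driver unmarshals.
    with dsl.ParallelFor(['a', 'b', 'c']) as item:
        process_op(value=item)  # hypothetical component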
@@ -16,4 +16,5 @@ Components
google_cloud_pipeline_components.experimental.forecasting
google_cloud_pipeline_components.experimental.hyperparameter_tuning_job
google_cloud_pipeline_components.experimental.tensorflow_probability
google_cloud_pipeline_components.experimental.vertex_notification_email
google_cloud_pipeline_components.experimental.wait_gcp_resources
@@ -0,0 +1,34 @@
name: import_model_evaluation
description: |
Calls ModelService.ImportModelEvaluation to import a model evaluation file to Vertex
Args:
metrics (system.Metrics):
Path of metrics generated from an evaluation component.
explanation (system.Metrics):
Path for model explanation metrics generated from an evaluation component.
model (google.VertexModel):
Vertex model resource that will be the parent resource of the uploaded evaluation.
metrics_schema_uri (str):
GCS URI of the schema describing the model evaluation metrics.
inputs:
- {name: metrics, type: Metrics}
- {name: explanation, type: Metrics, optional: True}
- {name: model, type: google.VertexModel}
- {name: metrics_schema_uri, type: String}
implementation:
container:
image: gcr.io/ml-pipeline/google-cloud-pipeline-components:latest
command: [python3, -u, -m, google_cloud_pipeline_components.container.experimental.evaluation.import_model_evaluation]
args:
- --metrics
- {inputPath: metrics}
- if:
cond: {isPresent: explanation}
then:
- --explanation
- "{{$.inputs.artifacts['explanation'].metadata['explanation_gcs_path']}}"
- --metrics_schema_uri
- {inputValue: metrics_schema_uri}
- --model_name
- "{{$.inputs.artifacts['model'].metadata['resourceName']}}"
@@ -0,0 +1,14 @@
# Copyright 2021 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google Cloud Pipeline Evaluation Components root."""
@@ -0,0 +1,112 @@
# Copyright 2022 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for importing a model evaluation to an existing Vertex model resource."""

import sys
import argparse
import json
import six

from google.cloud import aiplatform
from google.api_core import gapic_v1
from google.protobuf.struct_pb2 import Value, Struct, NULL_VALUE, ListValue


def main(argv):
"""Calls ModelService.ImportModelEvaluation"""
parser = argparse.ArgumentParser(
prog='Vertex Model Service evaluation importer', description='')
parser.add_argument(
'--metrics',
dest='metrics',
type=str,
required=True,
default=argparse.SUPPRESS)
parser.add_argument(
'--explanation', dest='explanation', type=str, default=None)
parser.add_argument(
'--metrics_schema_uri',
dest='metrics_schema_uri',
type=str,
required=True,
default=argparse.SUPPRESS)
parser.add_argument(
'--model_name',
dest='model_name',
type=str,
required=True,
default=argparse.SUPPRESS)

parsed_args, _ = parser.parse_known_args(argv)

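# model_name format: projects/{project}/locations/{location}/models/{model}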
_, project_id, _, location, _, model_id = parsed_args.model_name.split('/')

with open(parsed_args.metrics) as metrics_file:
model_evaluation = {
'metrics':
to_value(
next(
iter(
json.loads(metrics_file.read())['slicedMetrics'][0]
['metrics'].values()))),
'metrics_schema_uri':
parsed_args.metrics_schema_uri,
}

if parsed_args.explanation:
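# Map the explanation's gs:// URI onto the /gcs/ fuse mount available inside the container.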
with open('/gcs' + parsed_args.explanation[4:]) as explanation_file:
model_evaluation['model_explanation'] = {
'mean_attributions': [{
'feature_attributions':
to_value(
json.loads(explanation_file.read())['explanation']
['attributions'][0]['featureAttributions'])
}]
}
print(model_evaluation)
aiplatform.gapic.ModelServiceClient(
client_info=gapic_v1.client_info.ClientInfo(
user_agent='google-cloud-pipeline-components',),
client_options={
'api_endpoint': location + '-aiplatform.googleapis.com',
}).import_model_evaluation(
parent=parsed_args.model_name,
model_evaluation=model_evaluation,
)


def to_value(value):
if value is None:
return Value(null_value=NULL_VALUE)
elif isinstance(value, bool):
# This check needs to happen before isinstance(value, int),
# isinstance(value, int) returns True when value is bool.
return Value(bool_value=value)
elif isinstance(value, six.integer_types) or isinstance(value, float):
return Value(number_value=value)
elif isinstance(value, six.string_types) or isinstance(value, six.text_type):
return Value(string_value=value)
elif isinstance(value, dict):
return Value(
struct_value=Struct(fields={k: to_value(v) for k, v in value.items()}))
elif isinstance(value, list):
return Value(
list_value=ListValue(values=[to_value(item) for item in value]))
else:
raise ValueError('Unsupported data type: {}'.format(type(value)))


if __name__ == '__main__':
print(sys.argv)
main(sys.argv[1:])
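To illustrate to_value with a hypothetical metrics fragment:

example = to_value({'auRoc': 0.93, 'confidenceMetrics': [0.1, 0.9]})
# example.struct_value.fields['auRoc'].number_value == 0.93
# example.struct_value.fields['confidenceMetrics'].list_value holds two number values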
@@ -24,7 +24,7 @@
'CvTrainerOp', 'InfraValidatorOp', 'Stage1TunerOp', 'EnsembleOp',
'StatsAndExampleGenOp', 'FeatureSelectionOp', 'TransformOp', 'FinalizerOp',
'WideAndDeepTrainerOp', 'BuiltinAlgorithmHyperparameterTuningJobOp',
'TabNetTrainerOp'
'TabNetTrainerOp', 'FeatureTransformEngineOp'
]

CvTrainerOp = load_component_from_file(
@@ -41,6 +41,8 @@
os.path.join(os.path.dirname(__file__), 'feature_selection.yaml'))
TransformOp = load_component_from_file(
os.path.join(os.path.dirname(__file__), 'transform.yaml'))
FeatureTransformEngineOp = load_component_from_file(
os.path.join(os.path.dirname(__file__), 'feature_transform_engine.yaml'))
FinalizerOp = load_component_from_file(
os.path.join(os.path.dirname(__file__), 'finalizer.yaml'))
WideAndDeepTrainerOp = load_component_from_file(