diff --git a/backend/build_api_server.sh b/backend/build_api_server.sh
index b2f80262d62..97e9871bf5f 100755
--- a/backend/build_api_server.sh
+++ b/backend/build_api_server.sh
@@ -36,7 +36,7 @@ LONGOPTS=use_remote_build,gcp_credentials_file:
 PARSED=$(getopt --longoptions=$LONGOPTS --options=$OPTS --name "$0" -- "$@")
 eval set -- "$PARSED"
 
-USE_REMOTE_BUILD=true
+USE_REMOTE_BUILD=false
 GCP_CREDENTIALS_FILE="gs://ml-pipeline-test-bazel/ml-pipeline-test-bazel-builder-credentials.json"
 
 MACHINE_ARCH=`uname -m`
diff --git a/backend/src/apiserver/main.go b/backend/src/apiserver/main.go
index f0202630865..b072943a6b4 100644
--- a/backend/src/apiserver/main.go
+++ b/backend/src/apiserver/main.go
@@ -40,12 +40,6 @@ import (
 	"google.golang.org/grpc/reflection"
 )
 
-const (
-	HasDefaultBucketEnvVar  = "HAS_DEFAULT_BUCKET"
-	ProjectIDEnvVar         = "PROJECT_ID"
-	DefaultBucketNameEnvVar = "BUCKET_NAME"
-)
-
 var (
 	rpcPortFlag  = flag.String("rpcPortFlag", ":8887", "RPC Port")
 	httpPortFlag = flag.String("httpPortFlag", ":8888", "Http Proxy Port")
@@ -196,19 +190,6 @@ func loadSamples(resourceManager *resource.ResourceManager) error {
 		if configErr != nil {
 			return fmt.Errorf("Failed to decompress the file %s. Error: %v", config.Name, configErr)
 		}
-		// Patch the default bucket name read from ConfigMap
-		if common.GetBoolConfigWithDefault(HasDefaultBucketEnvVar, false) {
-			defaultBucket := common.GetStringConfig(DefaultBucketNameEnvVar)
-			projectId := common.GetStringConfig(ProjectIDEnvVar)
-			patchMap := map[string]string{
-				"": defaultBucket,
-				"": projectId,
-			}
-			pipelineFile, err = server.PatchPipelineDefaultParameter(pipelineFile, patchMap)
-			if err != nil {
-				return fmt.Errorf("Failed to patch default value to %s. Error: %v", config.Name, err)
-			}
-		}
 		_, configErr = resourceManager.CreatePipeline(config.Name, config.Description, pipelineFile)
 		if configErr != nil {
 			// Log the error but not fail. The API Server pod can restart and it could potentially cause name collision.
diff --git a/backend/src/apiserver/server/run_server.go b/backend/src/apiserver/server/run_server.go
index ef4541fff41..2c0364f8b89 100644
--- a/backend/src/apiserver/server/run_server.go
+++ b/backend/src/apiserver/server/run_server.go
@@ -16,6 +16,7 @@ package server
 
 import (
 	"context"
+	"fmt"
 
 	"github.com/golang/protobuf/ptypes/empty"
 	api "github.com/kubeflow/pipelines/backend/api/go_client"
@@ -30,7 +31,23 @@ type RunServer struct {
 	resourceManager *resource.ResourceManager
 }
 
+const (
+	HasDefaultBucketEnvVar  = "HAS_DEFAULT_BUCKET"
+	ProjectIDEnvVar         = "PROJECT_ID"
+	DefaultBucketNameEnvVar = "BUCKET_NAME"
+)
+
 func (s *RunServer) CreateRun(ctx context.Context, request *api.CreateRunRequest) (*api.RunDetail, error) {
+	// Patch default values
+	for _, param := range request.Run.PipelineSpec.Parameters {
+		if common.GetBoolConfigWithDefault(HasDefaultBucketEnvVar, false) {
+			var err error
+			param.Value, err = PatchPipelineDefaultParameter(param.Value)
+			if err != nil {
+				return nil, fmt.Errorf("failed to patch default value to pipeline. Error: %v", err)
+			}
+		}
+	}
 	err := s.validateCreateRunRequest(request)
 	if err != nil {
 		return nil, util.Wrap(err, "Validate create run request failed.")
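The net effect of the `CreateRun` change above is a per-parameter placeholder substitution. Below is a minimal, self-contained sketch of that behavior; `patchDefaults` is a hypothetical stand-in for the `PatchPipelineDefaultParameter` helper shown in util.go further down, with the viper-backed config lookups replaced by plain arguments:

```go
package main

import (
	"fmt"
	"strings"
)

// patchDefaults mimics PatchPipelineDefaultParameter: each known placeholder
// is replaced everywhere it occurs; any other text passes through untouched.
func patchDefaults(text, defaultBucket, projectID string) string {
	toPatch := map[string]string{
		"{{kfp-default-bucket}}": defaultBucket,
		"{{kfp-project-id}}":     projectID,
	}
	for key, value := range toPatch {
		text = strings.Replace(text, key, value, -1)
	}
	return text
}

func main() {
	// A run parameter as the samples now ship it ...
	param := "gs://{{kfp-default-bucket}}/tfx_taxi_simple"
	// ... and what CreateRun stores once HAS_DEFAULT_BUCKET is set.
	fmt.Println(patchDefaults(param, "my-bucket", "my-project"))
	// Output: gs://my-bucket/tfx_taxi_simple
}
```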
Error: %v", err) + } + } + } err := s.validateCreateRunRequest(request) if err != nil { return nil, util.Wrap(err, "Validate create run request failed.") diff --git a/backend/src/apiserver/server/run_server_test.go b/backend/src/apiserver/server/run_server_test.go index 0e629e3a98d..caef700f601 100644 --- a/backend/src/apiserver/server/run_server_test.go +++ b/backend/src/apiserver/server/run_server_test.go @@ -62,6 +62,62 @@ func TestCreateRun(t *testing.T) { assert.Equal(t, expectedRunDetail, *runDetail) } +func TestCreateRunPatch(t *testing.T) { + clients, manager, experiment := initWithExperiment(t) + viper.Set(HasDefaultBucketEnvVar, "true") + viper.Set(ProjectIDEnvVar, "test-project-id") + viper.Set(DefaultBucketNameEnvVar, "test-default-bucket") + defer clients.Close() + server := NewRunServer(manager) + run := &api.Run{ + Name: "123", + ResourceReferences: validReference, + PipelineSpec: &api.PipelineSpec{ + WorkflowManifest: testWorkflowPatch.ToStringForStore(), + Parameters: []*api.Parameter{ + {Name: "param1", Value: "{{kfp-default-bucket}}"}, + {Name: "param2", Value: "{{kfp-project-id}}"}}, + }, + } + runDetail, err := server.CreateRun(nil, &api.CreateRunRequest{Run: run}) + assert.Nil(t, err) + + expectedRuntimeWorkflow := testWorkflowPatch.DeepCopy() + expectedRuntimeWorkflow.Spec.Arguments.Parameters = []v1alpha1.Parameter{ + {Name: "param1", Value: util.StringPointer("test-default-bucket")}, + {Name: "param2", Value: util.StringPointer("test-project-id")}, + } + expectedRuntimeWorkflow.Labels = map[string]string{util.LabelKeyWorkflowRunId: "123e4567-e89b-12d3-a456-426655440000"} + expectedRuntimeWorkflow.Annotations = map[string]string{util.AnnotationKeyRunName: "123"} + expectedRuntimeWorkflow.Spec.ServiceAccountName = "pipeline-runner" + expectedRunDetail := api.RunDetail{ + Run: &api.Run{ + Id: "123e4567-e89b-12d3-a456-426655440000", + Name: "123", + StorageState: api.Run_STORAGESTATE_AVAILABLE, + CreatedAt: ×tamp.Timestamp{Seconds: 2}, + ScheduledAt: ×tamp.Timestamp{}, + FinishedAt: ×tamp.Timestamp{}, + PipelineSpec: &api.PipelineSpec{ + WorkflowManifest: testWorkflowPatch.ToStringForStore(), + Parameters: []*api.Parameter{ + {Name: "param1", Value: "test-default-bucket"}, + {Name: "param2", Value: "test-project-id"}}, + }, + ResourceReferences: []*api.ResourceReference{ + { + Key: &api.ResourceKey{Type: api.ResourceType_EXPERIMENT, Id: experiment.UUID}, + Name: "123", Relationship: api.Relationship_OWNER, + }, + }, + }, + PipelineRuntime: &api.PipelineRuntime{ + WorkflowManifest: util.NewWorkflow(expectedRuntimeWorkflow).ToStringForStore(), + }, + } + assert.Equal(t, expectedRunDetail, *runDetail) +} + func TestCreateRun_Unauthorized(t *testing.T) { clients, manager, _ := initWithExperiment_KFAM_Unauthorized(t) defer clients.Close() diff --git a/backend/src/apiserver/server/test_util.go b/backend/src/apiserver/server/test_util.go index 420baaabc9d..779de5830de 100644 --- a/backend/src/apiserver/server/test_util.go +++ b/backend/src/apiserver/server/test_util.go @@ -42,6 +42,12 @@ var testWorkflow2 = util.NewWorkflow(&v1alpha1.Workflow{ Spec: v1alpha1.WorkflowSpec{Arguments: v1alpha1.Arguments{Parameters: []v1alpha1.Parameter{{Name: "param1"}}}}, }) +var testWorkflowPatch = util.NewWorkflow(&v1alpha1.Workflow{ + TypeMeta: v1.TypeMeta{APIVersion: "argoproj.io/v1alpha1", Kind: "Workflow"}, + ObjectMeta: v1.ObjectMeta{Name: "workflow-name", UID: "workflow2"}, + Spec: v1alpha1.WorkflowSpec{Arguments: v1alpha1.Arguments{Parameters: []v1alpha1.Parameter{{Name: "param1"}, 
{Name: "param2"}}}}, +}) + var validReference = []*api.ResourceReference{ { Key: &api.ResourceKey{ diff --git a/backend/src/apiserver/server/util.go b/backend/src/apiserver/server/util.go index 69e7d7f76f2..2bf36d20fe4 100644 --- a/backend/src/apiserver/server/util.go +++ b/backend/src/apiserver/server/util.go @@ -182,16 +182,20 @@ func ReadPipelineFile(fileName string, fileReader io.Reader, maxFileLength int) return processedFile, nil } -// Mutate default values of specified pipeline file. +// Mutate default values of specified pipeline spec. // Args: -// file: pipeline file in bytes. -// toPatch: mapping from the old value to its new value. -func PatchPipelineDefaultParameter(file []byte, toPatch map[string]string) ([]byte, error) { - pipelineRawString := string(file) +// text: (part of) pipeline file in string. +func PatchPipelineDefaultParameter(text string) (string, error) { + defaultBucket := common.GetStringConfig(DefaultBucketNameEnvVar) + projectId := common.GetStringConfig(ProjectIDEnvVar) + toPatch := map[string]string{ + "{{kfp-default-bucket}}": defaultBucket, + "{{kfp-project-id}}": projectId, + } for key, value := range toPatch { - pipelineRawString = strings.Replace(pipelineRawString, key, value, -1) + text = strings.Replace(text, key, value, -1) } - return []byte(pipelineRawString), nil + return text, nil } func printParameters(params []*api.Parameter) string { diff --git a/backend/src/apiserver/server/util_test.go b/backend/src/apiserver/server/util_test.go index 6a20e21b0a8..cb9515d3a91 100644 --- a/backend/src/apiserver/server/util_test.go +++ b/backend/src/apiserver/server/util_test.go @@ -131,19 +131,6 @@ func TestReadPipelineFile_YAML(t *testing.T) { assert.Equal(t, expectedFileBytes, fileBytes) } -func TestParameterPatch(t *testing.T) { - file, _ := os.Open("test/arguments-parameters.yaml") - fileBytes, err := ReadPipelineFile("arguments-parameters.yaml", file, MaxFileLength) - patchMap := map[string]string{ - "hello": "new-hello", - } - fileBytes, err = PatchPipelineDefaultParameter(fileBytes, patchMap) - assert.Nil(t, err) - - expectedFileBytes, _ := ioutil.ReadFile("test/patched-arguments-parameters.yaml") - assert.Equal(t, expectedFileBytes, fileBytes) -} - func TestReadPipelineFile_Zip(t *testing.T) { file, _ := os.Open("test/arguments_zip/arguments-parameters.zip") pipelineFile, err := ReadPipelineFile("arguments-parameters.zip", file, MaxFileLength) diff --git a/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py b/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py index 13f5be3e531..701c1ebdd7c 100644 --- a/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py +++ b/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py @@ -54,7 +54,7 @@ # Path of pipeline root, should be a GCS path. 
diff --git a/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py b/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py
index 13f5be3e531..701c1ebdd7c 100644
--- a/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py
+++ b/samples/core/parameterized_tfx_oss/parameterized_tfx_oss.py
@@ -54,7 +54,7 @@
 
 # Path of pipeline root, should be a GCS path.
 pipeline_root = os.path.join(
-    'gs://', 'tfx_taxi_simple', kfp.dsl.RUN_ID_PLACEHOLDER
+    'gs://{{kfp-default-bucket}}', 'tfx_taxi_simple', kfp.dsl.RUN_ID_PLACEHOLDER
 )
 
 
diff --git a/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb b/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb
index adf6b6a08ea..e4ce8736e26 100644
--- a/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb
+++ b/samples/core/parameterized_tfx_oss/taxi_pipeline_notebook.ipynb
@@ -79,12 +79,12 @@
     "# In TFX MLMD schema, pipeline name is used as the unique id of each pipeline.\n",
     "# Assigning workflow ID as part of pipeline name allows the user to bypass\n",
     "# some schema checks which are redundant for experimental pipelines.\n",
-    "pipeline_name = 'taxi_pipeline_with_parameters_' + kfp.dsl.RUN_ID_PLACEHOLDER\n",
+    "pipeline_name = 'taxi_pipeline_with_parameters'\n",
     "\n",
     "# Path of pipeline data root, should be a GCS path.\n",
     "# Note that when running on KFP, the pipeline root is always a runtime parameter.\n",
     "# The value specified here will be its default.\n",
-    "pipeline_root = os.path.join('gs://my-bucket', 'tfx_taxi_simple',\n",
+    "pipeline_root = os.path.join('gs://{{kfp-default-bucket}}', 'tfx_taxi_simple',\n",
     "                             kfp.dsl.RUN_ID_PLACEHOLDER)\n",
     "\n",
     "# Location of input data, should be a GCS path under which there is a csv file.\n",
@@ -308,7 +308,8 @@
     ").create_run_from_pipeline_package(\n",
     "    pipeline_name + '.tar.gz', \n",
     "    arguments={\n",
-    "        'pipeline-root': 'gs:///tfx_taxi_simple/' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
+    "        # Uncomment the following lines to use a custom GCS bucket/module file/training data.\n",
+    "        # 'pipeline-root': 'gs:///tfx_taxi_simple/' + kfp.dsl.RUN_ID_PLACEHOLDER,\n",
     "        # 'module-file': '', # delete this line to use default module file.\n",
     "        # 'data-root': '' # delete this line to use default data.\n",
     "})"
@@ -332,8 +333,17 @@
    "nbconvert_exporter": "python",
    "pygments_lexer": "ipython3",
    "version": "3.7.5rc1"
+  },
+  "pycharm": {
+   "stem_cell": {
+    "cell_type": "raw",
+    "source": [],
+    "metadata": {
+     "collapsed": false
+    }
+   }
   }
  },
 "nbformat": 4,
 "nbformat_minor": 4
-}
+}
\ No newline at end of file
diff --git a/samples/core/xgboost_training_cm/xgboost_training_cm.py b/samples/core/xgboost_training_cm/xgboost_training_cm.py
index cab0b6d6103..665c36eacc3 100644
--- a/samples/core/xgboost_training_cm/xgboost_training_cm.py
+++ b/samples/core/xgboost_training_cm/xgboost_training_cm.py
@@ -160,7 +160,7 @@ def dataproc_train_op(
         region=region,
         cluster_name=cluster_name,
         main_class=_TRAINER_MAIN_CLS,
-        spark_job=json.dumps({ 'jarFileUris': [_XGBOOST_PKG]}),
+        spark_job=json.dumps({'jarFileUris': [_XGBOOST_PKG]}),
         args=json.dumps([
             str(config),
             str(rounds),
@@ -189,7 +189,7 @@ def dataproc_predict_op(
         region=region,
         cluster_name=cluster_name,
         main_class=_PREDICTOR_MAIN_CLS,
-        spark_job=json.dumps({ 'jarFileUris': [_XGBOOST_PKG]}),
+        spark_job=json.dumps({'jarFileUris': [_XGBOOST_PKG]}),
         args=json.dumps([
             str(model),
             str(data),
@@ -205,8 +205,8 @@ def dataproc_predict_op(
     description='A trainer that does end-to-end distributed training for XGBoost models.'
 )
 def xgb_train_pipeline(
-    output='gs://',
-    project='',
+    output='gs://{{kfp-default-bucket}}',
+    project='{{kfp-project-id}}',
     cluster_name='xgb-%s' % dsl.RUN_ID_PLACEHOLDER,
     region='us-central1',
     train_data='gs://ml-pipeline-playground/sfpd/train.csv',
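The sample defaults above only resolve on deployments where the API server runs with `HAS_DEFAULT_BUCKET` set; otherwise `CreateRun` skips the patch loop entirely and the `{{kfp-default-bucket}}` / `{{kfp-project-id}}` tokens reach Argo verbatim, so the user must override them at run time. A minimal sketch of that gate; `resolveParam` is a hypothetical stand-in for the `CreateRun` logic above:

```go
package main

import (
	"fmt"
	"strings"
)

// resolveParam mirrors the gate in CreateRun: substitution happens only when
// the deployment advertises a default bucket; otherwise the value is passed
// through untouched.
func resolveParam(value string, hasDefaultBucket bool, bucket, project string) string {
	if !hasDefaultBucket {
		return value
	}
	value = strings.Replace(value, "{{kfp-default-bucket}}", bucket, -1)
	return strings.Replace(value, "{{kfp-project-id}}", project, -1)
}

func main() {
	defaultVal := "gs://{{kfp-default-bucket}}" // xgb_train_pipeline's new default for `output`
	fmt.Println(resolveParam(defaultVal, false, "", ""))                 // gs://{{kfp-default-bucket}}
	fmt.Println(resolveParam(defaultVal, true, "my-bucket", "my-proj")) // gs://my-bucket
}
```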