From d1b72df5fb2a640342945cb40ed2e02dcb9872b0 Mon Sep 17 00:00:00 2001 From: jingzhang36 Date: Fri, 7 Feb 2020 16:13:36 +0800 Subject: [PATCH] Upload local file as new pipeline version Step 1 (#3001) * add upload pipeline version to upload_pipeline_server and http main * add apiPipelineVersion to pipeline upload swagger json * add apiResourceReference to pipeline upload swagger json * Add yet more types to pipeline upload swagger json * Unit tests --- .../swagger/kfp_api_single_file.swagger.json | 43 +++++ .../api/swagger/pipeline.upload.swagger.json | 133 ++++++++++++++ backend/src/apiserver/main.go | 1 + .../apiserver/resource/client_manager_fake.go | 7 + .../server/pipeline_upload_server.go | 72 +++++++- .../server/pipeline_upload_server_test.go | 172 ++++++++++++++++++ .../arguments-version.tar.gz | Bin 0 -> 754 bytes backend/src/apiserver/server/util.go | 10 +- .../api_server/pipeline_upload_client.go | 4 + .../api_server/pipeline_upload_client_fake.go | 3 + backend/test/integration/job_api_test.go | 2 + backend/test/integration/pipeline_api_test.go | 2 + backend/test/integration/run_api_test.go | 2 + 13 files changed, 445 insertions(+), 6 deletions(-) create mode 100644 backend/src/apiserver/server/test/arguments_tarball/arguments-version.tar.gz diff --git a/backend/api/swagger/kfp_api_single_file.swagger.json b/backend/api/swagger/kfp_api_single_file.swagger.json index 93e486eba76..c8fa0c1a47d 100644 --- a/backend/api/swagger/kfp_api_single_file.swagger.json +++ b/backend/api/swagger/kfp_api_single_file.swagger.json @@ -1182,6 +1182,49 @@ "PipelineUploadService" ] } + }, + "/apis/v1beta1/pipelines/upload_version": { + "post": { + "operationId": "UploadPipelineVersion", + "consumes": [ + "multipart/form-data" + ], + "produces": [ + "application/json" + ], + "responses": { + "200": { + "description": "", + "schema": { + "$ref": "#/definitions/apiPipelineVersion" + } + }, + "default": { + "description": "", + "schema": { + "$ref": "#/definitions/apiStatus" + } + } + }, + "parameters": [ + { + "name": "uploadfile", + "in": "formData", + "required": true, + "type": "file", + "description": "The pipeline to upload. Maximum size of 32MB is supported." + }, + { + "name": "name", + "in": "query", + "required": false, + "type": "string" + } + ], + "tags": [ + "PipelineUploadService" + ] + } } }, "definitions": { diff --git a/backend/api/swagger/pipeline.upload.swagger.json b/backend/api/swagger/pipeline.upload.swagger.json index 42d1e73d9c5..01649fde938 100644 --- a/backend/api/swagger/pipeline.upload.swagger.json +++ b/backend/api/swagger/pipeline.upload.swagger.json @@ -51,6 +51,49 @@ "PipelineUploadService" ] } + }, + "/apis/v1beta1/pipelines/upload_version": { + "post": { + "operationId": "UploadPipelineVersion", + "consumes": [ + "multipart/form-data" + ], + "produces": [ + "application/json" + ], + "responses": { + "200": { + "description": "", + "schema": { + "$ref": "#/definitions/apiPipelineVersion" + } + }, + "default": { + "description": "", + "schema": { + "$ref": "#/definitions/apiStatus" + } + } + }, + "parameters": [ + { + "name": "uploadfile", + "in": "formData", + "required": true, + "type": "file", + "description": "The pipeline to upload. Maximum size of 32MB is supported." + }, + { + "name": "name", + "in": "query", + "required": false, + "type": "string" + } + ], + "tags": [ + "PipelineUploadService" + ] + } } }, "definitions": { @@ -93,6 +136,96 @@ } } }, + "apiPipelineVersion": { + "type": "object", + "properties": { + "id": { + "type": "string", + "description": "Output. Unique version ID. Generated by API server." + }, + "name": { + "type": "string", + "description": "Optional input field. Version name provided by user." + }, + "created_at": { + "type": "string", + "format": "date-time", + "description": "Output. The time this pipeline version is created." + }, + "parameters": { + "type": "array", + "items": { + "$ref": "#/definitions/apiParameter" + }, + "description": "Output. The input parameters for this pipeline." + }, + "code_source_url": { + "type": "string", + "description": "Input. Optional. Pipeline version code source." + }, + "package_url": { + "$ref": "#/definitions/apiUrl", + "description": "Input. Required. Pipeline version package url.\nWhe calling CreatePipelineVersion API method, need to provide one package\nfile location." + }, + "resource_references": { + "type": "array", + "items": { + "$ref": "#/definitions/apiResourceReference" + }, + "description": "Input. Required. E.g., specify which pipeline this pipeline version belongs\nto." + } + } + }, + "apiRelationship": { + "type": "string", + "enum": [ + "UNKNOWN_RELATIONSHIP", + "OWNER", + "CREATOR" + ], + "default": "UNKNOWN_RELATIONSHIP" + }, + "apiResourceKey": { + "type": "object", + "properties": { + "type": { + "$ref": "#/definitions/apiResourceType", + "description": "The type of the resource that referred to." + }, + "id": { + "type": "string", + "description": "The ID of the resource that referred to." + } + } + }, + "apiResourceReference": { + "type": "object", + "properties": { + "key": { + "$ref": "#/definitions/apiResourceKey" + }, + "name": { + "type": "string", + "description": "The name of the resource that referred to." + }, + "relationship": { + "$ref": "#/definitions/apiRelationship", + "description": "Required field. The relationship from referred resource to the object." + } + } + }, + "apiResourceType": { + "type": "string", + "enum": [ + "UNKNOWN_RESOURCE_TYPE", + "EXPERIMENT", + "JOB", + "PIPELINE", + "PIPELINE_VERSION", + "NAMESPACE" + ], + "default": "UNKNOWN_RESOURCE_TYPE" + }, "apiStatus": { "type": "object", "properties": { diff --git a/backend/src/apiserver/main.go b/backend/src/apiserver/main.go index 6ae72ca61a7..f0202630865 100644 --- a/backend/src/apiserver/main.go +++ b/backend/src/apiserver/main.go @@ -140,6 +140,7 @@ func startHttpProxy(resourceManager *resource.ResourceManager) { // https://github.com/grpc-ecosystem/grpc-gateway/issues/410 pipelineUploadServer := server.NewPipelineUploadServer(resourceManager) topMux.HandleFunc("/apis/v1beta1/pipelines/upload", pipelineUploadServer.UploadPipeline) + topMux.HandleFunc("/apis/v1beta1/pipelines/upload_version", pipelineUploadServer.UploadPipelineVersion) topMux.HandleFunc("/apis/v1beta1/healthz", func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, `{"commit_sha":"`+common.GetStringConfig("COMMIT_SHA")+`"}`) }) diff --git a/backend/src/apiserver/resource/client_manager_fake.go b/backend/src/apiserver/resource/client_manager_fake.go index 47d42dd52fe..5cdf7fd4dc5 100644 --- a/backend/src/apiserver/resource/client_manager_fake.go +++ b/backend/src/apiserver/resource/client_manager_fake.go @@ -154,3 +154,10 @@ func (f *FakeClientManager) KFAMClient() client.KFAMClientInterface { func (f *FakeClientManager) Close() error { return f.db.Close() } + +// Update the uuid used in this fake client manager +func (f *FakeClientManager) UpdateUUID(uuid util.UUIDGeneratorInterface) { + f.uuid = uuid + f.experimentStore = storage.NewExperimentStore(f.db, f.time, uuid) + f.pipelineStore = storage.NewPipelineStore(f.db, f.time, uuid) +} diff --git a/backend/src/apiserver/server/pipeline_upload_server.go b/backend/src/apiserver/server/pipeline_upload_server.go index 04f734976cd..d8d712f7b05 100644 --- a/backend/src/apiserver/server/pipeline_upload_server.go +++ b/backend/src/apiserver/server/pipeline_upload_server.go @@ -25,6 +25,7 @@ import ( api "github.com/kubeflow/pipelines/backend/api/go_client" "github.com/kubeflow/pipelines/backend/src/apiserver/resource" "github.com/kubeflow/pipelines/backend/src/common/util" + "github.com/pkg/errors" ) // These are valid conditions of a ScheduledWorkflow. @@ -32,6 +33,8 @@ const ( FormFileKey = "uploadfile" NameQueryStringKey = "name" DescriptionQueryStringKey = "description" + // Pipeline Id in the query string specifies a pipeline when creating versions. + PipelineKey = "pipelineid" ) type PipelineUploadServer struct { @@ -48,7 +51,7 @@ func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Req glog.Infof("Upload pipeline called") file, header, err := r.FormFile(FormFileKey) if err != nil { - s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Failed to read pipeline form file")) + s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Failed to read pipeline from file")) return } defer file.Close() @@ -86,6 +89,73 @@ func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Req } } +// HTTP multipart endpoint for uploading pipeline version file. +// https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html +// This endpoint is not exposed through grpc endpoint, since grpc-gateway can't convert the gRPC +// endpoint to the HTTP endpoint. +// See https://github.com/grpc-ecosystem/grpc-gateway/issues/500 +// Thus we create the HTTP endpoint directly and using swagger to auto generate the HTTP client. +func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *http.Request) { + glog.Infof("Upload pipeline version called") + file, header, err := r.FormFile(FormFileKey) + if err != nil { + s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Failed to read pipeline version from file")) + return + } + defer file.Close() + + pipelineFile, err := ReadPipelineFile(header.Filename, file, MaxFileLength) + if err != nil { + s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Error read pipeline version file.")) + return + } + + versionNameQueryString := r.URL.Query().Get(NameQueryStringKey) + // If new version's name is not included in query string, use file name. + pipelineVersionName, err := GetPipelineName(versionNameQueryString, header.Filename) + if err != nil { + s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Invalid pipeline version name.")) + return + } + + pipelineId := r.URL.Query().Get(PipelineKey) + if len(pipelineId) == 0 { + s.writeErrorToResponse(w, http.StatusBadRequest, errors.New("Please specify a pipeline id when creating versions.")) + return + } + + newPipelineVersion, err := s.resourceManager.CreatePipelineVersion( + &api.PipelineVersion{ + Name: pipelineVersionName, + ResourceReferences: []*api.ResourceReference{ + &api.ResourceReference{ + Key: &api.ResourceKey{ + Id: pipelineId, + Type: api.ResourceType_PIPELINE, + }, + Relationship: api.Relationship_OWNER, + }, + }, + }, pipelineFile) + if err != nil { + s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Error creating pipeline version")) + return + } + + w.Header().Set("Content-Type", "application/json") + marshaler := &jsonpb.Marshaler{EnumsAsInts: true, OrigName: true} + createdPipelineVersion, err := ToApiPipelineVersion(newPipelineVersion) + if err != nil { + s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Error creating pipeline version")) + return + } + err = marshaler.Marshal(w, createdPipelineVersion) + if err != nil { + s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Error creating pipeline version")) + return + } +} + func (s *PipelineUploadServer) writeErrorToResponse(w http.ResponseWriter, code int, err error) { glog.Errorf("Failed to upload pipelines. Error: %+v", err) w.WriteHeader(code) diff --git a/backend/src/apiserver/server/pipeline_upload_server_test.go b/backend/src/apiserver/server/pipeline_upload_server_test.go index a9e349bfc0c..de178cbdf7c 100644 --- a/backend/src/apiserver/server/pipeline_upload_server_test.go +++ b/backend/src/apiserver/server/pipeline_upload_server_test.go @@ -34,6 +34,11 @@ import ( "github.com/stretchr/testify/assert" ) +const ( + fakeVersionUUID = "123e4567-e89b-12d3-a456-526655440000" + fakeVersionName = "a_fake_version_name" +) + func TestUploadPipeline_YAML(t *testing.T) { clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) resourceManager := resource.NewResourceManager(clientManager) @@ -91,6 +96,53 @@ func TestUploadPipeline_YAML(t *testing.T) { assert.Equal(t, str, "") assert.Equal(t, 1, total_size) assert.Equal(t, pkgsExpect, pkg) + + // Upload a new version under this pipeline + + // Set the fake uuid generator with a new uuid to avoid generate a same uuid as above. + clientManager.UpdateUUID(util.NewFakeUUIDGeneratorOrFatal(fakeVersionUUID, nil)) + resourceManager = resource.NewResourceManager(clientManager) + server = PipelineUploadServer{resourceManager: resourceManager} + req, _ = http.NewRequest("POST", "/apis/v1beta1/pipelines/upload_version?name="+fakeVersionName+"&pipelineid="+resource.DefaultFakeUUID, bytes.NewReader(b.Bytes())) + req.Header.Set("Content-Type", w.FormDataContentType()) + rr = httptest.NewRecorder() + handler = http.HandlerFunc(server.UploadPipelineVersion) + handler.ServeHTTP(rr, req) + + assert.Equal(t, 200, rr.Code) + assert.Contains(t, rr.Body.String(), `"created_at":"1970-01-01T00:00:02Z"`) + + // Verify stored in object store + template, err = clientManager.ObjectStore().GetFile(storage.CreatePipelinePath(fakeVersionUUID)) + assert.Nil(t, err) + assert.NotNil(t, template) + opts, err = list.NewOptions(&model.PipelineVersion{}, 2, "", nil) + assert.Nil(t, err) + // Verify metadata in db + versionsExpect := []*model.PipelineVersion{ + { + UUID: resource.DefaultFakeUUID, + CreatedAtInSec: 1, + Name: "hello-world.yaml", + Parameters: "[]", + Status: model.PipelineVersionReady, + PipelineId: resource.DefaultFakeUUID, + }, + { + UUID: fakeVersionUUID, + CreatedAtInSec: 2, + Name: fakeVersionName, + Parameters: "[]", + Status: model.PipelineVersionReady, + PipelineId: resource.DefaultFakeUUID, + }, + } + // Expect 2 versions, one is created by default when creating pipeline and the other is what we manually created + versions, total_size, str, err := clientManager.PipelineStore().ListPipelineVersions(resource.DefaultFakeUUID, opts) + assert.Nil(t, err) + assert.Equal(t, str, "") + assert.Equal(t, 2, total_size) + assert.Equal(t, versionsExpect, versions) } func TestUploadPipeline_Tarball(t *testing.T) { @@ -142,6 +194,59 @@ func TestUploadPipeline_Tarball(t *testing.T) { assert.Equal(t, str, "") assert.Equal(t, 1, total_size) assert.Equal(t, pkgsExpect, pkg) + + // Upload a new version under this pipeline + + // Set the fake uuid generator with a new uuid to avoid generate a same uuid as above. + clientManager.UpdateUUID(util.NewFakeUUIDGeneratorOrFatal(fakeVersionUUID, nil)) + resourceManager = resource.NewResourceManager(clientManager) + server = PipelineUploadServer{resourceManager: resourceManager} + b = &bytes.Buffer{} + w = multipart.NewWriter(b) + part, _ = w.CreateFormFile("uploadfile", "arguments-version.tar.gz") + fileReader, _ = os.Open("test/arguments_tarball/arguments-version.tar.gz") + io.Copy(part, fileReader) + w.Close() + req, _ = http.NewRequest("POST", "/apis/v1beta1/pipelines/upload_version?pipelineid="+resource.DefaultFakeUUID, bytes.NewReader(b.Bytes())) + req.Header.Set("Content-Type", w.FormDataContentType()) + rr = httptest.NewRecorder() + handler = http.HandlerFunc(server.UploadPipelineVersion) + handler.ServeHTTP(rr, req) + + assert.Equal(t, 200, rr.Code) + assert.Contains(t, rr.Body.String(), `"created_at":"1970-01-01T00:00:02Z"`) + + // Verify stored in object store + template, err = clientManager.ObjectStore().GetFile(storage.CreatePipelinePath(fakeVersionUUID)) + assert.Nil(t, err) + assert.NotNil(t, template) + opts, err = list.NewOptions(&model.PipelineVersion{}, 2, "", nil) + assert.Nil(t, err) + // Verify metadata in db + versionsExpect := []*model.PipelineVersion{ + { + UUID: resource.DefaultFakeUUID, + CreatedAtInSec: 1, + Name: "arguments.tar.gz", + Parameters: "[{\"name\":\"param1\",\"value\":\"hello\"},{\"name\":\"param2\"}]", + Status: model.PipelineVersionReady, + PipelineId: resource.DefaultFakeUUID, + }, + { + UUID: fakeVersionUUID, + CreatedAtInSec: 2, + Name: "arguments-version.tar.gz", + Parameters: "[{\"name\":\"param1\",\"value\":\"hello\"},{\"name\":\"param2\"}]", + Status: model.PipelineVersionReady, + PipelineId: resource.DefaultFakeUUID, + }, + } + // Expect 2 versions, one is created by default when creating pipeline and the other is what we manually created + versions, total_size, str, err := clientManager.PipelineStore().ListPipelineVersions(resource.DefaultFakeUUID, opts) + assert.Nil(t, err) + assert.Equal(t, str, "") + assert.Equal(t, 2, total_size) + assert.Equal(t, versionsExpect, versions) } func TestUploadPipeline_GetFormFileError(t *testing.T) { @@ -282,3 +387,70 @@ func TestUploadPipeline_SpecifyFileDescription(t *testing.T) { assert.Equal(t, str, "") assert.Equal(t, pkgsExpect, pkg) } + +func TestUploadPipelineVersion_GetFromFileError(t *testing.T) { + clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) + resourceManager := resource.NewResourceManager(clientManager) + server := PipelineUploadServer{resourceManager: resourceManager} + b := &bytes.Buffer{} + w := multipart.NewWriter(b) + part, _ := w.CreateFormFile("uploadfile", "hello-world.yaml") + io.Copy(part, bytes.NewBufferString("apiVersion: argoproj.io/v1alpha1\nkind: Workflow")) + w.Close() + req, _ := http.NewRequest("POST", "/apis/v1beta1/pipelines/upload", bytes.NewReader(b.Bytes())) + req.Header.Set("Content-Type", w.FormDataContentType()) + rr := httptest.NewRecorder() + handler := http.HandlerFunc(server.UploadPipeline) + handler.ServeHTTP(rr, req) + // Upload a new version under this pipeline + + // Set the fake uuid generator with a new uuid to avoid generate a same uuid as above. + clientManager.UpdateUUID(util.NewFakeUUIDGeneratorOrFatal(fakeVersionUUID, nil)) + resourceManager = resource.NewResourceManager(clientManager) + server = PipelineUploadServer{resourceManager: resourceManager} + b = &bytes.Buffer{} + b.WriteString("I am invalid file") + w = multipart.NewWriter(b) + w.CreateFormFile("uploadfile", "hello-world.yaml") + w.Close() + req, _ = http.NewRequest("POST", "/apis/v1beta1/pipelines/upload_version?name="+fakeVersionName+"&pipelineid="+resource.DefaultFakeUUID, bytes.NewReader(b.Bytes())) + req.Header.Set("Content-Type", w.FormDataContentType()) + rr = httptest.NewRecorder() + handler = http.HandlerFunc(server.UploadPipelineVersion) + handler.ServeHTTP(rr, req) + + assert.Equal(t, 400, rr.Code) + assert.Contains(t, string(rr.Body.Bytes()), "Failed to read pipeline version") +} + +func TestUploadPipelineVersion_FileNameTooLong(t *testing.T) { + clientManager := resource.NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch()) + resourceManager := resource.NewResourceManager(clientManager) + server := PipelineUploadServer{resourceManager: resourceManager} + b := &bytes.Buffer{} + w := multipart.NewWriter(b) + part, _ := w.CreateFormFile("uploadfile", "hello-world.yaml") + io.Copy(part, bytes.NewBufferString("apiVersion: argoproj.io/v1alpha1\nkind: Workflow")) + w.Close() + req, _ := http.NewRequest("POST", "/apis/v1beta1/pipelines/upload", bytes.NewReader(b.Bytes())) + req.Header.Set("Content-Type", w.FormDataContentType()) + rr := httptest.NewRecorder() + handler := http.HandlerFunc(server.UploadPipeline) + handler.ServeHTTP(rr, req) + + // Upload a new version under this pipeline + + // Set the fake uuid generator with a new uuid to avoid generate a same uuid as above. + clientManager.UpdateUUID(util.NewFakeUUIDGeneratorOrFatal(fakeVersionUUID, nil)) + resourceManager = resource.NewResourceManager(clientManager) + server = PipelineUploadServer{resourceManager: resourceManager} + encodedName := url.PathEscape( + "this is a loooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooooog name") + req, _ = http.NewRequest("POST", "/apis/v1beta1/pipelines/upload_version?name="+encodedName+"&pipelineid="+resource.DefaultFakeUUID, bytes.NewReader(b.Bytes())) + req.Header.Set("Content-Type", w.FormDataContentType()) + rr = httptest.NewRecorder() + handler = http.HandlerFunc(server.UploadPipelineVersion) + handler.ServeHTTP(rr, req) + assert.Equal(t, 400, rr.Code) + assert.Contains(t, string(rr.Body.Bytes()), "Pipeline name too long") +} diff --git a/backend/src/apiserver/server/test/arguments_tarball/arguments-version.tar.gz b/backend/src/apiserver/server/test/arguments_tarball/arguments-version.tar.gz new file mode 100644 index 0000000000000000000000000000000000000000..2b5c414ae1ae44012e57c5c866f5530b5e795900 GIT binary patch literal 754 zcmV>LCrZM2$j4LJ_Js4EDs? zV()I(Ya9#l-#c~^p5+FuD)L>No%wd=o7o-5-*9N&>I&P$@%Y7TdhvdYxml%HZ$!W*zgDxw-^S}r{T(**@4B6CtJi%z|6bE`AI*QK z=kokFUrEmcyz%+pg`v{95#hpu*L3&bi&9}kFdh$`U1t}@!Y7$f0IB3Z39t)-fkvMn zYflxvlSzb<;MJS(t{_y_uFCrl93fXJ#F&Fr7Se<`gh*g6I0r3#(iVgSU&T5SD1CxV z*hQIU9i+}7&ekE+g+*o~^EI#L_j!P}6kK9qt?oA(S(eqYR9jajY(y(&(io412b1YR zjTM)BPGm$$0)~F2!f>q@IiO}iK3=fQ2s2OyFf_zIs|vxH5mv~s3W=KA3=MIVKqS_P zi_|{o)Twg(<;X}9nB7pPG)dAI6fXsW}|~C938{(XtF<= zjgBU~`V51~8GId0_NzeLD^5d8Z3_J?QWSlpplI5Z=z)N_S_w)t`C=}7u1kg~h6JIy zA|tsR&}3qf6sMW6ynv%bA{G{HX?apq-EmNh_4)S!O{la{KkGs@u3U`t0$t})$e<6W z%3RJPl{qn44A5fV0SHNwK}!>isob5$ByCdDXxL&oY|Sau7F>3iTq_}MAF>7Vd121x z+f4-r;BMz?T?1LhF72+fx*S&+rEFVJ6se~y&taP-?jtKJ!))C5(gNQBoo=!|&HGH| zM~MoPuVitj$!s?ATP9>mp!`ezDBI-hmul>q!{ZWT(bgZn%Gi6pwsSWUHdd~$SMut2 kx>i5ko15CRPVeTXa=!fp*kX$