Skip to content

Commit

Permalink
Upload local file as new pipeline version Step 1 (kubeflow#3001)
Browse files Browse the repository at this point in the history
* add upload pipeline version to upload_pipeline_server and http main

* add apiPipelineVersion to pipeline upload swagger json

* add apiResourceReference to pipeline upload swagger json

* Add yet more types to pipeline upload swagger json

* Unit tests
  • Loading branch information
jingzhang36 authored and Jeffwan committed Dec 9, 2020
1 parent 4faff2a commit c72228c
Show file tree
Hide file tree
Showing 13 changed files with 445 additions and 6 deletions.
43 changes: 43 additions & 0 deletions backend/api/swagger/kfp_api_single_file.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

133 changes: 133 additions & 0 deletions backend/api/swagger/pipeline.upload.swagger.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/src/apiserver/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ func startHttpProxy(resourceManager *resource.ResourceManager) {
// https://github.com/grpc-ecosystem/grpc-gateway/issues/410
pipelineUploadServer := server.NewPipelineUploadServer(resourceManager)
topMux.HandleFunc("/apis/v1beta1/pipelines/upload", pipelineUploadServer.UploadPipeline)
topMux.HandleFunc("/apis/v1beta1/pipelines/upload_version", pipelineUploadServer.UploadPipelineVersion)
topMux.HandleFunc("/apis/v1beta1/healthz", func(w http.ResponseWriter, r *http.Request) {
io.WriteString(w, `{"commit_sha":"`+common.GetStringConfig("COMMIT_SHA")+`"}`)
})
Expand Down
7 changes: 7 additions & 0 deletions backend/src/apiserver/resource/client_manager_fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,3 +154,10 @@ func (f *FakeClientManager) KFAMClient() client.KFAMClientInterface {
func (f *FakeClientManager) Close() error {
return f.db.Close()
}

// Update the uuid used in this fake client manager
func (f *FakeClientManager) UpdateUUID(uuid util.UUIDGeneratorInterface) {
f.uuid = uuid
f.experimentStore = storage.NewExperimentStore(f.db, f.time, uuid)
f.pipelineStore = storage.NewPipelineStore(f.db, f.time, uuid)
}
72 changes: 71 additions & 1 deletion backend/src/apiserver/server/pipeline_upload_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,16 @@ import (
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/resource"
"github.com/kubeflow/pipelines/backend/src/common/util"
"github.com/pkg/errors"
)

// These are valid conditions of a ScheduledWorkflow.
const (
FormFileKey = "uploadfile"
NameQueryStringKey = "name"
DescriptionQueryStringKey = "description"
// Pipeline Id in the query string specifies a pipeline when creating versions.
PipelineKey = "pipelineid"
)

type PipelineUploadServer struct {
Expand All @@ -48,7 +51,7 @@ func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Req
glog.Infof("Upload pipeline called")
file, header, err := r.FormFile(FormFileKey)
if err != nil {
s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Failed to read pipeline form file"))
s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Failed to read pipeline from file"))
return
}
defer file.Close()
Expand Down Expand Up @@ -86,6 +89,73 @@ func (s *PipelineUploadServer) UploadPipeline(w http.ResponseWriter, r *http.Req
}
}

// HTTP multipart endpoint for uploading pipeline version file.
// https://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
// This endpoint is not exposed through grpc endpoint, since grpc-gateway can't convert the gRPC
// endpoint to the HTTP endpoint.
// See https://github.com/grpc-ecosystem/grpc-gateway/issues/500
// Thus we create the HTTP endpoint directly and using swagger to auto generate the HTTP client.
func (s *PipelineUploadServer) UploadPipelineVersion(w http.ResponseWriter, r *http.Request) {
glog.Infof("Upload pipeline version called")
file, header, err := r.FormFile(FormFileKey)
if err != nil {
s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Failed to read pipeline version from file"))
return
}
defer file.Close()

pipelineFile, err := ReadPipelineFile(header.Filename, file, MaxFileLength)
if err != nil {
s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Error read pipeline version file."))
return
}

versionNameQueryString := r.URL.Query().Get(NameQueryStringKey)
// If new version's name is not included in query string, use file name.
pipelineVersionName, err := GetPipelineName(versionNameQueryString, header.Filename)
if err != nil {
s.writeErrorToResponse(w, http.StatusBadRequest, util.Wrap(err, "Invalid pipeline version name."))
return
}

pipelineId := r.URL.Query().Get(PipelineKey)
if len(pipelineId) == 0 {
s.writeErrorToResponse(w, http.StatusBadRequest, errors.New("Please specify a pipeline id when creating versions."))
return
}

newPipelineVersion, err := s.resourceManager.CreatePipelineVersion(
&api.PipelineVersion{
Name: pipelineVersionName,
ResourceReferences: []*api.ResourceReference{
&api.ResourceReference{
Key: &api.ResourceKey{
Id: pipelineId,
Type: api.ResourceType_PIPELINE,
},
Relationship: api.Relationship_OWNER,
},
},
}, pipelineFile)
if err != nil {
s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Error creating pipeline version"))
return
}

w.Header().Set("Content-Type", "application/json")
marshaler := &jsonpb.Marshaler{EnumsAsInts: true, OrigName: true}
createdPipelineVersion, err := ToApiPipelineVersion(newPipelineVersion)
if err != nil {
s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Error creating pipeline version"))
return
}
err = marshaler.Marshal(w, createdPipelineVersion)
if err != nil {
s.writeErrorToResponse(w, http.StatusInternalServerError, util.Wrap(err, "Error creating pipeline version"))
return
}
}

func (s *PipelineUploadServer) writeErrorToResponse(w http.ResponseWriter, code int, err error) {
glog.Errorf("Failed to upload pipelines. Error: %+v", err)
w.WriteHeader(code)
Expand Down
Loading

0 comments on commit c72228c

Please sign in to comment.