Skip to content

Commit

Permalink
Add integration tests for version related methods (kubeflow#3174)
Browse files Browse the repository at this point in the history
* create a new version upload integration test file

* checkpoint

* checkpoint

* test

* checkpoint

* checkpoint

* version api integration test

* Fix a typo

* typo in comment

* error message
  • Loading branch information
jingzhang36 authored and Jeffwan committed Dec 9, 2020
1 parent 55be9dc commit 4a572af
Show file tree
Hide file tree
Showing 7 changed files with 447 additions and 8 deletions.
105 changes: 105 additions & 0 deletions backend/src/common/client/api_server/pipeline_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,108 @@ func listAllForPipeline(client PipelineInterface, parameters *params.ListPipelin

return allResults, nil
}

func (c *PipelineClient) CreatePipelineVersion(parameters *params.CreatePipelineVersionParams) (*model.APIPipelineVersion,
error) {
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), apiServerDefaultTimeout)
defer cancel()

parameters.Context = ctx
response, err := c.apiClient.PipelineService.CreatePipelineVersion(parameters, PassThroughAuth)
if err != nil {
if defaultError, ok := err.(*params.CreatePipelineVersionDefault); ok {
err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code)
} else {
err = CreateErrorCouldNotRecoverAPIStatus(err)
}

return nil, util.NewUserError(err,
fmt.Sprintf("Failed to create pipeline version. Params: '%v'", parameters),
fmt.Sprintf("Failed to create pipeline version from URL '%v'", parameters.Body.PackageURL.PipelineURL))
}

return response.Payload, nil
}

func (c *PipelineClient) ListPipelineVersions(parameters *params.ListPipelineVersionsParams) (
[]*model.APIPipelineVersion, int, string, error) {
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), apiServerDefaultTimeout)
defer cancel()

// Make service call
parameters.Context = ctx
response, err := c.apiClient.PipelineService.ListPipelineVersions(parameters, PassThroughAuth)
if err != nil {
if defaultError, ok := err.(*params.ListPipelineVersionsDefault); ok {
err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code)
} else {
err = CreateErrorCouldNotRecoverAPIStatus(err)
}

return nil, 0, "", util.NewUserError(err,
fmt.Sprintf("Failed to list pipeline versions. Params: '%+v'", parameters),
fmt.Sprintf("Failed to list pipeline versions"))
}

return response.Payload.Versions, int(response.Payload.TotalSize), response.Payload.NextPageToken, nil
}

func (c *PipelineClient) GetPipelineVersion(parameters *params.GetPipelineVersionParams) (*model.APIPipelineVersion,
error) {
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), apiServerDefaultTimeout)
defer cancel()

// Make service call
parameters.Context = ctx
response, err := c.apiClient.PipelineService.GetPipelineVersion(parameters, PassThroughAuth)
if err != nil {
if defaultError, ok := err.(*params.GetPipelineVersionDefault); ok {
err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code)
} else {
err = CreateErrorCouldNotRecoverAPIStatus(err)
}

return nil, util.NewUserError(err,
fmt.Sprintf("Failed to get pipeline version. Params: '%v'", parameters),
fmt.Sprintf("Failed to get pipeline version '%v'", parameters.VersionID))
}

return response.Payload, nil
}

func (c *PipelineClient) GetPipelineVersionTemplate(parameters *params.GetPipelineVersionTemplateParams) (
*workflowapi.Workflow, error) {
// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), apiServerDefaultTimeout)
defer cancel()

// Make service call
parameters.Context = ctx
response, err := c.apiClient.PipelineService.GetPipelineVersionTemplate(parameters, PassThroughAuth)
if err != nil {
if defaultError, ok := err.(*params.GetPipelineVersionTemplateDefault); ok {
err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code)
} else {
err = CreateErrorCouldNotRecoverAPIStatus(err)
}

return nil, util.NewUserError(err,
fmt.Sprintf("Failed to get template. Params: '%+v'", parameters),
fmt.Sprintf("Failed to get template for pipeline version '%v'", parameters.VersionID))
}

// Unmarshal response
var workflow workflowapi.Workflow
err = yaml.Unmarshal([]byte(response.Payload.Template), &workflow)
if err != nil {
return nil, util.NewUserError(err,
fmt.Sprintf("Failed to unmarshal reponse. Params: '%+v'. Response: '%s'", parameters,
response.Payload.Template),
fmt.Sprintf("Failed to unmarshal reponse"))
}

return &workflow, nil
}
36 changes: 34 additions & 2 deletions backend/src/common/client/api_server/pipeline_upload_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,37 @@ func (c *PipelineUploadClient) Upload(parameters *params.UploadPipelineParams) (
return response.Payload, nil
}

// TODO(jingzhang36): add UploadPipelineVersion after go_http_client and go_client are
// auto-generated from UploadPipelineVersion in PipelineUploadServer
// UploadPipelineVersion uploads pipeline version from local file.
func (c *PipelineUploadClient) UploadPipelineVersion(filePath string, parameters *params.UploadPipelineVersionParams) (*model.APIPipelineVersion,
error) {
// Get file
file, err := os.Open(filePath)
if err != nil {
return nil, util.NewUserErrorWithSingleMessage(err,
fmt.Sprintf("Failed to open file '%s'", filePath))
}
defer file.Close()
parameters.Uploadfile = runtime.NamedReader(filePath, file)

// Create context with timeout
ctx, cancel := context.WithTimeout(context.Background(), apiServerDefaultTimeout)
defer cancel()

// Make service call
parameters.Context = ctx
response, err := c.apiClient.PipelineUploadService.UploadPipelineVersion(parameters, PassThroughAuth)

if err != nil {
if defaultError, ok := err.(*params.UploadPipelineVersionDefault); ok {
err = CreateErrorFromAPIStatus(defaultError.Payload.Error, defaultError.Payload.Code)
} else {
err = CreateErrorCouldNotRecoverAPIStatus(err)
}

return nil, util.NewUserError(err,
fmt.Sprintf("Failed to upload pipeline version. Params: '%v'", parameters),
fmt.Sprintf("Failed to upload pipeline version"))
}

return response.Payload, nil
}
1 change: 1 addition & 0 deletions backend/test/integration/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ go_test(
"experiment_api_test.go",
"job_api_test.go",
"pipeline_api_test.go",
"pipeline_version_api_test.go",
"run_api_test.go",
"visualization_api_test.go",
],
Expand Down
5 changes: 0 additions & 5 deletions backend/test/integration/pipeline_api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,9 +197,6 @@ func verifyPipeline(t *testing.T, pipeline *model.APIPipeline) {
{Name: "param1", Value: "hello"}, // Default value in the pipeline template
{Name: "param2"}, // No default value in the pipeline
},
// TODO(jingzhang36): after version API launch, remove the following field.
// This is because after the version API launch, we won't have defautl
// version produced automatically when creating pipeline.
DefaultVersion: &model.APIPipelineVersion{
CreatedAt: pipeline.CreatedAt,
ID: pipeline.ID,
Expand All @@ -218,8 +215,6 @@ func TestPipelineAPI(t *testing.T) {
suite.Run(t, new(PipelineApiTest))
}

// TODO(jingzhang36): include UploadPipelineVersion in integration test

func (s *PipelineApiTest) TearDownSuite() {
if *runIntegrationTests {
if !*isDevMode {
Expand Down
Loading

0 comments on commit 4a572af

Please sign in to comment.