Skip to content

Commit

Permalink
Refactor the legacy way of using pipeline id to create run in KFP bac…
Browse files Browse the repository at this point in the history
…kend (kubeflow#3437)

* For legacy interface, we switch to the new presentation underhood

* when create run, if user specify a pipeline, we subsitute it with the pipeline's default version

* Add a case where a version and a pipeline are both specified

* comment; get ready pipeline

* comments

* fix upgrade integration test

* comments of todo; expected run/job now has resource references

* fix upgrade test expected value according to the new response

* fix a typo

* a quick hack for upgrade test

* surface err from conversion
  • Loading branch information
jingzhang36 authored and Jeffwan committed Dec 9, 2020
1 parent 7c8bcca commit a843bac
Show file tree
Hide file tree
Showing 6 changed files with 245 additions and 59 deletions.
31 changes: 16 additions & 15 deletions backend/src/apiserver/resource/resource_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,10 +250,15 @@ func (r *ResourceManager) CreateRun(apiRun *api.Run) (*model.RunDetail, error) {
// Get workflow from either of the two places:
// (1) raw pipeline manifest in pipeline_spec
// (2) pipeline version in resource_references
// And the latter takes priority over the former
var workflowSpecManifestBytes []byte
workflowSpecManifestBytes, err := r.getWorkflowSpecBytes(apiRun.GetPipelineSpec())
err := ConvertPipelineIdToDefaultPipelineVersion(apiRun.PipelineSpec, &apiRun.ResourceReferences, r)
if err != nil {
workflowSpecManifestBytes, err = r.getWorkflowSpecBytesFromPipelineVersion(apiRun.GetResourceReferences())
return nil, util.Wrap(err, "Failed to find default version to create run with pipeline id.")
}
workflowSpecManifestBytes, err = r.getWorkflowSpecBytesFromPipelineVersion(apiRun.GetResourceReferences())
if err != nil {
workflowSpecManifestBytes, err = r.getWorkflowSpecBytesFromPipelineSpec(apiRun.GetPipelineSpec())
if err != nil {
return nil, util.Wrap(err, "Failed to fetch workflow spec.")
}
Expand Down Expand Up @@ -500,10 +505,15 @@ func (r *ResourceManager) CreateJob(apiJob *api.Job) (*model.Job, error) {
// Get workflow from either of the two places:
// (1) raw pipeline manifest in pipeline_spec
// (2) pipeline version in resource_references
// And the latter takes priority over the former
var workflowSpecManifestBytes []byte
workflowSpecManifestBytes, err := r.getWorkflowSpecBytes(apiJob.GetPipelineSpec())
err := ConvertPipelineIdToDefaultPipelineVersion(apiJob.PipelineSpec, &apiJob.ResourceReferences, r)
if err != nil {
return nil, util.Wrap(err, "Failed to find default version to create job with pipeline id.")
}
workflowSpecManifestBytes, err = r.getWorkflowSpecBytesFromPipelineVersion(apiJob.GetResourceReferences())
if err != nil {
workflowSpecManifestBytes, err = r.getWorkflowSpecBytesFromPipelineVersion(apiJob.GetResourceReferences())
workflowSpecManifestBytes, err = r.getWorkflowSpecBytesFromPipelineSpec(apiJob.GetPipelineSpec())
if err != nil {
return nil, util.Wrap(err, "Failed to fetch workflow spec.")
}
Expand Down Expand Up @@ -775,17 +785,8 @@ func (r *ResourceManager) checkRunExist(runID string) (*model.RunDetail, error)
return runDetail, nil
}

func (r *ResourceManager) getWorkflowSpecBytes(spec *api.PipelineSpec) ([]byte, error) {
// TODO(jingzhang36): after FE is enabled to use pipeline version to create
// run, we'll only check for the raw manifest in pipeline_spec.
if spec.GetPipelineId() != "" {
var workflow util.Workflow
err := r.objectStore.GetFromYamlFile(&workflow, r.objectStore.GetPipelineKey(spec.GetPipelineId()))
if err != nil {
return nil, util.Wrap(err, "Get pipeline YAML failed.")
}
return []byte(workflow.ToStringForStore()), nil
} else if spec.GetWorkflowManifest() != "" {
func (r *ResourceManager) getWorkflowSpecBytesFromPipelineSpec(spec *api.PipelineSpec) ([]byte, error) {
if spec.GetWorkflowManifest() != "" {
return []byte(spec.GetWorkflowManifest()), nil
}
return nil, util.NewInvalidInputError("Please provide a valid pipeline spec")
Expand Down
101 changes: 63 additions & 38 deletions backend/src/apiserver/resource/resource_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package resource

import (
"encoding/json"
"fmt"
"strings"
"testing"
Expand Down Expand Up @@ -310,6 +309,27 @@ func TestCreateRun_ThroughPipelineID(t *testing.T) {
apiExperiment := &api.Experiment{Name: "e1"}
experiment, err := manager.CreateExperiment(apiExperiment)
assert.Nil(t, err)

// Create a new pipeline version with UUID being FakeUUID.
pipelineStore, ok := store.pipelineStore.(*storage.PipelineStore)
assert.True(t, ok)
pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(FakeUUIDOne, nil))
version, err := manager.CreatePipelineVersion(&api.PipelineVersion{
Name: "version_for_run",
ResourceReferences: []*api.ResourceReference{
&api.ResourceReference{
Key: &api.ResourceKey{
Id: p.UUID,
Type: api.ResourceType_PIPELINE,
},
Relationship: api.Relationship_OWNER,
},
},
}, []byte(testWorkflow.ToStringForStore()))
assert.Nil(t, err)

// The pipeline specified via pipeline id will be converted to this
// pipeline's default version, which will be used to create run.
apiRun := &api.Run{
Name: "run1",
PipelineSpec: &api.PipelineSpec{
Expand Down Expand Up @@ -343,7 +363,7 @@ func TestCreateRun_ThroughPipelineID(t *testing.T) {
Name: "workflow-name",
Namespace: "ns1",
StorageState: api.Run_STORAGESTATE_AVAILABLE.String(),
CreatedAtInSec: 3,
CreatedAtInSec: 4,
Conditions: "Running",
PipelineSpec: model.PipelineSpec{
PipelineId: p.UUID,
Expand All @@ -360,6 +380,14 @@ func TestCreateRun_ThroughPipelineID(t *testing.T) {
ReferenceType: common.Experiment,
Relationship: common.Owner,
},
{
ResourceUUID: "123e4567-e89b-12d3-a456-426655440000",
ResourceType: common.Run,
ReferenceUUID: version.UUID,
ReferenceName: version.Name,
ReferenceType: common.PipelineVersion,
Relationship: common.Creator,
},
},
},
PipelineRuntime: model.PipelineRuntime{
Expand Down Expand Up @@ -921,15 +949,36 @@ func TestCreateJob_ThroughPipelineID(t *testing.T) {
},
},
}

// Create a new pipeline version with UUID being FakeUUID.
pipelineStore, ok := store.pipelineStore.(*storage.PipelineStore)
assert.True(t, ok)
pipelineStore.SetUUIDGenerator(util.NewFakeUUIDGeneratorOrFatal(FakeUUIDOne, nil))
version, err := manager.CreatePipelineVersion(&api.PipelineVersion{
Name: "version_for_run",
ResourceReferences: []*api.ResourceReference{
&api.ResourceReference{
Key: &api.ResourceKey{
Id: pipeline.UUID,
Type: api.ResourceType_PIPELINE,
},
Relationship: api.Relationship_OWNER,
},
},
}, []byte(testWorkflow.ToStringForStore()))
assert.Nil(t, err)

// The pipeline specified via pipeline id will be converted to this
// pipeline's default version, which will be used to create run.
newJob, err := manager.CreateJob(job)
expectedJob := &model.Job{
UUID: "123e4567-e89b-12d3-a456-426655440000",
DisplayName: "j1",
Name: "j1",
Namespace: "ns1",
Enabled: true,
CreatedAtInSec: 3,
UpdatedAtInSec: 3,
CreatedAtInSec: 4,
UpdatedAtInSec: 4,
Conditions: "NO_STATUS",
PipelineSpec: model.PipelineSpec{
PipelineId: pipeline.UUID,
Expand All @@ -946,6 +995,14 @@ func TestCreateJob_ThroughPipelineID(t *testing.T) {
ReferenceType: common.Experiment,
Relationship: common.Owner,
},
{
ResourceUUID: "123e4567-e89b-12d3-a456-426655440000",
ResourceType: common.Job,
ReferenceUUID: version.UUID,
ReferenceName: version.Name,
ReferenceType: common.PipelineVersion,
Relationship: common.Creator,
},
},
}
assert.Nil(t, err)
Expand Down Expand Up @@ -1525,38 +1582,6 @@ func TestReportScheduledWorkflowResource_Error(t *testing.T) {
assert.Contains(t, err.(*util.UserError).String(), "database is closed")
}

func TestGetWorkflowSpecBytes_ByPipelineID(t *testing.T) {
store, manager, pipeline := initWithPipeline(t)
defer store.Close()
spec := &api.PipelineSpec{
PipelineId: pipeline.UUID,
Parameters: []*api.Parameter{
{Name: "param1", Value: "world"},
},
}
workflowBytes, err := manager.getWorkflowSpecBytes(spec)
assert.Nil(t, err)
var actualWorkflow v1alpha1.Workflow
json.Unmarshal(workflowBytes, &actualWorkflow)
assert.Equal(t, testWorkflow.Get(), &actualWorkflow)
}

func TestGetWorkflowSpecBytes_ByPipelineID_NotExist(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
manager := NewResourceManager(store)

spec := &api.PipelineSpec{
PipelineId: "1",
Parameters: []*api.Parameter{
{Name: "param1", Value: "world"},
},
}
_, err := manager.getWorkflowSpecBytes(spec)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "not found")
}

func TestGetWorkflowSpecBytes_ByWorkflowManifest(t *testing.T) {
store := NewFakeClientManagerOrFatal(util.NewFakeTimeForEpoch())
defer store.Close()
Expand All @@ -1568,7 +1593,7 @@ func TestGetWorkflowSpecBytes_ByWorkflowManifest(t *testing.T) {
{Name: "param1", Value: "world"},
},
}
workflowBytes, err := manager.getWorkflowSpecBytes(spec)
workflowBytes, err := manager.getWorkflowSpecBytesFromPipelineSpec(spec)
assert.Nil(t, err)
assert.Equal(t, []byte("some manifest"), workflowBytes)
}
Expand All @@ -1583,7 +1608,7 @@ func TestGetWorkflowSpecBytes_MissingSpec(t *testing.T) {
{Name: "param1", Value: "world"},
},
}
_, err := manager.getWorkflowSpecBytes(spec)
_, err := manager.getWorkflowSpecBytesFromPipelineSpec(spec)
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "Please provide a valid pipeline spec")
}
Expand Down
28 changes: 27 additions & 1 deletion backend/src/apiserver/resource/resource_manager_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
api "github.com/kubeflow/pipelines/backend/api/go_client"
"github.com/kubeflow/pipelines/backend/src/apiserver/client"
servercommon "github.com/kubeflow/pipelines/backend/src/apiserver/common"
"github.com/kubeflow/pipelines/backend/src/apiserver/model"
"github.com/kubeflow/pipelines/backend/src/common/util"
scheduledworkflow "github.com/kubeflow/pipelines/backend/src/crd/pkg/apis/scheduledworkflow/v1beta1"
apierr "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func toCRDTrigger(apiTrigger *api.Trigger) *scheduledworkflow.Trigger {
Expand Down Expand Up @@ -228,3 +229,28 @@ func OverrideParameterWithSystemDefault(workflow util.Workflow, apiRun *api.Run)
}
return nil
}

// Convert PipelineId in PipelineSpec to the pipeline's default pipeline version.
// This is for legacy usage of pipeline id to create run. The standard way to
// create run is by specifying the pipeline version.
func ConvertPipelineIdToDefaultPipelineVersion(pipelineSpec *api.PipelineSpec, resourceReferences *[]*api.ResourceReference, r *ResourceManager) error {
if pipelineSpec.GetPipelineId() == "" {
return nil
}
// If there is already a pipeline version in resource references, don't convert pipeline id.
for _, reference := range *resourceReferences {
if reference.Key.Type == api.ResourceType_PIPELINE_VERSION && reference.Relationship == api.Relationship_CREATOR {
return nil
}
}
pipeline, err := r.pipelineStore.GetPipelineWithStatus(pipelineSpec.GetPipelineId(), model.PipelineReady)
if err != nil {
return util.Wrap(err, "Failed to find the specified pipeline")
}
// Add default pipeline version to resource references
*resourceReferences = append(*resourceReferences, &api.ResourceReference{
Key: &api.ResourceKey{Type: api.ResourceType_PIPELINE_VERSION, Id: pipeline.DefaultVersionId},
Relationship: api.Relationship_CREATOR,
})
return nil
}
Loading

0 comments on commit a843bac

Please sign in to comment.