diff --git a/backend/Dockerfile.persistenceagent b/backend/Dockerfile.persistenceagent index a57c7435c9f..e0cc5afd19c 100644 --- a/backend/Dockerfile.persistenceagent +++ b/backend/Dockerfile.persistenceagent @@ -17,4 +17,7 @@ COPY --from=builder /go/src/github.com/kubeflow/pipelines/third_party/license.tx ENV NAMESPACE "" -CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} +# Set Workflow TTL to 7 days +ENV TTL_SECONDS_AFTER_WORKFLOW_FINISH 604800 + +CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} --ttlSecondsAfterWorkflowFinish=${TTL_SECONDS_AFTER_WORKFLOW_FINISH} diff --git a/backend/src/agent/persistence/main.go b/backend/src/agent/persistence/main.go index 91b2b0ecfc5..21293674a70 100644 --- a/backend/src/agent/persistence/main.go +++ b/backend/src/agent/persistence/main.go @@ -31,28 +31,30 @@ import ( ) var ( - masterURL string - kubeconfig string - initializeTimeout time.Duration - timeout time.Duration - mlPipelineAPIServerName string - mlPipelineAPIServerPort string - mlPipelineAPIServerBasePath string - mlPipelineServiceHttpPort string - mlPipelineServiceGRPCPort string - namespace string + masterURL string + kubeconfig string + initializeTimeout time.Duration + timeout time.Duration + mlPipelineAPIServerName string + mlPipelineAPIServerPort string + mlPipelineAPIServerBasePath string + mlPipelineServiceHttpPort string + mlPipelineServiceGRPCPort string + namespace string + ttlSecondsAfterWorkflowFinish int64 ) const ( - kubeconfigFlagName = "kubeconfig" - masterFlagName = "master" - initializationTimeoutFlagName = "initializeTimeout" - timeoutFlagName = "timeout" - mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath" - mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName" - mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort" - mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort" - namespaceFlagName = "namespace" + kubeconfigFlagName = "kubeconfig" + masterFlagName = "master" + initializationTimeoutFlagName = "initializeTimeout" + timeoutFlagName = "timeout" + mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath" + mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName" + mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort" + mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort" + namespaceFlagName = "namespace" + ttlSecondsAfterWorkflowFinishFlagName = "ttlSecondsAfterWorkflowFinish" ) func main() { @@ -122,4 +124,5 @@ func init() { flag.StringVar(&mlPipelineAPIServerBasePath, mlPipelineAPIServerBasePathFlagName, "/apis/v1beta1", "The base path for the ML pipeline API server.") flag.StringVar(&namespace, namespaceFlagName, "", "The namespace name used for Kubernetes informers to obtain the listers.") + flag.Int64Var(&ttlSecondsAfterWorkflowFinish, ttlSecondsAfterWorkflowFinishFlagName, 604800 /* 7 days */, "The TTL for Argo workflow to persist after workflow finish.") } diff --git a/backend/src/agent/persistence/persistence_agent.go b/backend/src/agent/persistence/persistence_agent.go index af58aa58fe9..bf530e236de 100644 --- a/backend/src/agent/persistence/persistence_agent.go +++ b/backend/src/agent/persistence/persistence_agent.go @@ -44,10 +44,10 @@ type PersistenceAgent struct { // NewPersistenceAgent returns a new persistence agent. 
func NewPersistenceAgent( - swfInformerFactory swfinformers.SharedInformerFactory, - workflowInformerFactory workflowinformers.SharedInformerFactory, - pipelineClient *client.PipelineClient, - time util.TimeInterface) *PersistenceAgent { + swfInformerFactory swfinformers.SharedInformerFactory, + workflowInformerFactory workflowinformers.SharedInformerFactory, + pipelineClient *client.PipelineClient, + time util.TimeInterface) *PersistenceAgent { // obtain references to shared informers swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows() workflowInformer := workflowInformerFactory.Argoproj().V1alpha1().Workflows() @@ -64,7 +64,7 @@ func NewPersistenceAgent( workflowWorker := worker.NewPersistenceWorker(time, workflowregister.Kind, workflowInformer.Informer(), true, - worker.NewWorkflowSaver(workflowClient, pipelineClient)) + worker.NewWorkflowSaver(workflowClient, pipelineClient, ttlSecondsAfterWorkflowFinish)) agent := &PersistenceAgent{ swfClient: swfClient, diff --git a/backend/src/agent/persistence/worker/persistence_worker_test.go b/backend/src/agent/persistence/worker/persistence_worker_test.go index 6bfaf11761a..15ccb12047d 100644 --- a/backend/src/agent/persistence/worker/persistence_worker_test.go +++ b/backend/src/agent/persistence/worker/persistence_worker_test.go @@ -54,7 +54,7 @@ func TestPersistenceWorker_Success(t *testing.T) { pipelineClient := client.NewPipelineClientFake() // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -84,7 +84,7 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) { pipelineClient := client.NewPipelineClientFake() // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -115,7 +115,7 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) { pipelineClient := client.NewPipelineClientFake() // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -148,7 +148,7 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) { "My Retriable Error")) // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), @@ -181,7 +181,7 @@ func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) { "My Permanent Error")) // Set up peristence worker - saver := NewWorkflowSaver(workflowClient, pipelineClient) + saver := NewWorkflowSaver(workflowClient, pipelineClient, 100) eventHandler := NewFakeEventHandler() worker := NewPersistenceWorker( util.NewFakeTimeForEpoch(), diff --git a/backend/src/agent/persistence/worker/workflow_saver.go b/backend/src/agent/persistence/worker/workflow_saver.go index df8211aaf13..1b9fbdd3a65 100644 --- a/backend/src/agent/persistence/worker/workflow_saver.go +++ b/backend/src/agent/persistence/worker/workflow_saver.go @@ -19,21 +19,24 @@ import ( 
"github.com/kubeflow/pipelines/backend/src/common/util" log "github.com/sirupsen/logrus" _ "k8s.io/client-go/plugin/pkg/client/auth/gcp" + "time" ) // WorkflowSaver provides a function to persist a workflow to a database. type WorkflowSaver struct { - client client.WorkflowClientInterface - pipelineClient client.PipelineClientInterface - metricsReporter *MetricsReporter + client client.WorkflowClientInterface + pipelineClient client.PipelineClientInterface + metricsReporter *MetricsReporter + ttlSecondsAfterWorkflowFinish int64 } func NewWorkflowSaver(client client.WorkflowClientInterface, - pipelineClient client.PipelineClientInterface) *WorkflowSaver { + pipelineClient client.PipelineClientInterface, ttlSecondsAfterWorkflowFinish int64) *WorkflowSaver { return &WorkflowSaver{ - client: client, - pipelineClient: pipelineClient, - metricsReporter: NewMetricsReporter(pipelineClient), + client: client, + pipelineClient: pipelineClient, + metricsReporter: NewMetricsReporter(pipelineClient), + ttlSecondsAfterWorkflowFinish: ttlSecondsAfterWorkflowFinish, } } @@ -53,7 +56,12 @@ func (s *WorkflowSaver) Save(key string, namespace string, name string, nowEpoch "Workflow (%s): transient failure: %v", key, err) } - + if wf.PersistedFinalState() && time.Now().Unix()-wf.FinishedAt() < s.ttlSecondsAfterWorkflowFinish { + // Skip persisting the workflow if the workflow is finished + // and the workflow hasn't being passing the TTL + log.Infof("Skip syncing Workflow (%v): workflow marked as persisted.", name) + return nil + } // Save this Workflow to the database. err = s.pipelineClient.ReportWorkflow(wf) retry := util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT) diff --git a/backend/src/agent/persistence/worker/workflow_saver_test.go b/backend/src/agent/persistence/worker/workflow_saver_test.go index 4f7360e5adf..57aa87c9149 100644 --- a/backend/src/agent/persistence/worker/workflow_saver_test.go +++ b/backend/src/agent/persistence/worker/workflow_saver_test.go @@ -17,6 +17,7 @@ package worker import ( "fmt" "testing" + "time" workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" "github.com/kubeflow/pipelines/backend/src/agent/persistence/client" @@ -39,9 +40,7 @@ func TestWorkflow_Save_Success(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver( - workflowFake, - pipelineFake) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -53,9 +52,7 @@ func TestWorkflow_Save_NotFoundDuringGet(t *testing.T) { workflowFake := client.NewWorkflowClientFake() pipelineFake := client.NewPipelineClientFake() - saver := NewWorkflowSaver( - workflowFake, - pipelineFake) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -70,9 +67,7 @@ func TestWorkflow_Save_ErrorDuringGet(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", nil) - saver := NewWorkflowSaver( - workflowFake, - pipelineFake) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -97,9 +92,7 @@ func TestWorkflow_Save_PermanentFailureWhileReporting(t *testing.T) { workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver( - workflowFake, - pipelineFake) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -124,9 +117,7 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) { 
workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) - saver := NewWorkflowSaver( - workflowFake, - pipelineFake) + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) @@ -134,3 +125,65 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) { assert.NotNil(t, err) assert.Contains(t, err.Error(), "transient failure") } + +func TestWorkflow_Save_SkippedDueToFinalStatue(t *testing.T) { + workflowFake := client.NewWorkflowClientFake() + pipelineFake := client.NewPipelineClientFake() + + // Add this will result in failure unless reporting is skipped + pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, + "My Permanent Error")) + + workflow := util.NewWorkflow(&workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "MY_NAMESPACE", + Name: "MY_NAME", + Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"}, + }, + Status: workflowapi.WorkflowStatus{ + FinishedAt: metav1.Now(), + }, + }) + + workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) + + saver := NewWorkflowSaver(workflowFake, pipelineFake, 100) + + err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) + + assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)) + assert.Equal(t, nil, err) +} + +func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) { + workflowFake := client.NewWorkflowClientFake() + pipelineFake := client.NewPipelineClientFake() + + // Add this will result in failure unless reporting is skipped + pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT, + "My Permanent Error")) + + workflow := util.NewWorkflow(&workflowapi.Workflow{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "MY_NAMESPACE", + Name: "MY_NAME", + Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"}, + }, + Status: workflowapi.WorkflowStatus{ + FinishedAt: metav1.Now(), + }, + }) + + workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow) + + saver := NewWorkflowSaver(workflowFake, pipelineFake, 1) + + // Sleep 2 seconds to make sure workflow passed TTL + time.Sleep(2 * time.Second) + + err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20) + + assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)) + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "permanent failure") +} diff --git a/backend/src/apiserver/resource/resource_manager.go b/backend/src/apiserver/resource/resource_manager.go index 69b386cdc28..c7652a65843 100644 --- a/backend/src/apiserver/resource/resource_manager.go +++ b/backend/src/apiserver/resource/resource_manager.go @@ -493,58 +493,105 @@ func (r *ResourceManager) DeleteJob(jobID string) error { func (r *ResourceManager) ReportWorkflowResource(workflow *util.Workflow) error { if _, ok := workflow.ObjectMeta.Labels[util.LabelKeyWorkflowRunId]; !ok { // Skip reporting if the workflow doesn't have the run id label - return nil + return util.NewInvalidInputError("Workflow missing the Run ID label") } runId := workflow.ObjectMeta.Labels[util.LabelKeyWorkflowRunId] jobId := workflow.ScheduledWorkflowUUIDAsStringOrEmpty() + if workflow.PersistedFinalState() { + // If workflow's final state has being persisted, the workflow should be garbage collected. 
+ err := r.workflowClient.Delete(workflow.Name, &v1.DeleteOptions{}) + if err != nil { + return util.NewInternalServerError(err, "Failed to delete the completed workflow for run %s", runId) + } + } + if jobId == "" { - // If a run doesn't have owner UID, it's a one-time run created by Pipeline API server. + // If a run doesn't have job ID, it's a one-time run created by Pipeline API server. // In this case the DB entry should already been created when argo workflow CRD is created. - return r.runStore.UpdateRun(runId, workflow.Condition(), workflow.FinishedAt(), workflow.ToStringForStore()) - } - - // Get the experiment resource reference for job. - experimentRef, err := r.resourceReferenceStore.GetResourceReference(jobId, common.Job, common.Experiment) - if err != nil { - return util.Wrap(err, "Failed to retrieve the experiment ID for the job that created the run.") - } - runDetail := &model.RunDetail{ - Run: model.Run{ - UUID: runId, - DisplayName: workflow.Name, - Name: workflow.Name, - StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), - Namespace: workflow.Namespace, - CreatedAtInSec: workflow.CreationTimestamp.Unix(), - ScheduledAtInSec: workflow.ScheduledAtInSecOr0(), - FinishedAtInSec: workflow.FinishedAt(), - Conditions: workflow.Condition(), - PipelineSpec: model.PipelineSpec{ - WorkflowSpecManifest: workflow.GetWorkflowSpec().ToStringForStore(), - }, - ResourceReferences: []*model.ResourceReference{ - { - ResourceUUID: runId, - ResourceType: common.Run, - ReferenceUUID: jobId, - ReferenceType: common.Job, - Relationship: common.Creator, + err := r.runStore.UpdateRun(runId, workflow.Condition(), workflow.FinishedAt(), workflow.ToStringForStore()) + if err != nil { + return util.Wrap(err, "Failed to update the run.") + } + } else { + // Get the experiment resource reference for job. 
+ experimentRef, err := r.resourceReferenceStore.GetResourceReference(jobId, common.Job, common.Experiment) + if err != nil { + return util.Wrap(err, "Failed to retrieve the experiment ID for the job that created the run.") + } + runDetail := &model.RunDetail{ + Run: model.Run{ + UUID: runId, + DisplayName: workflow.Name, + Name: workflow.Name, + StorageState: api.Run_STORAGESTATE_AVAILABLE.String(), + Namespace: workflow.Namespace, + CreatedAtInSec: workflow.CreationTimestamp.Unix(), + ScheduledAtInSec: workflow.ScheduledAtInSecOr0(), + FinishedAtInSec: workflow.FinishedAt(), + Conditions: workflow.Condition(), + PipelineSpec: model.PipelineSpec{ + WorkflowSpecManifest: workflow.GetWorkflowSpec().ToStringForStore(), }, - { - ResourceUUID: runId, - ResourceType: common.Run, - ReferenceUUID: experimentRef.ReferenceUUID, - ReferenceType: common.Experiment, - Relationship: common.Owner, + ResourceReferences: []*model.ResourceReference{ + { + ResourceUUID: runId, + ResourceType: common.Run, + ReferenceUUID: jobId, + ReferenceType: common.Job, + Relationship: common.Creator, + }, + { + ResourceUUID: runId, + ResourceType: common.Run, + ReferenceUUID: experimentRef.ReferenceUUID, + ReferenceType: common.Experiment, + Relationship: common.Owner, + }, }, }, - }, - PipelineRuntime: model.PipelineRuntime{ - WorkflowRuntimeManifest: workflow.ToStringForStore(), + PipelineRuntime: model.PipelineRuntime{ + WorkflowRuntimeManifest: workflow.ToStringForStore(), + }, + } + err = r.runStore.CreateOrUpdateRun(runDetail) + if err != nil { + return util.Wrap(err, "Failed to create or update the run.") + } + } + + if workflow.IsInFinalState() { + err := AddWorkflowLabel(r.workflowClient, workflow.Name, util.LabelKeyWorkflowPersistedFinalState, "true") + if err != nil { + return util.Wrap(err, "Failed to add PersistedFinalState label to workflow") + } + } + + return nil +} + +// AddWorkflowLabel add label for a workflow +func AddWorkflowLabel(wfClient workflowclient.WorkflowInterface, name string, labelKey string, labelValue string) error { + patchObj := map[string]interface{}{ + "metadata": map[string]interface{}{ + "labels": map[string]interface{}{ + labelKey: labelValue, + }, }, } - return r.runStore.CreateOrUpdateRun(runDetail) + + patch, err := json.Marshal(patchObj) + if err != nil { + return util.NewInternalServerError(err, "Unexpected error while marshalling a patch object.") + } + + var operation = func() error { + _, err = wfClient.Patch(name, types.MergePatchType, patch) + return err + } + var backoffPolicy = backoff.WithMaxRetries(backoff.NewConstantBackOff(100), 10) + err = backoff.Retry(operation, backoffPolicy) + return err } func (r *ResourceManager) ReportScheduledWorkflowResource(swf *util.ScheduledWorkflow) error { diff --git a/backend/src/apiserver/resource/resource_manager_test.go b/backend/src/apiserver/resource/resource_manager_test.go index e2ee5e40dc6..f89561564c8 100644 --- a/backend/src/apiserver/resource/resource_manager_test.go +++ b/backend/src/apiserver/resource/resource_manager_test.go @@ -898,7 +898,8 @@ func TestReportWorkflowResource_ScheduledWorkflowIDEmpty_Success(t *testing.T) { // report workflow workflow := util.NewWorkflow(&v1alpha1.Workflow{ ObjectMeta: v1.ObjectMeta{ - UID: types.UID(run.UUID), + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, }, Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeRunning}, }) @@ -1063,6 +1064,60 @@ func TestReportWorkflowResource_ScheduledWorkflowIDNotEmpty_NoExperiment_Success 
assert.Equal(t, expectedRunDetail, runDetail) } +func TestReportWorkflowResource_WorkflowCompleted(t *testing.T) { + store, manager, run := initWithOneTimeRun(t) + defer store.Close() + // report workflow + workflow := util.NewWorkflow(&v1alpha1.Workflow{ + ObjectMeta: v1.ObjectMeta{ + Name: run.Name, + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, + }, + Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, + }) + err := manager.ReportWorkflowResource(workflow) + assert.Nil(t, err) + + wf, err := store.workflowClientFake.Get(run.Run.Name, v1.GetOptions{}) + assert.Nil(t, err) + assert.Equal(t, wf.Labels[util.LabelKeyWorkflowPersistedFinalState], "true") +} + +func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted(t *testing.T) { + store, manager, run := initWithOneTimeRun(t) + defer store.Close() + // report workflow + workflow := util.NewWorkflow(&v1alpha1.Workflow{ + ObjectMeta: v1.ObjectMeta{ + Name: run.Name, + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"}, + }, + Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, + }) + err := manager.ReportWorkflowResource(workflow) + assert.Nil(t, err) +} + +func TestReportWorkflowResource_WorkflowCompleted_FinalStatePersisted_DeleteFailed(t *testing.T) { + store, manager, run := initWithOneTimeRun(t) + manager.workflowClient = &FakeBadWorkflowClient{} + defer store.Close() + // report workflow + workflow := util.NewWorkflow(&v1alpha1.Workflow{ + ObjectMeta: v1.ObjectMeta{ + Name: run.Name, + UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID, util.LabelKeyWorkflowPersistedFinalState: "true"}, + }, + Status: v1alpha1.WorkflowStatus{Phase: v1alpha1.NodeFailed}, + }) + err := manager.ReportWorkflowResource(workflow) + assert.NotNil(t, err) + assert.Contains(t, err.Error(), "failed to delete workflow") +} + func TestReportScheduledWorkflowResource_Success(t *testing.T) { store, manager, job := initWithJob(t) defer store.Close() @@ -1278,6 +1333,7 @@ func TestReadArtifact_WorkflowNoStatus_NotFound(t *testing.T) { Name: "MY_NAME", Namespace: "MY_NAMESPACE", UID: "run-1", + Labels: map[string]string{util.LabelKeyWorkflowRunId: "run-1"}, CreationTimestamp: v1.NewTime(time.Unix(11, 0).UTC()), OwnerReferences: []v1.OwnerReference{{ APIVersion: "kubeflow.org/v1beta1", diff --git a/backend/src/apiserver/resource/resource_manager_util.go b/backend/src/apiserver/resource/resource_manager_util.go index 924b5b6f8bf..da87816efac 100644 --- a/backend/src/apiserver/resource/resource_manager_util.go +++ b/backend/src/apiserver/resource/resource_manager_util.go @@ -126,6 +126,8 @@ func formulateRetryWorkflow(wf *util.Workflow) (*util.Workflow, []string, error) newWF := wf.DeepCopy() // Delete/reset fields which indicate workflow completed delete(newWF.Labels, common.LabelKeyCompleted) + // Delete/reset fields which indicate workflow is finished being persisted to the database + delete(newWF.Labels, util.LabelKeyWorkflowPersistedFinalState) newWF.ObjectMeta.Labels[common.LabelKeyPhase] = string(wfv1.NodeRunning) newWF.Status.Phase = wfv1.NodeRunning newWF.Status.Message = "" diff --git a/backend/src/apiserver/resource/workflow_fake.go b/backend/src/apiserver/resource/workflow_fake.go index 3c47b1f5ee8..257a5bcc546 100644 --- a/backend/src/apiserver/resource/workflow_fake.go +++ b/backend/src/apiserver/resource/workflow_fake.go @@ -16,6 +16,7 @@ package 
resource import ( "encoding/json" + "github.com/kubeflow/pipelines/backend/src/common/util" "strconv" "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1" @@ -83,38 +84,49 @@ func (c *FakeWorkflowClient) Update(workflow *v1alpha1.Workflow) (*v1alpha1.Work } func (c *FakeWorkflowClient) Delete(name string, options *v1.DeleteOptions) error { - glog.Error("This fake method is not yet implemented.") return nil } func (c *FakeWorkflowClient) DeleteCollection(options *v1.DeleteOptions, - listOptions v1.ListOptions) error { + listOptions v1.ListOptions) error { glog.Error("This fake method is not yet implemented.") return nil } func (c *FakeWorkflowClient) Patch(name string, pt types.PatchType, data []byte, - subresources ...string) (*v1alpha1.Workflow, error) { + subresources ...string) (*v1alpha1.Workflow, error) { var dat map[string]interface{} json.Unmarshal(data, &dat) // TODO: Should we actually assert the type here, or just panic if it's wrong? - spec := dat["spec"].(map[string]interface{}) - activeDeadlineSeconds := spec["activeDeadlineSeconds"].(float64) + if _, ok := dat["spec"]; ok { + spec := dat["spec"].(map[string]interface{}) + activeDeadlineSeconds := spec["activeDeadlineSeconds"].(float64) + + // Simulate terminating a workflow + if pt == types.MergePatchType && activeDeadlineSeconds == 0 { + workflow, ok := c.workflows[name] + if ok { + newActiveDeadlineSeconds := int64(0) + workflow.Spec.ActiveDeadlineSeconds = &newActiveDeadlineSeconds + return workflow, nil + } + } + } - // Simulate terminating a workflow - if pt == types.MergePatchType && activeDeadlineSeconds == 0 { + if _, ok := dat["metadata"]; ok { workflow, ok := c.workflows[name] if ok { - newActiveDeadlineSeconds := int64(0) - workflow.Spec.ActiveDeadlineSeconds = &newActiveDeadlineSeconds + if workflow.Labels == nil { + workflow.Labels = map[string]string{} + } + workflow.Labels[util.LabelKeyWorkflowPersistedFinalState] = "true" return workflow, nil } } - - return nil, errors.New("Failed to patch worfklow") + return nil, errors.New("Failed to patch workflow") } func (c *FakeWorkflowClient) isTerminated(name string) (bool, error) { @@ -142,6 +154,11 @@ func (FakeBadWorkflowClient) Create(*v1alpha1.Workflow) (*v1alpha1.Workflow, err func (FakeBadWorkflowClient) Get(name string, options v1.GetOptions) (*v1alpha1.Workflow, error) { return nil, errors.New("some error") } + func (c *FakeBadWorkflowClient) Update(workflow *v1alpha1.Workflow) (*v1alpha1.Workflow, error) { return nil, errors.New("failed to update workflow") } + +func (c *FakeBadWorkflowClient) Delete(name string, options *v1.DeleteOptions) error { + return errors.New("failed to delete workflow") +} diff --git a/backend/src/apiserver/server/report_server_test.go b/backend/src/apiserver/server/report_server_test.go index ae5029e4195..2faa6b80867 100644 --- a/backend/src/apiserver/server/report_server_test.go +++ b/backend/src/apiserver/server/report_server_test.go @@ -25,6 +25,7 @@ func TestReportWorkflow(t *testing.T) { Name: "run1", Namespace: "default", UID: types.UID(run.UUID), + Labels: map[string]string{util.LabelKeyWorkflowRunId: run.UUID}, }, Spec: v1alpha1.WorkflowSpec{ Arguments: v1alpha1.Arguments{ diff --git a/backend/src/common/util/consts.go b/backend/src/common/util/consts.go index a3ec99cce2f..e51dc4fa94b 100644 --- a/backend/src/common/util/consts.go +++ b/backend/src/common/util/consts.go @@ -39,4 +39,8 @@ const ( // LabelKeyWorkflowScheduledWorkflowName is a label on a Workflow. 
// It captures whether the name of the owning ScheduledWorkflow. LabelKeyWorkflowScheduledWorkflowName = constants.FullName + "/scheduledWorkflowName" + + + LabelKeyWorkflowRunId = "pipeline/runid" + LabelKeyWorkflowPersistedFinalState = "pipeline/persistedFinalState" ) diff --git a/backend/src/common/util/workflow.go b/backend/src/common/util/workflow.go index 7edb048adf9..63d62f1bf92 100644 --- a/backend/src/common/util/workflow.go +++ b/backend/src/common/util/workflow.go @@ -25,10 +25,6 @@ import ( "strings" ) -const ( - LabelKeyWorkflowRunId = "pipeline/runid" -) - // Workflow is a type to help manipulate Workflow objects. type Workflow struct { *workflowapi.Workflow @@ -210,21 +206,21 @@ func (w *Workflow) ReplaceUID(id string) error { return NewInternalServerError(err, "Failed to unmarshal workflow spec manifest. Workflow: %s", w.ToStringForStore()) } - w.Workflow = workflow - return nil - } + w.Workflow = workflow + return nil +} - func (w *Workflow) SetCannonicalLabels(name string, nextScheduledEpoch int64, index int64) { - w.SetLabels(LabelKeyWorkflowScheduledWorkflowName, name) - w.SetLabels(LabelKeyWorkflowEpoch, FormatInt64ForLabel(nextScheduledEpoch)) - w.SetLabels(LabelKeyWorkflowIndex, FormatInt64ForLabel(index)) - w.SetLabels(LabelKeyWorkflowIsOwnedByScheduledWorkflow, "true") - } +func (w *Workflow) SetCannonicalLabels(name string, nextScheduledEpoch int64, index int64) { + w.SetLabels(LabelKeyWorkflowScheduledWorkflowName, name) + w.SetLabels(LabelKeyWorkflowEpoch, FormatInt64ForLabel(nextScheduledEpoch)) + w.SetLabels(LabelKeyWorkflowIndex, FormatInt64ForLabel(index)) + w.SetLabels(LabelKeyWorkflowIsOwnedByScheduledWorkflow, "true") +} - // FindObjectStoreArtifactKeyOrEmpty loops through all node running statuses and look up the first - // S3 artifact with the specified nodeID and artifactName. Returns empty if nothing is found. - func (w *Workflow) FindObjectStoreArtifactKeyOrEmpty(nodeID string, artifactName string) string { - if w.Status.Nodes == nil { +// FindObjectStoreArtifactKeyOrEmpty loops through all node running statuses and look up the first +// S3 artifact with the specified nodeID and artifactName. Returns empty if nothing is found. +func (w *Workflow) FindObjectStoreArtifactKeyOrEmpty(nodeID string, artifactName string) string { + if w.Status.Nodes == nil { return "" } node, found := w.Status.Nodes[nodeID] @@ -243,3 +239,20 @@ func (w *Workflow) ReplaceUID(id string) error { } return s3Key } + +// IsInFinalState reports whether the workflow is in a final state. +func (w *Workflow) IsInFinalState() bool { + if w.Status.Phase == workflowapi.NodeSucceeded || w.Status.Phase == workflowapi.NodeFailed { + return true + } + return false +} + +// PersistedFinalState reports whether the workflow's final state has been persisted. +func (w *Workflow) PersistedFinalState() bool { + if _, ok := w.GetLabels()[LabelKeyWorkflowPersistedFinalState]; ok { + // If the label exists, the workflow's final state has been persisted. + return true + } + return false +} \ No newline at end of file
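
To summarize the flow this diff introduces: the API server labels a workflow pipeline/persistedFinalState=true once its final state has been stored, the persistence agent skips re-reporting such workflows until ttlSecondsAfterWorkflowFinish (default 604800 seconds, i.e. 7 days) has elapsed, and the first report after the TTL triggers deletion of the workflow CRD. Below is a minimal, self-contained sketch of the agent's skip decision; the workflow struct and the shouldSkipReport helper are illustrative stand-ins, not code from this PR, and only the label key and the TTL comparison mirror the actual change.

package main

import (
	"fmt"
	"time"
)

// workflow is a hypothetical, stripped-down stand-in for util.Workflow,
// carrying only the fields the skip decision needs.
type workflow struct {
	labels     map[string]string
	finishedAt time.Time
}

// persistedFinalState mirrors the intent of Workflow.PersistedFinalState:
// the label's presence means the final state has already been written to the DB.
func (w workflow) persistedFinalState() bool {
	_, ok := w.labels["pipeline/persistedFinalState"]
	return ok
}

// shouldSkipReport mirrors the TTL gate added to WorkflowSaver.Save: a workflow
// whose final state is already persisted is not reported again until the TTL
// has elapsed, at which point the API server garbage-collects it on the next report.
func shouldSkipReport(w workflow, ttlSeconds int64, now time.Time) bool {
	return w.persistedFinalState() && now.Unix()-w.finishedAt.Unix() < ttlSeconds
}

func main() {
	now := time.Now()
	wf := workflow{
		labels:     map[string]string{"pipeline/persistedFinalState": "true"},
		finishedAt: now.Add(-time.Hour),
	}

	const ttl = 604800 // 7 days, matching the default TTL_SECONDS_AFTER_WORKFLOW_FINISH

	fmt.Println(shouldSkipReport(wf, ttl, now)) // true: persisted and still inside the TTL window
	fmt.Println(shouldSkipReport(wf, 60, now))  // false: TTL exceeded, report again and let the API server delete it
}

One consequence of this split of responsibilities is that finished workflows stay queryable from the Kubernetes API for the TTL window, while the run history itself lives on in the pipeline database after the CRD is deleted.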