Skip to content

Commit

Permalink
Garbage collect the completed workflow after persisted to database (#…
Browse files Browse the repository at this point in the history
…1802)

* garbage collect the completed workflow

* add tests

* fix tests

* fix tests

* update e2e test

* update logic

* update logic

* update logic

* fix tests

* fix tests

* update
  • Loading branch information
IronPan authored Aug 13, 2019
1 parent 1d8d451 commit 6189681
Show file tree
Hide file tree
Showing 13 changed files with 330 additions and 123 deletions.
5 changes: 4 additions & 1 deletion backend/Dockerfile.persistenceagent
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ COPY --from=builder /go/src/github.com/kubeflow/pipelines/third_party/license.tx

ENV NAMESPACE ""

CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE}
# Set Workflow TTL to 7 days
ENV TTL_SECONDS_AFTER_WORKFLOW_FINISH 604800

CMD persistence_agent --logtostderr=true --namespace=${NAMESPACE} --ttlSecondsAfterWorkflowFinish=${TTL_SECONDS_AFTER_WORKFLOW_FINISH}
41 changes: 22 additions & 19 deletions backend/src/agent/persistence/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,28 +31,30 @@ import (
)

var (
masterURL string
kubeconfig string
initializeTimeout time.Duration
timeout time.Duration
mlPipelineAPIServerName string
mlPipelineAPIServerPort string
mlPipelineAPIServerBasePath string
mlPipelineServiceHttpPort string
mlPipelineServiceGRPCPort string
namespace string
masterURL string
kubeconfig string
initializeTimeout time.Duration
timeout time.Duration
mlPipelineAPIServerName string
mlPipelineAPIServerPort string
mlPipelineAPIServerBasePath string
mlPipelineServiceHttpPort string
mlPipelineServiceGRPCPort string
namespace string
ttlSecondsAfterWorkflowFinish int64
)

const (
kubeconfigFlagName = "kubeconfig"
masterFlagName = "master"
initializationTimeoutFlagName = "initializeTimeout"
timeoutFlagName = "timeout"
mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath"
mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName"
mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort"
mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort"
namespaceFlagName = "namespace"
kubeconfigFlagName = "kubeconfig"
masterFlagName = "master"
initializationTimeoutFlagName = "initializeTimeout"
timeoutFlagName = "timeout"
mlPipelineAPIServerBasePathFlagName = "mlPipelineAPIServerBasePath"
mlPipelineAPIServerNameFlagName = "mlPipelineAPIServerName"
mlPipelineAPIServerHttpPortFlagName = "mlPipelineServiceHttpPort"
mlPipelineAPIServerGRPCPortFlagName = "mlPipelineServiceGRPCPort"
namespaceFlagName = "namespace"
ttlSecondsAfterWorkflowFinishFlagName = "ttlSecondsAfterWorkflowFinish"
)

func main() {
Expand Down Expand Up @@ -122,4 +124,5 @@ func init() {
flag.StringVar(&mlPipelineAPIServerBasePath, mlPipelineAPIServerBasePathFlagName,
"/apis/v1beta1", "The base path for the ML pipeline API server.")
flag.StringVar(&namespace, namespaceFlagName, "", "The namespace name used for Kubernetes informers to obtain the listers.")
flag.Int64Var(&ttlSecondsAfterWorkflowFinish, ttlSecondsAfterWorkflowFinishFlagName, 604800 /* 7 days */, "The TTL for Argo workflow to persist after workflow finish.")
}
10 changes: 5 additions & 5 deletions backend/src/agent/persistence/persistence_agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,10 @@ type PersistenceAgent struct {

// NewPersistenceAgent returns a new persistence agent.
func NewPersistenceAgent(
swfInformerFactory swfinformers.SharedInformerFactory,
workflowInformerFactory workflowinformers.SharedInformerFactory,
pipelineClient *client.PipelineClient,
time util.TimeInterface) *PersistenceAgent {
swfInformerFactory swfinformers.SharedInformerFactory,
workflowInformerFactory workflowinformers.SharedInformerFactory,
pipelineClient *client.PipelineClient,
time util.TimeInterface) *PersistenceAgent {
// obtain references to shared informers
swfInformer := swfInformerFactory.Scheduledworkflow().V1beta1().ScheduledWorkflows()
workflowInformer := workflowInformerFactory.Argoproj().V1alpha1().Workflows()
Expand All @@ -64,7 +64,7 @@ func NewPersistenceAgent(

workflowWorker := worker.NewPersistenceWorker(time, workflowregister.Kind,
workflowInformer.Informer(), true,
worker.NewWorkflowSaver(workflowClient, pipelineClient))
worker.NewWorkflowSaver(workflowClient, pipelineClient, ttlSecondsAfterWorkflowFinish))

agent := &PersistenceAgent{
swfClient: swfClient,
Expand Down
10 changes: 5 additions & 5 deletions backend/src/agent/persistence/worker/persistence_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestPersistenceWorker_Success(t *testing.T) {
pipelineClient := client.NewPipelineClientFake()

// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestPersistenceWorker_NotFoundError(t *testing.T) {
pipelineClient := client.NewPipelineClientFake()

// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -115,7 +115,7 @@ func TestPersistenceWorker_GetWorklowError(t *testing.T) {
pipelineClient := client.NewPipelineClientFake()

// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -148,7 +148,7 @@ func TestPersistenceWorker_ReportWorkflowRetryableError(t *testing.T) {
"My Retriable Error"))

// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down Expand Up @@ -181,7 +181,7 @@ func TestPersistenceWorker_ReportWorkflowNonRetryableError(t *testing.T) {
"My Permanent Error"))

// Set up peristence worker
saver := NewWorkflowSaver(workflowClient, pipelineClient)
saver := NewWorkflowSaver(workflowClient, pipelineClient, 100)
eventHandler := NewFakeEventHandler()
worker := NewPersistenceWorker(
util.NewFakeTimeForEpoch(),
Expand Down
24 changes: 16 additions & 8 deletions backend/src/agent/persistence/worker/workflow_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,24 @@ import (
"github.com/kubeflow/pipelines/backend/src/common/util"
log "github.com/sirupsen/logrus"
_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
"time"
)

// WorkflowSaver provides a function to persist a workflow to a database.
type WorkflowSaver struct {
client client.WorkflowClientInterface
pipelineClient client.PipelineClientInterface
metricsReporter *MetricsReporter
client client.WorkflowClientInterface
pipelineClient client.PipelineClientInterface
metricsReporter *MetricsReporter
ttlSecondsAfterWorkflowFinish int64
}

func NewWorkflowSaver(client client.WorkflowClientInterface,
pipelineClient client.PipelineClientInterface) *WorkflowSaver {
pipelineClient client.PipelineClientInterface, ttlSecondsAfterWorkflowFinish int64) *WorkflowSaver {
return &WorkflowSaver{
client: client,
pipelineClient: pipelineClient,
metricsReporter: NewMetricsReporter(pipelineClient),
client: client,
pipelineClient: pipelineClient,
metricsReporter: NewMetricsReporter(pipelineClient),
ttlSecondsAfterWorkflowFinish: ttlSecondsAfterWorkflowFinish,
}
}

Expand All @@ -53,7 +56,12 @@ func (s *WorkflowSaver) Save(key string, namespace string, name string, nowEpoch
"Workflow (%s): transient failure: %v", key, err)

}

if wf.PersistedFinalState() && time.Now().Unix()-wf.FinishedAt() < s.ttlSecondsAfterWorkflowFinish {
// Skip persisting the workflow if the workflow is finished
// and the workflow hasn't being passing the TTL
log.Infof("Skip syncing Workflow (%v): workflow marked as persisted.", name)
return nil
}
// Save this Workflow to the database.
err = s.pipelineClient.ReportWorkflow(wf)
retry := util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT)
Expand Down
83 changes: 68 additions & 15 deletions backend/src/agent/persistence/worker/workflow_saver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package worker
import (
"fmt"
"testing"
"time"

workflowapi "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
"github.com/kubeflow/pipelines/backend/src/agent/persistence/client"
Expand All @@ -39,9 +40,7 @@ func TestWorkflow_Save_Success(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -53,9 +52,7 @@ func TestWorkflow_Save_NotFoundDuringGet(t *testing.T) {
workflowFake := client.NewWorkflowClientFake()
pipelineFake := client.NewPipelineClientFake()

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -70,9 +67,7 @@ func TestWorkflow_Save_ErrorDuringGet(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", nil)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -97,9 +92,7 @@ func TestWorkflow_Save_PermanentFailureWhileReporting(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

Expand All @@ -124,13 +117,73 @@ func TestWorkflow_Save_TransientFailureWhileReporting(t *testing.T) {

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(
workflowFake,
pipelineFake)
saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

assert.Equal(t, true, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT))
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "transient failure")
}

func TestWorkflow_Save_SkippedDueToFinalStatue(t *testing.T) {
workflowFake := client.NewWorkflowClientFake()
pipelineFake := client.NewPipelineClientFake()

// Add this will result in failure unless reporting is skipped
pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT,
"My Permanent Error"))

workflow := util.NewWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"},
},
Status: workflowapi.WorkflowStatus{
FinishedAt: metav1.Now(),
},
})

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(workflowFake, pipelineFake, 100)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT))
assert.Equal(t, nil, err)
}

func TestWorkflow_Save_FinalStatueNotSkippedDueToExceedTTL(t *testing.T) {
workflowFake := client.NewWorkflowClientFake()
pipelineFake := client.NewPipelineClientFake()

// Add this will result in failure unless reporting is skipped
pipelineFake.SetError(util.NewCustomError(fmt.Errorf("Error"), util.CUSTOM_CODE_PERMANENT,
"My Permanent Error"))

workflow := util.NewWorkflow(&workflowapi.Workflow{
ObjectMeta: metav1.ObjectMeta{
Namespace: "MY_NAMESPACE",
Name: "MY_NAME",
Labels: map[string]string{util.LabelKeyWorkflowPersistedFinalState: "true"},
},
Status: workflowapi.WorkflowStatus{
FinishedAt: metav1.Now(),
},
})

workflowFake.Put("MY_NAMESPACE", "MY_NAME", workflow)

saver := NewWorkflowSaver(workflowFake, pipelineFake, 1)

// Sleep 2 seconds to make sure workflow passed TTL
time.Sleep(2 * time.Second)

err := saver.Save("MY_KEY", "MY_NAMESPACE", "MY_NAME", 20)

assert.Equal(t, false, util.HasCustomCode(err, util.CUSTOM_CODE_TRANSIENT))
assert.NotNil(t, err)
assert.Contains(t, err.Error(), "permanent failure")
}
Loading

0 comments on commit 6189681

Please sign in to comment.