Skip to content

Commit

Permalink
Use cache to retrieve WorkflowTemplates (argoproj#1534)
Browse files Browse the repository at this point in the history
  • Loading branch information
dtaniwaki authored and Duske committed Aug 15, 2019
1 parent df99713 commit 53642f0
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 35 deletions.
32 changes: 20 additions & 12 deletions workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"time"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
wfextv "github.com/argoproj/argo/pkg/client/informers/externalversions"
wfextvv1alpha1 "github.com/argoproj/argo/pkg/client/informers/externalversions/workflow/v1alpha1"
log "github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
apierr "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -54,20 +56,22 @@ type WorkflowController struct {
wfclientset wfclientset.Interface

// datastructures to support the processing of workflows and workflow pods
wfInformer cache.SharedIndexInformer
podInformer cache.SharedIndexInformer
wfQueue workqueue.RateLimitingInterface
podQueue workqueue.RateLimitingInterface
completedPods chan string
gcPods chan string // pods to be deleted depend on GC strategy
throttler Throttler
wfDBctx sqldb.DBRepository
wfInformer cache.SharedIndexInformer
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
podInformer cache.SharedIndexInformer
wfQueue workqueue.RateLimitingInterface
podQueue workqueue.RateLimitingInterface
completedPods chan string
gcPods chan string // pods to be deleted depend on GC strategy
throttler Throttler
wfDBctx sqldb.DBRepository
}

const (
workflowResyncPeriod = 20 * time.Minute
workflowMetricsResyncPeriod = 1 * time.Minute
podResyncPeriod = 30 * time.Minute
workflowResyncPeriod = 20 * time.Minute
workflowTemplateResyncPeriod = 20 * time.Minute
workflowMetricsResyncPeriod = 1 * time.Minute
podResyncPeriod = 30 * time.Minute
)

// NewWorkflowController instantiates a new WorkflowController
Expand Down Expand Up @@ -145,16 +149,20 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, podWorkers in

wfc.wfInformer = util.NewWorkflowInformer(wfc.restConfig, wfc.Config.Namespace, workflowResyncPeriod, wfc.tweakWorkflowlist)

informerFactory := wfextv.NewSharedInformerFactory(wfc.wfclientset, workflowTemplateResyncPeriod)
wfc.wftmplInformer = informerFactory.Argoproj().V1alpha1().WorkflowTemplates()

wfc.addWorkflowInformerHandler()
wfc.podInformer = wfc.newPodInformer()

go wfc.wfInformer.Run(ctx.Done())
go wfc.wftmplInformer.Informer().Run(ctx.Done())
go wfc.podInformer.Run(ctx.Done())
go wfc.podLabeler(ctx.Done())
go wfc.podGarbageCollector(ctx.Done())

// Wait for all involved caches to be synced, before processing items from the queue is started
for _, informer := range []cache.SharedIndexInformer{wfc.wfInformer, wfc.podInformer} {
for _, informer := range []cache.SharedIndexInformer{wfc.wfInformer, wfc.wftmplInformer.Informer(), wfc.podInformer} {
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
log.Error("Timed out waiting for caches to sync")
return
Expand Down
19 changes: 16 additions & 3 deletions workflow/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@ package controller

import (
"bytes"
"context"
"encoding/json"
"io"
"io/ioutil"
"testing"
"time"

"github.com/ghodss/yaml"
"github.com/stretchr/testify/assert"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/tools/cache"

wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
fakewfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned/fake"
wfextv "github.com/argoproj/argo/pkg/client/informers/externalversions"
"github.com/argoproj/argo/workflow/config"
)

Expand All @@ -42,13 +46,22 @@ spec:
`

func newController() *WorkflowController {
wfclientset := fakewfclientset.NewSimpleClientset()
informerFactory := wfextv.NewSharedInformerFactory(wfclientset, 10*time.Minute)
wftmplInformer := informerFactory.Argoproj().V1alpha1().WorkflowTemplates()
ctx := context.Background()
go wftmplInformer.Informer().Run(ctx.Done())
if !cache.WaitForCacheSync(ctx.Done(), wftmplInformer.Informer().HasSynced) {
panic("Timed out waiting for caches to sync")
}
return &WorkflowController{
Config: config.WorkflowControllerConfig{
ExecutorImage: "executor:latest",
},
kubeclientset: fake.NewSimpleClientset(),
wfclientset: fakewfclientset.NewSimpleClientset(),
completedPods: make(chan string, 512),
kubeclientset: fake.NewSimpleClientset(),
wfclientset: wfclientset,
completedPods: make(chan string, 512),
wftmplInformer: wftmplInformer,
}
}

Expand Down
2 changes: 1 addition & 1 deletion workflow/controller/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func newWorkflowOperationCtx(wf *wfv1.Workflow, wfc *WorkflowController) *wfOper
completedPods: make(map[string]bool),
succeededPods: make(map[string]bool),
deadline: time.Now().UTC().Add(maxOperationTime),
tmplCtx: templateresolution.NewContext(wfc.wfclientset, wf.Namespace, wf),
tmplCtx: templateresolution.NewContext(wfc.wftmplInformer.Lister().WorkflowTemplates(wf.Namespace), wf),
}

if woc.wf.Status.Nodes == nil {
Expand Down
45 changes: 33 additions & 12 deletions workflow/templateresolution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package templateresolution
import (
"github.com/argoproj/argo/errors"
wfv1 "github.com/argoproj/argo/pkg/apis/workflow/v1alpha1"
wfclientset "github.com/argoproj/argo/pkg/client/clientset/versioned"
typed "github.com/argoproj/argo/pkg/client/clientset/versioned/typed/workflow/v1alpha1"
"github.com/argoproj/argo/workflow/common"
log "github.com/sirupsen/logrus"
apierr "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -13,22 +13,43 @@ import (
// maxResolveDepth is the limit of template reference resolution.
const maxResolveDepth int = 10

// workflowTemplateInterfaceWrapper is an internal struct to wrap clientset.
type workflowTemplateInterfaceWrapper struct {
clientset typed.WorkflowTemplateInterface
}

// Get retrieves the WorkflowTemplate of a given name.
func (wrapper *workflowTemplateInterfaceWrapper) Get(name string) (*wfv1.WorkflowTemplate, error) {
return wrapper.clientset.Get(name, metav1.GetOptions{})
}

// WorkflowTemplateNamespaceLister helps get WorkflowTemplates.
type WorkflowTemplateNamespacedGetter interface {
// Get retrieves the WorkflowTemplate from the indexer for a given name.
Get(name string) (*wfv1.WorkflowTemplate, error)
}

// Context is a context of template search.
type Context struct {
// wfClientset is the clientset to get workflow templates.
wfClientset wfclientset.Interface
// namespace is the namespace of template search.
namespace string
// wftmplGetter is an interface to get WorkflowTemplates.
wftmplGetter WorkflowTemplateNamespacedGetter
// tmplBase is the base of local template search.
tmplBase wfv1.TemplateGetter
}

// NewContext returns new Context.
func NewContext(wfClientset wfclientset.Interface, namespace string, tmplBase wfv1.TemplateGetter) *Context {
func NewContext(wftmplGetter WorkflowTemplateNamespacedGetter, tmplBase wfv1.TemplateGetter) *Context {
return &Context{
wftmplGetter: wftmplGetter,
tmplBase: tmplBase,
}
}

// NewContext returns new Context.
func NewContextFromClientset(clientset typed.WorkflowTemplateInterface, tmplBase wfv1.TemplateGetter) *Context {
return &Context{
wfClientset: wfClientset,
namespace: namespace,
tmplBase: tmplBase,
wftmplGetter: &workflowTemplateInterfaceWrapper{clientset: clientset},
tmplBase: tmplBase,
}
}

Expand All @@ -43,7 +64,7 @@ func (ctx *Context) GetTemplateByName(name string) (*wfv1.Template, error) {

// GetTemplateFromRef returns a template found by a given template ref.
func (ctx *Context) GetTemplateFromRef(tmplRef *wfv1.TemplateRef) (*wfv1.Template, error) {
wftmpl, err := ctx.wfClientset.ArgoprojV1alpha1().WorkflowTemplates(ctx.namespace).Get(tmplRef.Name, metav1.GetOptions{})
wftmpl, err := ctx.wftmplGetter.Get(tmplRef.Name)
if err != nil {
if apierr.IsNotFound(err) {
return nil, errors.Errorf(errors.CodeNotFound, "workflow template %s not found", tmplRef.Name)
Expand Down Expand Up @@ -82,7 +103,7 @@ func (ctx *Context) GetTemplate(tmplHolder wfv1.TemplateHolder) (*wfv1.Template,
func (ctx *Context) GetTemplateBase(tmplHolder wfv1.TemplateHolder) (wfv1.TemplateGetter, error) {
tmplRef := tmplHolder.GetTemplateRef()
if tmplRef != nil {
wftmpl, err := ctx.wfClientset.ArgoprojV1alpha1().WorkflowTemplates(ctx.namespace).Get(tmplRef.Name, metav1.GetOptions{})
wftmpl, err := ctx.wftmplGetter.Get(tmplRef.Name)
if err != nil && apierr.IsNotFound(err) {
return nil, errors.Errorf(errors.CodeNotFound, "workflow template %s not found", tmplRef.Name)
}
Expand Down Expand Up @@ -140,5 +161,5 @@ func (ctx *Context) resolveTemplateImpl(tmplHolder wfv1.TemplateHolder, depth in

// WithTemplateBase creates new context with a wfv1.TemplateGetter.
func (ctx *Context) WithTemplateBase(tmplBase wfv1.TemplateGetter) *Context {
return NewContext(ctx.wfClientset, ctx.namespace, tmplBase)
return NewContext(ctx.wftmplGetter, tmplBase)
}
10 changes: 5 additions & 5 deletions workflow/templateresolution/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ spec:
func TestGetTemplateByName(t *testing.T) {
wfClientset := fakewfclientset.NewSimpleClientset()
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContext(wfClientset, metav1.NamespaceDefault, wftmpl)
ctx := NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wftmpl)

tmpl, err := ctx.GetTemplateByName("whalesay")
if !assert.NoError(t, err) {
Expand All @@ -131,7 +131,7 @@ func TestGetTemplateFromRef(t *testing.T) {
t.Fatal(err)
}
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContext(wfClientset, metav1.NamespaceDefault, wftmpl)
ctx := NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wftmpl)

// Get the template of existing template reference.
tmplRef := wfv1.TemplateRef{Name: "some-workflow-template", Template: "whalesay"}
Expand Down Expand Up @@ -164,7 +164,7 @@ func TestGetTemplate(t *testing.T) {
t.Fatal(err)
}
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContext(wfClientset, metav1.NamespaceDefault, wftmpl)
ctx := NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wftmpl)

// Get the template of existing template name.
tmplHolder := wfv1.Template{Template: "whalesay"}
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestGetTemplateBase(t *testing.T) {
t.Fatal(err)
}
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContext(wfClientset, metav1.NamespaceDefault, wftmpl)
ctx := NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wftmpl)

// Get the template base of existing template name.
tmplHolder := wfv1.Template{Template: "whalesay"}
Expand Down Expand Up @@ -266,7 +266,7 @@ func TestResolveTemplate(t *testing.T) {
t.Fatal(err)
}
wftmpl := unmarshalWftmpl(baseWorkflowTemplateYaml)
ctx := NewContext(wfClientset, metav1.NamespaceDefault, wftmpl)
ctx := NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(metav1.NamespaceDefault), wftmpl)

// Get the template of template name.
tmplHolder := wfv1.Template{Template: "whalesay"}
Expand Down
5 changes: 3 additions & 2 deletions workflow/validate/validate.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,9 @@ func ValidateWorkflow(wfClientset wfclientset.Interface, namespace string, wf *w
if wf.Namespace != "" {
namespace = wf.Namespace
}

ctx := newTemplateValidationCtx(wfClientset, namespace, wf, opts)
tmplCtx := templateresolution.NewContext(wfClientset, namespace, wf)
tmplCtx := templateresolution.NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(namespace), wf)

err := validateWorkflowFieldNames(wf.Spec.Templates)
if err != nil {
Expand Down Expand Up @@ -159,7 +160,7 @@ func ValidateWorkflowTemplate(wfClientset wfclientset.Interface, namespace strin
namespace = wftmpl.Namespace
}
ctx := newTemplateValidationCtx(wfClientset, namespace, wftmpl, ValidateOpts{})
tmplCtx := templateresolution.NewContext(wfClientset, namespace, wftmpl)
tmplCtx := templateresolution.NewContextFromClientset(wfClientset.ArgoprojV1alpha1().WorkflowTemplates(namespace), wftmpl)

// Check if all templates can be resolved.
for _, template := range wftmpl.Spec.Templates {
Expand Down

0 comments on commit 53642f0

Please sign in to comment.