Skip to content

Commit

Permalink
fix: enable per-namespace parallelism
Browse files Browse the repository at this point in the history
Signed-off-by: isubasinghe <isitha@pipekit.io>
  • Loading branch information
isubasinghe committed Feb 13, 2025
1 parent aaa4c26 commit 8501244
Show file tree
Hide file tree
Showing 12 changed files with 321 additions and 2 deletions.
2 changes: 2 additions & 0 deletions docs/parallelism.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ When namespace parallelism is enabled, it is plausible for a workflow with a low
!!! Note
Workflows that are executing but restricted from running more nodes due to other mechanisms will still count toward parallelism limits.
In addition to the default parallelism, you are able to set individual limits on namespace parallelism by modifying the namespace object with a `workflows.argoproj.io/namespace-parallelism-limit` label.

### Priority

You can set a `priority` on workflows:
Expand Down
1 change: 1 addition & 0 deletions test/e2e/fixtures/e2e_suite.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,5 +244,6 @@ func (s *E2ESuite) Given() *Given {
hydrator: s.hydrator,
kubeClient: s.KubeClient,
bearerToken: bearerToken,
restConfig: s.RestConfig,
}
}
3 changes: 3 additions & 0 deletions test/e2e/fixtures/given.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"sigs.k8s.io/yaml"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand All @@ -35,6 +36,7 @@ type Given struct {
cronWf *wfv1.CronWorkflow
kubeClient kubernetes.Interface
bearerToken string
restConfig *rest.Config
}

// creates a workflow based on the parameter, this may be:
Expand Down Expand Up @@ -230,5 +232,6 @@ func (g *Given) When() *When {
hydrator: g.hydrator,
kubeClient: g.kubeClient,
bearerToken: g.bearerToken,
restConfig: g.restConfig,
}
}
3 changes: 3 additions & 0 deletions test/e2e/fixtures/then.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"

"github.com/minio/minio-go/v7"
"github.com/minio/minio-go/v7/pkg/credentials"
Expand All @@ -33,6 +34,7 @@ type Then struct {
hydrator hydrator.Interface
kubeClient kubernetes.Interface
bearerToken string
restConfig *rest.Config
}

func (t *Then) ExpectWorkflow(block func(t *testing.T, metadata *metav1.ObjectMeta, status *wfv1.WorkflowStatus)) *Then {
Expand Down Expand Up @@ -301,5 +303,6 @@ func (t *Then) When() *When {
wf: t.wf,
kubeClient: t.kubeClient,
bearerToken: t.bearerToken,
restConfig: t.restConfig,
}
}
27 changes: 27 additions & 0 deletions test/e2e/fixtures/when.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/utils/ptr"

wfv1 "github.com/argoproj/argo-workflows/v3/pkg/apis/workflow/v1alpha1"
Expand All @@ -39,6 +40,7 @@ type When struct {
hydrator hydrator.Interface
kubeClient kubernetes.Interface
bearerToken string
restConfig *rest.Config
}

func (w *When) SubmitWorkflow() *When {
Expand Down Expand Up @@ -219,6 +221,7 @@ var (
return node.Type == wfv1.NodeTypePod && node.Phase == wfv1.NodeFailed
}), "to have failed pod"
}
ToBePending = ToHavePhase(wfv1.WorkflowPending)
)

// `ToBeDone` replaces `ToFinish` which also makes sure the workflow is both complete not pending archiving.
Expand Down Expand Up @@ -488,6 +491,28 @@ func (w *When) RemoveFinalizers(shouldErr bool) *When {
return w
}

// AddNamespaceLimit sets the `workflows.argoproj.io/namespace-parallelism-limit`
// label on the test namespace via a merge patch, limiting how many workflows
// may run concurrently in it. Fails the test immediately on any error.
func (w *When) AddNamespaceLimit(limit string) *When {
	w.t.Helper()
	ctx := context.Background()
	// Build the merge patch as nested literals: {"metadata":{"labels":{...}}}.
	patchMap := map[string]interface{}{
		"metadata": map[string]interface{}{
			"labels": map[string]interface{}{
				"workflows.argoproj.io/namespace-parallelism-limit": limit,
			},
		},
	}

	// json.Marshal already returns []byte; no further conversion is needed.
	bs, err := json.Marshal(patchMap)
	if err != nil {
		w.t.Fatal(err)
	}

	// NOTE(review): the namespace "argo" is hard-coded; assumes the e2e suite
	// always runs against that namespace — confirm against the fixtures setup.
	_, err = w.kubeClient.CoreV1().Namespaces().Patch(ctx, "argo", types.MergePatchType, bs, metav1.PatchOptions{})
	if err != nil {
		w.t.Fatal(err)
	}
	return w
}

type PodCondition func(p *corev1.Pod) bool

var (
Expand Down Expand Up @@ -717,6 +742,7 @@ func (w *When) Then() *Then {
hydrator: w.hydrator,
kubeClient: w.kubeClient,
bearerToken: w.bearerToken,
restConfig: w.restConfig,
}
}

Expand All @@ -736,5 +762,6 @@ func (w *When) Given() *Given {
cwfTemplates: w.cwfTemplates,
cronWf: w.cronWf,
kubeClient: w.kubeClient,
restConfig: w.restConfig,
}
}
57 changes: 57 additions & 0 deletions test/e2e/ns_parallelism_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package e2e

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"

"github.com/argoproj/argo-workflows/v3/test/e2e/fixtures"
)

type NamespaceParallelismSuite struct {
fixtures.E2ESuite
}

const wf = `apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: hello-world-
labels:
workflows.argoproj.io/archive-strategy: "false"
annotations:
workflows.argoproj.io/description: |
This is a simple hello world example.
spec:
entrypoint: hello-world
templates:
- name: hello-world
container:
image: busybox
command: [sleep]
args: ["240"]
`

// TestNamespaceParallelism verifies that with a namespace parallelism limit of
// one, a second submitted workflow stays Pending with the "postponed" message.
func (s *NamespaceParallelismSuite) TestNamespaceParallelism() {
	// Occupy the single slot: apply the limit, then start one workflow.
	s.Given().
		Workflow(wf).
		When().
		AddNamespaceLimit("1").
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToStart)

	// Give the controller time to observe the running workflow before the
	// second submission. NOTE(review): a fixed sleep can be flaky; consider a
	// condition-based wait if this test becomes unreliable.
	time.Sleep(time.Second * 5)

	// Named pendingWf so the package-level `wf` manifest is not shadowed.
	pendingWf := s.Given().
		Workflow(wf).
		When().
		SubmitWorkflow().
		WaitForWorkflow(fixtures.ToBePending).GetWorkflow()
	t := s.T()
	assert.Equal(t, "Workflow processing has been postponed because too many workflows are already running", pendingWf.Status.Message)
}

// TestNamespaceParallelismSuite runs the namespace-parallelism e2e suite.
func TestNamespaceParallelismSuite(t *testing.T) {
	suite.Run(t, &NamespaceParallelismSuite{})
}
3 changes: 3 additions & 0 deletions workflow/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ const (
// AnnotationKeyArtifactGCStrategy is listed as an annotation on the Artifact GC Pod to identify
// the strategy whose artifacts are being deleted
AnnotationKeyArtifactGCStrategy = workflow.WorkflowFullName + "/artifact-gc-strategy"
// LabelNamespaceLimit is a label applied on namespace objects to control the
// limit of namespace parallelism.
LabelNamespaceLimit = workflow.WorkflowFullName + "/namespace-parallelism-limit"

// LabelKeyControllerInstanceID is the label the controller will carry forward to workflows/pod labels
// for the purposes of workflow segregation
Expand Down
10 changes: 9 additions & 1 deletion workflow/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ type WorkflowController struct {

// datastructures to support the processing of workflows and workflow pods
wfInformer cache.SharedIndexInformer
nsInformer cache.SharedIndexInformer
wftmplInformer wfextvv1alpha1.WorkflowTemplateInformer
cwftmplInformer wfextvv1alpha1.ClusterWorkflowTemplateInformer
podInformer cache.SharedIndexInformer
Expand Down Expand Up @@ -301,12 +302,17 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
Info("Current Worker Numbers")

wfc.wfInformer = util.NewWorkflowInformer(wfc.dynamicInterface, wfc.GetManagedNamespace(), workflowResyncPeriod, wfc.tweakListRequestListOptions, wfc.tweakWatchRequestListOptions, indexers)
nsInformer, err := wfc.newNamespaceInformer(ctx, wfc.kubeclientset)
if err != nil {
log.Fatal(err)
}
wfc.nsInformer = nsInformer
wfc.wftmplInformer = informer.NewTolerantWorkflowTemplateInformer(wfc.dynamicInterface, workflowTemplateResyncPeriod, wfc.managedNamespace)

wfc.wfTaskSetInformer = wfc.newWorkflowTaskSetInformer()
wfc.artGCTaskInformer = wfc.newArtGCTaskInformer()
wfc.taskResultInformer = wfc.newWorkflowTaskResultInformer()
err := wfc.addWorkflowInformerHandlers(ctx)
err = wfc.addWorkflowInformerHandlers(ctx)
if err != nil {
log.Fatal(err)
}
Expand All @@ -326,6 +332,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
go wfc.runConfigMapWatcher(ctx)
}

go wfc.nsInformer.Run(ctx.Done())
go wfc.wfInformer.Run(ctx.Done())
go wfc.wftmplInformer.Informer().Run(ctx.Done())
go wfc.podInformer.Run(ctx.Done())
Expand All @@ -339,6 +346,7 @@ func (wfc *WorkflowController) Run(ctx context.Context, wfWorkers, workflowTTLWo
if !cache.WaitForCacheSync(
ctx.Done(),
wfc.wfInformer.HasSynced,
wfc.nsInformer.HasSynced,
wfc.wftmplInformer.Informer().HasSynced,
wfc.podInformer.HasSynced,
wfc.configMapInformer.HasSynced,
Expand Down
141 changes: 141 additions & 0 deletions workflow/controller/ns_watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
package controller

import (
"context"
"strconv"
"time"

"errors"

"github.com/sirupsen/logrus"
apiv1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/selection"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"

"github.com/argoproj/argo-workflows/v3/workflow/common"
)

var (
	// limitReq selects only namespaces that carry the parallelism-limit label.
	limitReq, _ = labels.NewRequirement(common.LabelNamespaceLimit, selection.Exists, nil)
	// nsResyncPeriod is the resync interval of the namespace informer.
	nsResyncPeriod = 5 * time.Minute
	// errUnableToExtract signals that the limit label is absent on a namespace.
	errUnableToExtract = errors.New("was unable to extract limit")
)

// updateFunc sets the parallelism limit for a named namespace.
type updateFunc = func(string, int)

// resetFunc clears the parallelism limit for a named namespace.
type resetFunc = func(string)

// newNamespaceInformer builds a SharedIndexInformer that watches only
// namespaces carrying the parallelism-limit label and forwards add/update/
// delete events to the controller's throttler, so per-namespace parallelism
// limits track the live label values.
func (wfc *WorkflowController) newNamespaceInformer(ctx context.Context, kubeclientset kubernetes.Interface) (cache.SharedIndexInformer, error) {

	c := kubeclientset.CoreV1().Namespaces()
	logger := logrus.WithField("scope", "ns_watcher")

	// Restrict list/watch traffic to namespaces that have the label at all.
	labelSelector := labels.NewSelector().
		Add(*limitReq)

	listFunc := func(opts metav1.ListOptions) (runtime.Object, error) {
		opts.LabelSelector = labelSelector.String()
		return c.List(ctx, opts)
	}

	watchFunc := func(opts metav1.ListOptions) (watch.Interface, error) {
		opts.Watch = true
		opts.LabelSelector = labelSelector.String()
		return c.Watch(ctx, opts)
	}

	source := &cache.ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
	informer := cache.NewSharedIndexInformer(source, &apiv1.Namespace{}, nsResyncPeriod, cache.Indexers{})

	_, err := informer.AddEventHandler(
		cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) {
				ns, err := nsFromObj(obj)
				if err != nil {
					// Not a namespace object; nothing to do.
					return
				}
				updateNS(logger, ns, wfc.throttler.UpdateNamespaceParallelism)
			},

			UpdateFunc: func(old, newVal interface{}) {
				ns, err := nsFromObj(newVal)
				if err != nil {
					return
				}
				// Skip the update when the limit label value did not change
				// (e.g. on periodic resync) to avoid redundant throttler work.
				oldNs, err := nsFromObj(old)
				if err == nil && !limitChanged(oldNs, ns) {
					return
				}
				updateNS(logger, ns, wfc.throttler.UpdateNamespaceParallelism)
			},

			DeleteFunc: func(obj interface{}) {
				ns, err := nsFromObj(obj)
				if err != nil {
					return
				}
				// Deleting the namespace removes its parallelism limit.
				deleteNS(logger, ns, wfc.throttler.ResetNamespaceParallelism)
			},
		},
	)
	if err != nil {
		return nil, err
	}
	return informer, nil
}

// deleteNS resets the tracked parallelism limit for a namespace that has been
// deleted.
func deleteNS(log *logrus.Entry, ns *apiv1.Namespace, resetFn resetFunc) {
	// Fixed typo in the log message: "reseting" -> "resetting".
	log.Infof("resetting the namespace parallelism limits for %s due to deletion event", ns.Name)
	resetFn(ns.Name)
}

// updateNS extracts the parallelism limit from a namespace's label and
// forwards it via updateFn; does nothing when the label is absent, and logs
// any other extraction failure.
func updateNS(log *logrus.Entry, ns *apiv1.Namespace, updateFn updateFunc) {
	limit, err := extractLimit(ns)
	switch {
	case errors.Is(err, errUnableToExtract):
		// No limit label on this namespace — nothing to propagate.
		return
	case err != nil:
		log.Errorf("was unable to extract the limit due to: %s", err)
		return
	}
	log.Infof("changing namespace parallelism in %s to %d", ns.Name, limit)
	updateFn(ns.Name, limit)
}

// nsFromObj type-asserts an informer event object to *apiv1.Namespace.
func nsFromObj(obj interface{}) (*apiv1.Namespace, error) {
	if ns, ok := obj.(*apiv1.Namespace); ok {
		return ns, nil
	}
	return nil, errors.New("was unable to convert to namespace")
}

// limitChanged reports whether the parallelism-limit label value differs
// between the old and new versions of a namespace. A missing label compares
// as the empty string, so adding or removing the label also counts as a change.
func limitChanged(old *apiv1.Namespace, newNS *apiv1.Namespace) bool {
	oldLimit := old.GetLabels()[common.LabelNamespaceLimit]
	newLimit := newNS.GetLabels()[common.LabelNamespaceLimit]
	// Idiomatic inequality instead of !(a == b).
	return oldLimit != newLimit
}

// extractLimit reads the namespace-parallelism-limit label from a namespace
// and parses it as an integer.
//
// Returns errUnableToExtract when the label is absent, or the strconv parse
// error when its value is not a valid integer.
func extractLimit(ns *apiv1.Namespace) (int, error) {
	// A direct map lookup replaces the original loop that took the address of
	// the range variable (`&value`) — a fragile pre-Go-1.22 pattern — and is
	// both simpler and a single lookup.
	limitString, ok := ns.GetLabels()[common.LabelNamespaceLimit]
	if !ok {
		return 0, errUnableToExtract
	}
	// strconv.Atoi returns (0, err) on failure, matching the original contract.
	return strconv.Atoi(limitString)
}
Loading

0 comments on commit 8501244

Please sign in to comment.