From d37a0e938a4a9be67fa9a9df2719e2821dfa8ec7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?S=C3=A9bastien=20Maintrot?= <3097030+ImpSy@users.noreply.github.com> Date: Sun, 29 Sep 2024 03:47:37 +0200 Subject: [PATCH] FEATURE: add cli argument to modify controller workqueue ratelimiter (#2186) * add cli argument to modify controller workqueue ratelimiter Signed-off-by: ImpSy <3097030+ImpSy@users.noreply.github.com> * add cli argument to modify controller workqueue ratelimiter support to helm chart Signed-off-by: ImpSy <3097030+ImpSy@users.noreply.github.com> --------- Signed-off-by: ImpSy <3097030+ImpSy@users.noreply.github.com> --- charts/spark-operator-chart/README.md | 4 +++ .../templates/controller/deployment.yaml | 5 +++ .../tests/controller/deployment_test.yaml | 33 ++++++++++++++++++- charts/spark-operator-chart/values.yaml | 13 ++++++++ cmd/operator/controller/start.go | 11 +++++++ pkg/util/workqueue.go | 19 +++++++++++ 6 files changed, 84 insertions(+), 1 deletion(-) create mode 100644 pkg/util/workqueue.go diff --git a/charts/spark-operator-chart/README.md b/charts/spark-operator-chart/README.md index cf0420edf..2eabb1098 100644 --- a/charts/spark-operator-chart/README.md +++ b/charts/spark-operator-chart/README.md @@ -117,6 +117,10 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum | controller.pprof.enable | bool | `false` | Specifies whether to enable pprof. | | controller.pprof.port | int | `6060` | Specifies pprof port. | | controller.pprof.portName | string | `"pprof"` | Specifies pprof service port name. | +| controller.workqueueRateLimiter.bucketQPS | int | `50` | Specifies the average rate of items processed by the workqueue rate limiter. | +| controller.workqueueRateLimiter.bucketSize | int | `500` | Specifies the maximum number of items that can be in the workqueue at any given time. 
| +| controller.workqueueRateLimiter.maxDelay.enable | bool | `true` | Specifies whether to enable max delay for the workqueue rate limiter. This is useful to avoid losing events when the workqueue is full. | +| controller.workqueueRateLimiter.maxDelay.duration | string | `"6h"` | Specifies the maximum delay duration for the workqueue rate limiter. | | webhook.enable | bool | `true` | Specifies whether to enable webhook. | | webhook.replicas | int | `1` | Number of replicas of webhook server. | | webhook.logLevel | string | `"info"` | Configure the verbosity of logging, can be one of `debug`, `info`, `error`. | diff --git a/charts/spark-operator-chart/templates/controller/deployment.yaml b/charts/spark-operator-chart/templates/controller/deployment.yaml index 4f9251f5a..92f36ee20 100644 --- a/charts/spark-operator-chart/templates/controller/deployment.yaml +++ b/charts/spark-operator-chart/templates/controller/deployment.yaml @@ -94,6 +94,11 @@ spec: {{- if .Values.controller.pprof.enable }} - --pprof-bind-address=:{{ .Values.controller.pprof.port }} {{- end }} + - --workqueue-ratelimiter-bucket-qps={{ .Values.controller.workqueueRateLimiter.bucketQPS }} + - --workqueue-ratelimiter-bucket-size={{ .Values.controller.workqueueRateLimiter.bucketSize }} + {{- if .Values.controller.workqueueRateLimiter.maxDelay.enable }} + - --workqueue-ratelimiter-max-delay={{ .Values.controller.workqueueRateLimiter.maxDelay.duration }} + {{- end }} {{- if or .Values.prometheus.metrics.enable .Values.controller.pprof.enable }} ports: {{- if .Values.controller.pprof.enable }} diff --git a/charts/spark-operator-chart/tests/controller/deployment_test.yaml b/charts/spark-operator-chart/tests/controller/deployment_test.yaml index 57b557757..bdd35c344 100644 --- a/charts/spark-operator-chart/tests/controller/deployment_test.yaml +++ b/charts/spark-operator-chart/tests/controller/deployment_test.yaml @@ -592,4 +592,35 @@ tests: content: name: pprof-test containerPort: 12345 - count: 1 \ No 
newline at end of file + count: 1 + + - it: Should contain `--workqueue-ratelimiter-max-delay` arg if `controller.workqueueRateLimiter.maxDelay.enable` is set to `true` + set: + controller: + workqueueRateLimiter: + bucketQPS: 1 + bucketSize: 2 + maxDelay: + enable: true + duration: 3h + asserts: + - contains: + path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args + content: --workqueue-ratelimiter-bucket-qps=1 + - contains: + path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args + content: --workqueue-ratelimiter-bucket-size=2 + - contains: + path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args + content: --workqueue-ratelimiter-max-delay=3h + + - it: Should not contain `--workqueue-ratelimiter-max-delay` arg if `controller.workqueueRateLimiter.maxDelay.enable` is set to `false` + set: + controller: + maxDelay: + enable: false + duration: 1h + asserts: + - notContains: + path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args + content: --workqueue-ratelimiter-max-delay=1h diff --git a/charts/spark-operator-chart/values.yaml b/charts/spark-operator-chart/values.yaml index 94100dfb2..64340699f 100644 --- a/charts/spark-operator-chart/values.yaml +++ b/charts/spark-operator-chart/values.yaml @@ -178,6 +178,19 @@ controller: # -- Specifies pprof service port name. portName: pprof + # Workqueue rate limiter configuration forwarded to the controller-runtime Reconciler. + workqueueRateLimiter: + # -- Specifies the average rate of items processed by the workqueue rate limiter. + bucketQPS: 50 + # -- Specifies the maximum number of items that can be in the workqueue at any given time. + bucketSize: 500 + maxDelay: + # -- Specifies whether to enable max delay for the workqueue rate limiter. + # This is useful to avoid losing events when the workqueue is full. + enable: true + # -- Specifies the maximum delay duration for the workqueue rate limiter. 
+ duration: 6h + webhook: # -- Specifies whether to enable webhook. enable: true diff --git a/cmd/operator/controller/start.go b/cmd/operator/controller/start.go index 38fee35c9..f34be8c8d 100644 --- a/cmd/operator/controller/start.go +++ b/cmd/operator/controller/start.go @@ -25,6 +25,7 @@ import ( // Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.) // to ensure that exec-entrypoint and run can make use of them. + "golang.org/x/time/rate" _ "k8s.io/client-go/plugin/pkg/client/auth" "github.com/spf13/cobra" @@ -74,6 +75,11 @@ var ( controllerThreads int cacheSyncTimeout time.Duration + // Workqueue rate limiter + workqueueRateLimiterBucketQPS int + workqueueRateLimiterBucketSize int + workqueueRateLimiterMaxDelay time.Duration + // Batch scheduler enableBatchScheduler bool kubeSchedulerNames []string @@ -134,6 +140,10 @@ func NewStartCommand() *cobra.Command { command.Flags().StringSliceVar(&namespaces, "namespaces", []string{}, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset or contains empty string.") command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.") + command.Flags().IntVar(&workqueueRateLimiterBucketQPS, "workqueue-ratelimiter-bucket-qps", 10, "QPS of the bucket rate of the workqueue.") + command.Flags().IntVar(&workqueueRateLimiterBucketSize, "workqueue-ratelimiter-bucket-size", 100, "The token bucket size of the workqueue.") + command.Flags().DurationVar(&workqueueRateLimiterMaxDelay, "workqueue-ratelimiter-max-delay", rate.InfDuration, "The maximum delay of the workqueue.") + command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.") command.Flags().StringSliceVar(&kubeSchedulerNames, "kube-scheduler-names", []string{}, "The kube-scheduler names for scheduling Spark applications.") command.Flags().StringVar(&defaultBatchScheduler, 
"default-batch-scheduler", "", "Default batch scheduler.") @@ -355,6 +365,7 @@ func newControllerOptions() controller.Options { options := controller.Options{ MaxConcurrentReconciles: controllerThreads, CacheSyncTimeout: cacheSyncTimeout, + RateLimiter: util.NewRateLimiter(workqueueRateLimiterBucketQPS, workqueueRateLimiterBucketSize, workqueueRateLimiterMaxDelay), } return options } diff --git a/pkg/util/workqueue.go b/pkg/util/workqueue.go new file mode 100644 index 000000000..8f8d18b56 --- /dev/null +++ b/pkg/util/workqueue.go @@ -0,0 +1,19 @@ +package util + +import ( + "time" + + "golang.org/x/time/rate" + "k8s.io/client-go/util/workqueue" +) + +// NewRateLimiter creates a workqueue rate limiter whose BucketRateLimiter parameters (qps, bucketSize) can be tuned. +// It also works around a "bug" in the BucketRateLimiter, which has no maxDelay parameter, by wrapping it in a max-wait rate limiter bounded by maxDelay. +func NewRateLimiter(qps int, bucketSize int, maxDelay time.Duration) workqueue.RateLimiter { + ratelimiter := workqueue.NewWithMaxWaitRateLimiter( + workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize)}, + ), maxDelay) + return ratelimiter +}