Skip to content

Commit

Permalink
FEATURE: add cli argument to modify controller workqueue ratelimiter (k…
Browse files Browse the repository at this point in the history
…ubeflow#2186)

* add cli argument to modify controller workqueue ratelimiter

Signed-off-by: ImpSy <3097030+ImpSy@users.noreply.github.com>

* add cli argument to modify controller workqueue ratelimiter support to helm chart

Signed-off-by: ImpSy <3097030+ImpSy@users.noreply.github.com>

---------

Signed-off-by: ImpSy <3097030+ImpSy@users.noreply.github.com>
  • Loading branch information
ImpSy authored Sep 29, 2024
1 parent 0127883 commit d37a0e9
Show file tree
Hide file tree
Showing 6 changed files with 84 additions and 1 deletion.
4 changes: 4 additions & 0 deletions charts/spark-operator-chart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum
| controller.pprof.enable | bool | `false` | Specifies whether to enable pprof. |
| controller.pprof.port | int | `6060` | Specifies pprof port. |
| controller.pprof.portName | string | `"pprof"` | Specifies pprof service port name. |
| controller.workqueueRateLimiter.bucketQPS | int | `50` | Specifies the average rate of items processed by the workqueue rate limiter. |
| controller.workqueueRateLimiter.bucketSize | int | `500` | Specifies the token bucket size (maximum burst of items) allowed by the workqueue rate limiter. |
| controller.workqueueRateLimiter.maxDelay.enable | bool | `true` | Specifies whether to enable max delay for the workqueue rate limiter. This is useful to avoid losing events when the workqueue is full. |
| controller.workqueueRateLimiter.maxDelay.duration | string | `"6h"` | Specifies the maximum delay duration for the workqueue rate limiter. |
| webhook.enable | bool | `true` | Specifies whether to enable webhook. |
| webhook.replicas | int | `1` | Number of replicas of webhook server. |
| webhook.logLevel | string | `"info"` | Configure the verbosity of logging, can be one of `debug`, `info`, `error`. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,11 @@ spec:
{{- if .Values.controller.pprof.enable }}
- --pprof-bind-address=:{{ .Values.controller.pprof.port }}
{{- end }}
- --workqueue-ratelimiter-bucket-qps={{ .Values.controller.workqueueRateLimiter.bucketQPS }}
- --workqueue-ratelimiter-bucket-size={{ .Values.controller.workqueueRateLimiter.bucketSize }}
{{- if .Values.controller.workqueueRateLimiter.maxDelay.enable }}
- --workqueue-ratelimiter-max-delay={{ .Values.controller.workqueueRateLimiter.maxDelay.duration }}
{{- end }}
{{- if or .Values.prometheus.metrics.enable .Values.controller.pprof.enable }}
ports:
{{- if .Values.controller.pprof.enable }}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -592,4 +592,35 @@ tests:
content:
name: pprof-test
containerPort: 12345
count: 1
count: 1

# Overrides every workqueue rate limiter value and checks that the controller
# container receives the bucket QPS, bucket size and max-delay flags with
# exactly those overridden values.
- it: Should contain `--workqueue-ratelimiter-max-delay` arg if `controller.workqueueRateLimiter.maxDelay.enable` is set to `true`
set:
controller:
workqueueRateLimiter:
bucketQPS: 1
bucketSize: 2
maxDelay:
enable: true
duration: 3h
asserts:
- contains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --workqueue-ratelimiter-bucket-qps=1
- contains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --workqueue-ratelimiter-bucket-size=2
- contains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --workqueue-ratelimiter-max-delay=3h

# With maxDelay disabled the template must omit the max-delay flag entirely.
# The override has to live under `controller.workqueueRateLimiter`; setting
# `controller.maxDelay` is ignored by the template, leaving the default
# (enabled) in effect and making the assertion pass vacuously.
- it: Should not contain `--workqueue-ratelimiter-max-delay` arg if `controller.workqueueRateLimiter.maxDelay.enable` is set to `false`
  set:
    controller:
      workqueueRateLimiter:
        maxDelay:
          enable: false
          duration: 1h
  asserts:
    - notContains:
        path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
        content: --workqueue-ratelimiter-max-delay=1h
13 changes: 13 additions & 0 deletions charts/spark-operator-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,19 @@ controller:
# -- Specifies pprof service port name.
portName: pprof

# Workqueue rate limiter configuration forwarded to the controller-runtime Reconciler.
workqueueRateLimiter:
# -- Specifies the average rate of items processed by the workqueue rate limiter.
bucketQPS: 50
# -- Specifies the token bucket size (maximum burst of items) allowed by the workqueue rate limiter.
bucketSize: 500
maxDelay:
# -- Specifies whether to enable max delay for the workqueue rate limiter.
# This is useful to avoid losing events when the workqueue is full.
enable: true
# -- Specifies the maximum delay duration for the workqueue rate limiter.
duration: 6h

webhook:
# -- Specifies whether to enable webhook.
enable: true
Expand Down
11 changes: 11 additions & 0 deletions cmd/operator/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
// to ensure that exec-entrypoint and run can make use of them.
"golang.org/x/time/rate"
_ "k8s.io/client-go/plugin/pkg/client/auth"

"github.com/spf13/cobra"
Expand Down Expand Up @@ -74,6 +75,11 @@ var (
controllerThreads int
cacheSyncTimeout time.Duration

// Workqueue rate limiter
workqueueRateLimiterBucketQPS int
workqueueRateLimiterBucketSize int
workqueueRateLimiterMaxDelay time.Duration

// Batch scheduler
enableBatchScheduler bool
kubeSchedulerNames []string
Expand Down Expand Up @@ -134,6 +140,10 @@ func NewStartCommand() *cobra.Command {
command.Flags().StringSliceVar(&namespaces, "namespaces", []string{}, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset or contains empty string.")
command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.")

command.Flags().IntVar(&workqueueRateLimiterBucketQPS, "workqueue-ratelimiter-bucket-qps", 10, "QPS of the bucket rate of the workqueue.")
command.Flags().IntVar(&workqueueRateLimiterBucketSize, "workqueue-ratelimiter-bucket-size", 100, "The token bucket size of the workqueue.")
command.Flags().DurationVar(&workqueueRateLimiterMaxDelay, "workqueue-ratelimiter-max-delay", rate.InfDuration, "The maximum delay of the workqueue.")

command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.")
command.Flags().StringSliceVar(&kubeSchedulerNames, "kube-scheduler-names", []string{}, "The kube-scheduler names for scheduling Spark applications.")
command.Flags().StringVar(&defaultBatchScheduler, "default-batch-scheduler", "", "Default batch scheduler.")
Expand Down Expand Up @@ -355,6 +365,7 @@ func newControllerOptions() controller.Options {
options := controller.Options{
MaxConcurrentReconciles: controllerThreads,
CacheSyncTimeout: cacheSyncTimeout,
RateLimiter: util.NewRateLimiter(workqueueRateLimiterBucketQPS, workqueueRateLimiterBucketSize, workqueueRateLimiterMaxDelay),
}
return options
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/util/workqueue.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package util

import (
"time"

"golang.org/x/time/rate"
"k8s.io/client-go/util/workqueue"
)

// NewRateLimiter builds the workqueue rate limiter handed to the controller.
//
// Two limiters are combined through workqueue.NewMaxOfRateLimiter:
//   - a per-item exponential-failure limiter (5ms base delay, 1000s cap), and
//   - a token-bucket limiter created with rate.Limit(qps) and a bucket of
//     bucketSize tokens.
//
// The combination is then wrapped with workqueue.NewWithMaxWaitRateLimiter
// using maxDelay, because BucketRateLimiter itself has no max-delay parameter.
func NewRateLimiter(qps int, bucketSize int, maxDelay time.Duration) workqueue.RateLimiter {
	perItemBackoff := workqueue.NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second)
	tokenBucket := &workqueue.BucketRateLimiter{
		Limiter: rate.NewLimiter(rate.Limit(qps), bucketSize),
	}

	combined := workqueue.NewMaxOfRateLimiter(perItemBackoff, tokenBucket)
	return workqueue.NewWithMaxWaitRateLimiter(combined, maxDelay)
}

0 comments on commit d37a0e9

Please sign in to comment.