Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add default batch scheduler argument #2143

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions charts/spark-operator-chart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum
| controller.uiIngress.enable | bool | `false` | Specifies whether to create ingress for Spark web UI. `controller.uiService.enable` must be `true` to enable ingress. |
| controller.uiIngress.urlFormat | string | `""` | Ingress URL format. Required if `controller.uiIngress.enable` is true. |
| controller.batchScheduler.enable | bool | `false` | Specifies whether to enable batch scheduler for spark jobs scheduling. If enabled, users can specify batch scheduler name in spark application. |
| controller.batchScheduler.default | string | `""` | Default batch scheduler to be used if not specified by the user. If specified, this value must be either "volcano" or "yunikorn". Specifying any other value will cause the controller to error on startup. |
| controller.serviceAccount.create | bool | `true` | Specifies whether to create a service account for the controller. |
| controller.serviceAccount.name | string | `""` | Optional name for the controller service account. |
| controller.serviceAccount.annotations | object | `{}` | Extra annotations for the controller service account. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,9 @@ spec:
- --ingress-url-format={{ . }}
{{- end }}
{{- end }}
{{- with .Values.controller.batchScheduler.enable }}
{{- if .Values.controller.batchScheduler.enable }}
- --enable-batch-scheduler=true
- --default-batch-scheduler={{ .Values.controller.batchScheduler.default }}
{{- end }}
{{- if .Values.prometheus.metrics.enable }}
- --enable-metrics=true
Expand Down
11 changes: 11 additions & 0 deletions charts/spark-operator-chart/tests/controller/deployment_test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,17 @@ tests:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --enable-batch-scheduler=true

- it: Should contain `--default-batch-scheduler` arg if `controller.batchScheduler.default` is set
set:
controller:
batchScheduler:
enable: true
default: yunikorn
asserts:
- contains:
path: spec.template.spec.containers[?(@.name=="spark-operator-controller")].args
content: --default-batch-scheduler=yunikorn

- it: Should contain `--enable-metrics` arg if `prometheus.metrics.enable` is set to `true`
set:
prometheus:
Expand Down
4 changes: 4 additions & 0 deletions charts/spark-operator-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ controller:
# -- Specifies whether to enable batch scheduler for spark jobs scheduling.
# If enabled, users can specify batch scheduler name in spark application.
enable: false
# -- Default batch scheduler to be used if not specified by the user.
# If specified, this value must be either "volcano" or "yunikorn". Specifying any other
# value will cause the controller to error on startup.
default: ""

serviceAccount:
# -- Specifies whether to create a service account for the controller.
Expand Down
17 changes: 14 additions & 3 deletions cmd/operator/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"crypto/tls"
"flag"
"os"
"slices"
"time"

// Import all Kubernetes client auth plugins (e.g. Azure, GCP, OIDC, etc.)
Expand Down Expand Up @@ -72,7 +73,8 @@ var (
cacheSyncTimeout time.Duration

// Batch scheduler
enableBatchScheduler bool
enableBatchScheduler bool
defaultBatchScheduler string

// Spark web UI service and ingress
enableUIService bool
Expand Down Expand Up @@ -128,6 +130,8 @@ func NewStartCommand() *cobra.Command {
command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.")

command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.")
command.Flags().StringVar(&defaultBatchScheduler, "default-batch-scheduler", "", "Default batch scheduler.")

command.Flags().BoolVar(&enableUIService, "enable-ui-service", true, "Enable Spark Web UI service.")
command.Flags().StringVar(&ingressClassName, "ingress-class-name", "", "Set ingressClassName for ingress resources created.")
command.Flags().StringVar(&ingressURLFormat, "ingress-url-format", "", "Ingress URL format.")
Expand Down Expand Up @@ -207,8 +211,14 @@ func start() {
var registry *scheduler.Registry
if enableBatchScheduler {
registry = scheduler.GetRegistry()
registry.Register(common.VolcanoSchedulerName, volcano.Factory)
registry.Register(yunikorn.SchedulerName, yunikorn.Factory)
_ = registry.Register(common.VolcanoSchedulerName, volcano.Factory)
_ = registry.Register(yunikorn.SchedulerName, yunikorn.Factory)

schedulerNames := registry.GetRegisteredSchedulerNames()
if defaultBatchScheduler != "" && !slices.Contains(schedulerNames, defaultBatchScheduler) {
logger.Error(nil, "Failed to find default batch scheduler in registered schedulers")
os.Exit(1)
}
}

// Setup controller for SparkApplication.
Expand Down Expand Up @@ -348,6 +358,7 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options {
EnableUIService: enableUIService,
IngressClassName: ingressClassName,
IngressURLFormat: ingressURLFormat,
DefaultBatchScheduler: defaultBatchScheduler,
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
}
Expand Down
23 changes: 17 additions & 6 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,11 @@ var (

// Options defines the options of the controller.
type Options struct {
Namespaces []string
EnableUIService bool
IngressClassName string
IngressURLFormat string
Namespaces []string
EnableUIService bool
IngressClassName string
IngressURLFormat string
DefaultBatchScheduler string

SparkApplicationMetrics *metrics.SparkApplicationMetrics
SparkExecutorMetrics *metrics.SparkExecutorMetrics
Expand Down Expand Up @@ -1184,14 +1185,24 @@ func (r *Reconciler) resetSparkApplicationStatus(app *v1beta2.SparkApplication)
}

func (r *Reconciler) shouldDoBatchScheduling(app *v1beta2.SparkApplication) (bool, scheduler.Interface) {
if r.registry == nil || app.Spec.BatchScheduler == nil || *app.Spec.BatchScheduler == "" {
// If batch scheduling isn't enabled
if r.registry == nil {
return false, nil
}

schedulerName := r.options.DefaultBatchScheduler
if app.Spec.BatchScheduler != nil && *app.Spec.BatchScheduler != "" {
schedulerName = *app.Spec.BatchScheduler
}

// If both the default and app batch scheduler are unspecified or empty
if schedulerName == "" {
return false, nil
}

var err error
var scheduler scheduler.Interface

schedulerName := *app.Spec.BatchScheduler
switch schedulerName {
case common.VolcanoSchedulerName:
config := &volcano.Config{
Expand Down