diff --git a/charts/spark-operator-chart/README.md b/charts/spark-operator-chart/README.md index 42cd579b3..dbd4969e9 100644 --- a/charts/spark-operator-chart/README.md +++ b/charts/spark-operator-chart/README.md @@ -90,6 +90,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum | controller.uiIngress.enable | bool | `false` | Specifies whether to create ingress for Spark web UI. `controller.uiService.enable` must be `true` to enable ingress. | | controller.uiIngress.urlFormat | string | `""` | Ingress URL format. Required if `controller.uiIngress.enable` is true. | | controller.batchScheduler.enable | bool | `false` | Specifies whether to enable batch scheduler for spark jobs scheduling. If enabled, users can specify batch scheduler name in spark application. | +| controller.batchScheduler.kubeSchedulerNames | list | `[]` | Specifies a list of kube-scheduler names for scheduling Spark pods. | | controller.batchScheduler.default | string | `""` | Default batch scheduler to be used if not specified by the user. If specified, this value must be either "volcano" or "yunikorn". Specifying any other value will cause the controller to error on startup. | | controller.serviceAccount.create | bool | `true` | Specifies whether to create a service account for the controller. | | controller.serviceAccount.name | string | `""` | Optional name for the controller service account. 
| diff --git a/charts/spark-operator-chart/templates/controller/deployment.yaml b/charts/spark-operator-chart/templates/controller/deployment.yaml index 6df3cb79d..74cf30a0f 100644 --- a/charts/spark-operator-chart/templates/controller/deployment.yaml +++ b/charts/spark-operator-chart/templates/controller/deployment.yaml @@ -74,7 +74,12 @@ spec: {{- end }} {{- if .Values.controller.batchScheduler.enable }} - --enable-batch-scheduler=true - - --default-batch-scheduler={{ .Values.controller.batchScheduler.default }} + {{- with .Values.controller.batchScheduler.kubeSchedulerNames }} + - --kube-scheduler-names={{ . | join "," }} + {{- end }} + {{- with .Values.controller.batchScheduler.default }} + - --default-batch-scheduler={{ . }} + {{- end }} {{- end }} {{- if .Values.prometheus.metrics.enable }} - --enable-metrics=true diff --git a/charts/spark-operator-chart/templates/controller/rbac.yaml b/charts/spark-operator-chart/templates/controller/rbac.yaml index 472d0fcc7..7f0417eda 100644 --- a/charts/spark-operator-chart/templates/controller/rbac.yaml +++ b/charts/spark-operator-chart/templates/controller/rbac.yaml @@ -129,6 +129,17 @@ rules: - podgroups verbs: - "*" +- apiGroups: + - scheduling.x-k8s.io + resources: + - podgroups + verbs: + - get + - list + - watch + - create + - update + - delete {{- end }} --- diff --git a/charts/spark-operator-chart/values.yaml b/charts/spark-operator-chart/values.yaml index aaa0930bb..7b9b9330c 100644 --- a/charts/spark-operator-chart/values.yaml +++ b/charts/spark-operator-chart/values.yaml @@ -67,6 +67,9 @@ controller: # -- Specifies whether to enable batch scheduler for spark jobs scheduling. # If enabled, users can specify batch scheduler name in spark application. enable: false + # -- Specifies a list of kube-scheduler names for scheduling Spark pods. + kubeSchedulerNames: [] + # - default-scheduler # -- Default batch scheduler to be used if not specified by the user. 
# If specified, this value must be either "volcano" or "yunikorn". Specifying any other # value will cause the controller to error on startup. diff --git a/cmd/operator/controller/start.go b/cmd/operator/controller/start.go index 12c014f4f..e6bf91c64 100644 --- a/cmd/operator/controller/start.go +++ b/cmd/operator/controller/start.go @@ -45,6 +45,7 @@ import ( logzap "sigs.k8s.io/controller-runtime/pkg/log/zap" metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server" ctrlwebhook "sigs.k8s.io/controller-runtime/pkg/webhook" + schedulingv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" sparkoperator "github.com/kubeflow/spark-operator" "github.com/kubeflow/spark-operator/api/v1beta1" @@ -53,6 +54,7 @@ import ( "github.com/kubeflow/spark-operator/internal/controller/sparkapplication" "github.com/kubeflow/spark-operator/internal/metrics" "github.com/kubeflow/spark-operator/internal/scheduler" + "github.com/kubeflow/spark-operator/internal/scheduler/kubescheduler" "github.com/kubeflow/spark-operator/internal/scheduler/volcano" "github.com/kubeflow/spark-operator/internal/scheduler/yunikorn" "github.com/kubeflow/spark-operator/pkg/common" @@ -74,6 +76,7 @@ var ( // Batch scheduler enableBatchScheduler bool + kubeSchedulerNames []string defaultBatchScheduler string // Spark web UI service and ingress @@ -106,6 +109,7 @@ var ( func init() { utilruntime.Must(clientgoscheme.AddToScheme(scheme)) + utilruntime.Must(schedulingv1alpha1.AddToScheme(scheme)) utilruntime.Must(v1beta1.AddToScheme(scheme)) utilruntime.Must(v1beta2.AddToScheme(scheme)) @@ -130,6 +134,7 @@ func NewStartCommand() *cobra.Command { command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.") command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.") + command.Flags().StringSliceVar(&kubeSchedulerNames, "kube-scheduler-names", []string{}, "The kube-scheduler names for scheduling 
Spark applications.") command.Flags().StringVar(&defaultBatchScheduler, "default-batch-scheduler", "", "Default batch scheduler.") command.Flags().BoolVar(&enableUIService, "enable-ui-service", true, "Enable Spark Web UI service.") @@ -214,6 +219,11 @@ func start() { _ = registry.Register(common.VolcanoSchedulerName, volcano.Factory) _ = registry.Register(yunikorn.SchedulerName, yunikorn.Factory) + // Register kube-schedulers. + for _, name := range kubeSchedulerNames { + registry.Register(name, kubescheduler.Factory) + } + schedulerNames := registry.GetRegisteredSchedulerNames() if defaultBatchScheduler != "" && !slices.Contains(schedulerNames, defaultBatchScheduler) { logger.Error(nil, "Failed to find default batch scheduler in registered schedulers") @@ -362,6 +372,9 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options { SparkApplicationMetrics: sparkApplicationMetrics, SparkExecutorMetrics: sparkExecutorMetrics, } + if enableBatchScheduler { + options.KubeSchedulerNames = kubeSchedulerNames + } return options } diff --git a/examples/spark-pi-kube-scheduler.yaml b/examples/spark-pi-kube-scheduler.yaml new file mode 100644 index 000000000..010154fe2 --- /dev/null +++ b/examples/spark-pi-kube-scheduler.yaml @@ -0,0 +1,43 @@ +# +# Copyright 2024 The Kubeflow authors. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +apiVersion: sparkoperator.k8s.io/v1beta2 +kind: SparkApplication +metadata: + name: spark-pi-kube-scheduler + namespace: default +spec: + type: Scala + mode: cluster + image: spark:3.5.0 + imagePullPolicy: IfNotPresent + mainClass: org.apache.spark.examples.SparkPi + mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar + sparkVersion: 3.5.0 + driver: + labels: + version: 3.5.0 + cores: 1 + coreLimit: 1200m + memory: 512m + serviceAccount: spark-operator-spark + executor: + labels: + version: 3.5.0 + instances: 2 + cores: 1 + coreLimit: 1200m + memory: 512m + batchScheduler: kube-scheduler diff --git a/go.mod b/go.mod index df65323b3..e9bed8ec2 100644 --- a/go.mod +++ b/go.mod @@ -28,6 +28,7 @@ require ( k8s.io/kubernetes v1.30.2 k8s.io/utils v0.0.0-20240710235135-d4aae2beeffc sigs.k8s.io/controller-runtime v0.17.5 + sigs.k8s.io/scheduler-plugins v0.29.7 volcano.sh/apis v1.9.0 ) diff --git a/go.sum b/go.sum index a5bb11dd5..bf61f0abc 100644 --- a/go.sum +++ b/go.sum @@ -721,6 +721,8 @@ sigs.k8s.io/kustomize/api v0.17.2 h1:E7/Fjk7V5fboiuijoZHgs4aHuexi5Y2loXlVOAVAG5g sigs.k8s.io/kustomize/api v0.17.2/go.mod h1:UWTz9Ct+MvoeQsHcJ5e+vziRRkwimm3HytpZgIYqye0= sigs.k8s.io/kustomize/kyaml v0.17.1 h1:TnxYQxFXzbmNG6gOINgGWQt09GghzgTP6mIurOgrLCQ= sigs.k8s.io/kustomize/kyaml v0.17.1/go.mod h1:9V0mCjIEYjlXuCdYsSXvyoy2BTsLESH7TlGV81S282U= +sigs.k8s.io/scheduler-plugins v0.29.7 h1:FSV/uGoU1shHoCoHDiXYHREI1ZLj/VaOkAWyRWXSZzs= +sigs.k8s.io/scheduler-plugins v0.29.7/go.mod h1:fin+Wv9sMnkcDUtXcBRoR9S4vYVCkhyY4ZMi9mJyzLY= sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08= sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E= diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 9f2917cb9..de8df8b00 100644 --- 
a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -42,6 +42,7 @@ import ( "github.com/kubeflow/spark-operator/api/v1beta2" "github.com/kubeflow/spark-operator/internal/metrics" "github.com/kubeflow/spark-operator/internal/scheduler" + "github.com/kubeflow/spark-operator/internal/scheduler/kubescheduler" "github.com/kubeflow/spark-operator/internal/scheduler/volcano" "github.com/kubeflow/spark-operator/internal/scheduler/yunikorn" "github.com/kubeflow/spark-operator/pkg/common" @@ -60,6 +61,8 @@ type Options struct { IngressURLFormat string DefaultBatchScheduler string + KubeSchedulerNames []string + SparkApplicationMetrics *metrics.SparkApplicationMetrics SparkExecutorMetrics *metrics.SparkExecutorMetrics } @@ -1213,6 +1216,16 @@ func (r *Reconciler) shouldDoBatchScheduling(app *v1beta2.SparkApplication) (boo scheduler, err = r.registry.GetScheduler(schedulerName, nil) } + for _, name := range r.options.KubeSchedulerNames { + if schedulerName == name { + config := &kubescheduler.Config{ + SchedulerName: name, + Client: r.manager.GetClient(), + } + scheduler, err = r.registry.GetScheduler(name, config) + } + } + if err != nil || scheduler == nil { logger.Error(err, "Failed to get scheduler for SparkApplication", "name", app.Name, "namespace", app.Namespace, "scheduler", schedulerName) return false, nil diff --git a/internal/scheduler/kubescheduler/scheduler.go b/internal/scheduler/kubescheduler/scheduler.go new file mode 100644 index 000000000..b7126b137 --- /dev/null +++ b/internal/scheduler/kubescheduler/scheduler.go @@ -0,0 +1,159 @@ +/* +Copyright 2024 The Kubeflow authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubescheduler + +import ( + "context" + "fmt" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/log" + schedulingv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1" + + "github.com/kubeflow/spark-operator/api/v1beta2" + "github.com/kubeflow/spark-operator/internal/scheduler" + "github.com/kubeflow/spark-operator/pkg/util" +) + +const ( + Name = "kube-scheduler" +) + +var ( + logger = log.Log.WithName(Name) +) + +// Scheduler is a scheduler that uses scheduler plugins to schedule Spark pods. +// Ref: https://github.com/kubernetes-sigs/scheduler-plugins. +type Scheduler struct { + name string + client client.Client +} + +// Scheduler implements scheduler.Interface. +var _ scheduler.Interface = &Scheduler{} + +// Config defines the configurations of kube-scheduler. +type Config struct { + SchedulerName string + Client client.Client +} + +// Config implements scheduler.Config. +var _ scheduler.Config = &Config{} + +// Factory creates a new Scheduler instance. +func Factory(config scheduler.Config) (scheduler.Interface, error) { + c, ok := config.(*Config) + if !ok { + return nil, fmt.Errorf("failed to get kube-scheduler config") + } + + scheduler := &Scheduler{ + name: c.SchedulerName, + client: c.Client, + } + return scheduler, nil +} + +// Name implements scheduler.Interface. 
+func (s *Scheduler) Name() string { + return s.name +} + +// ShouldSchedule implements scheduler.Interface. +func (s *Scheduler) ShouldSchedule(app *v1beta2.SparkApplication) bool { + // There are no additional requirements for scheduling. + return true +} + +// Schedule implements scheduler.Interface. +func (s *Scheduler) Schedule(app *v1beta2.SparkApplication) error { + minResources := util.SumResourceList([]corev1.ResourceList{util.GetDriverRequestResource(app), util.GetExecutorRequestResource(app)}) + podGroup := &schedulingv1alpha1.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: getPodGroupName(app), + Namespace: app.Namespace, + OwnerReferences: []metav1.OwnerReference{ + *metav1.NewControllerRef(app, v1beta2.SchemeGroupVersion.WithKind("SparkApplication")), + }, + }, + Spec: schedulingv1alpha1.PodGroupSpec{ + MinMember: 1, + MinResources: minResources, + }, + } + + if err := s.syncPodGroup(podGroup); err != nil { + return fmt.Errorf("failed to sync pod group: %w", err) + } + + // Add a label `scheduling.x-k8s.io/pod-group` to mark the pod belongs to a group + if app.ObjectMeta.Labels == nil { + app.ObjectMeta.Labels = make(map[string]string) + } + app.ObjectMeta.Labels[schedulingv1alpha1.PodGroupLabel] = podGroup.Name + + return nil +} + +// Cleanup implements scheduler.Interface. 
+func (s *Scheduler) Cleanup(app *v1beta2.SparkApplication) error { + podGroup := &schedulingv1alpha1.PodGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: getPodGroupName(app), + Namespace: app.Namespace, + }, + } + if err := s.client.Delete(context.TODO(), podGroup); err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + logger.Info("Deleted PodGroup", "Name", podGroup.Name, "Namespace", podGroup.Namespace) + return nil +} + +func (s *Scheduler) syncPodGroup(podGroup *schedulingv1alpha1.PodGroup) error { + key := types.NamespacedName{ + Namespace: podGroup.Namespace, + Name: podGroup.Name, + } + + if err := s.client.Get(context.TODO(), key, &schedulingv1alpha1.PodGroup{}); err != nil { + if !errors.IsNotFound(err) { + return err + } + + if err := s.client.Create(context.TODO(), podGroup); err != nil { + return err + } + logger.Info("Created PodGroup", "Name", podGroup.Name, "Namespace", podGroup.Namespace) + return nil + } + + if err := s.client.Update(context.TODO(), podGroup); err != nil { + return err + } + logger.Info("Updated PodGroup", "Name", podGroup.Name, "Namespace", podGroup.Namespace) + return nil +} diff --git a/internal/scheduler/kubescheduler/util.go b/internal/scheduler/kubescheduler/util.go new file mode 100644 index 000000000..f4996a204 --- /dev/null +++ b/internal/scheduler/kubescheduler/util.go @@ -0,0 +1,27 @@ +/* +Copyright 2024 The Kubeflow authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + https://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package kubescheduler + +import ( + "fmt" + + "github.com/kubeflow/spark-operator/api/v1beta2" +) + +func getPodGroupName(app *v1beta2.SparkApplication) string { + return fmt.Sprintf("%s-pg", app.Name) +}