Skip to content

Commit

Permalink
Support extended kube-scheduler as batch scheduler (#2136)
Browse files Browse the repository at this point in the history
* Support coscheduling with kube-scheduler plugins

Signed-off-by: Yi Chen <github@chenyicn.net>

* Add example for using kube-schulder coscheduling

Signed-off-by: Yi Chen <github@chenyicn.net>

---------

Signed-off-by: Yi Chen <github@chenyicn.net>
  • Loading branch information
ChenYi015 authored Sep 3, 2024
1 parent c93b0ec commit e8d3de9
Show file tree
Hide file tree
Showing 11 changed files with 279 additions and 1 deletion.
1 change: 1 addition & 0 deletions charts/spark-operator-chart/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command docum
| controller.uiIngress.enable | bool | `false` | Specifies whether to create ingress for Spark web UI. `controller.uiService.enable` must be `true` to enable ingress. |
| controller.uiIngress.urlFormat | string | `""` | Ingress URL format. Required if `controller.uiIngress.enable` is true. |
| controller.batchScheduler.enable | bool | `false` | Specifies whether to enable batch scheduler for spark jobs scheduling. If enabled, users can specify batch scheduler name in spark application. |
| controller.batchScheduler.kubeSchedulerNames | list | `[]` | Specifies a list of kube-scheduler names for scheduling Spark pods. |
| controller.batchScheduler.default | string | `""` | Default batch scheduler to be used if not specified by the user. If specified, this value must be either "volcano" or "yunikorn". Specifying any other value will cause the controller to error on startup. |
| controller.serviceAccount.create | bool | `true` | Specifies whether to create a service account for the controller. |
| controller.serviceAccount.name | string | `""` | Optional name for the controller service account. |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,12 @@ spec:
{{- end }}
{{- if .Values.controller.batchScheduler.enable }}
- --enable-batch-scheduler=true
- --default-batch-scheduler={{ .Values.controller.batchScheduler.default }}
{{- with .Values.controller.batchScheduler.kubeSchedulerNames }}
- --kube-scheduler-names={{ . | join "," }}
{{- end }}
{{- with .Values.controller.batchScheduler.default }}
- --default-batch-scheduler={{ . }}
{{- end }}
{{- end }}
{{- if .Values.prometheus.metrics.enable }}
- --enable-metrics=true
Expand Down
11 changes: 11 additions & 0 deletions charts/spark-operator-chart/templates/controller/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,17 @@ rules:
- podgroups
verbs:
- "*"
- apiGroups:
- scheduling.x-k8s.io
resources:
- podgroups
verbs:
- get
- list
- watch
- create
- update
- delete
{{- end }}
---

Expand Down
3 changes: 3 additions & 0 deletions charts/spark-operator-chart/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ controller:
# -- Specifies whether to enable batch scheduler for spark jobs scheduling.
# If enabled, users can specify batch scheduler name in spark application.
enable: false
# -- Specifies a list of kube-scheduler names for scheduling Spark pods.
kubeSchedulerNames: []
# - default-scheduler
# -- Default batch scheduler to be used if not specified by the user.
# If specified, this value must be either "volcano" or "yunikorn". Specifying any other
# value will cause the controller to error on startup.
Expand Down
13 changes: 13 additions & 0 deletions cmd/operator/controller/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
logzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
schedulingv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"

sparkoperator "github.com/kubeflow/spark-operator"
"github.com/kubeflow/spark-operator/api/v1beta1"
Expand All @@ -53,6 +54,7 @@ import (
"github.com/kubeflow/spark-operator/internal/controller/sparkapplication"
"github.com/kubeflow/spark-operator/internal/metrics"
"github.com/kubeflow/spark-operator/internal/scheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/kubescheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/volcano"
"github.com/kubeflow/spark-operator/internal/scheduler/yunikorn"
"github.com/kubeflow/spark-operator/pkg/common"
Expand All @@ -74,6 +76,7 @@ var (

// Batch scheduler
enableBatchScheduler bool
kubeSchedulerNames []string
defaultBatchScheduler string

// Spark web UI service and ingress
Expand Down Expand Up @@ -106,6 +109,7 @@ var (

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(schedulingv1alpha1.AddToScheme(scheme))

utilruntime.Must(v1beta1.AddToScheme(scheme))
utilruntime.Must(v1beta2.AddToScheme(scheme))
Expand All @@ -130,6 +134,7 @@ func NewStartCommand() *cobra.Command {
command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.")

command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.")
command.Flags().StringSliceVar(&kubeSchedulerNames, "kube-scheduler-names", []string{}, "The kube-scheduler names for scheduling Spark applications.")
command.Flags().StringVar(&defaultBatchScheduler, "default-batch-scheduler", "", "Default batch scheduler.")

command.Flags().BoolVar(&enableUIService, "enable-ui-service", true, "Enable Spark Web UI service.")
Expand Down Expand Up @@ -214,6 +219,11 @@ func start() {
_ = registry.Register(common.VolcanoSchedulerName, volcano.Factory)
_ = registry.Register(yunikorn.SchedulerName, yunikorn.Factory)

// Register kube-schedulers.
for _, name := range kubeSchedulerNames {
registry.Register(name, kubescheduler.Factory)
}

schedulerNames := registry.GetRegisteredSchedulerNames()
if defaultBatchScheduler != "" && !slices.Contains(schedulerNames, defaultBatchScheduler) {
logger.Error(nil, "Failed to find default batch scheduler in registered schedulers")
Expand Down Expand Up @@ -362,6 +372,9 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options {
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
}
if enableBatchScheduler {
options.KubeSchedulerNames = kubeSchedulerNames
}
return options
}

Expand Down
43 changes: 43 additions & 0 deletions examples/spark-pi-kube-scheduler.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi-kube-scheduler
namespace: default
spec:
type: Scala
mode: cluster
image: spark:3.5.0
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar
sparkVersion: 3.5.0
driver:
labels:
version: 3.5.0
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
executor:
labels:
version: 3.5.0
instances: 2
cores: 1
coreLimit: 1200m
memory: 512m
batchScheduler: kube-scheduler
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ require (
k8s.io/kubernetes v1.30.2
k8s.io/utils v0.0.0-20240710235135-d4aae2beeffc
sigs.k8s.io/controller-runtime v0.17.5
sigs.k8s.io/scheduler-plugins v0.29.7
volcano.sh/apis v1.9.0
)

Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -721,6 +721,8 @@ sigs.k8s.io/kustomize/api v0.17.2 h1:E7/Fjk7V5fboiuijoZHgs4aHuexi5Y2loXlVOAVAG5g
sigs.k8s.io/kustomize/api v0.17.2/go.mod h1:UWTz9Ct+MvoeQsHcJ5e+vziRRkwimm3HytpZgIYqye0=
sigs.k8s.io/kustomize/kyaml v0.17.1 h1:TnxYQxFXzbmNG6gOINgGWQt09GghzgTP6mIurOgrLCQ=
sigs.k8s.io/kustomize/kyaml v0.17.1/go.mod h1:9V0mCjIEYjlXuCdYsSXvyoy2BTsLESH7TlGV81S282U=
sigs.k8s.io/scheduler-plugins v0.29.7 h1:FSV/uGoU1shHoCoHDiXYHREI1ZLj/VaOkAWyRWXSZzs=
sigs.k8s.io/scheduler-plugins v0.29.7/go.mod h1:fin+Wv9sMnkcDUtXcBRoR9S4vYVCkhyY4ZMi9mJyzLY=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
Expand Down
13 changes: 13 additions & 0 deletions internal/controller/sparkapplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/kubeflow/spark-operator/api/v1beta2"
"github.com/kubeflow/spark-operator/internal/metrics"
"github.com/kubeflow/spark-operator/internal/scheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/kubescheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/volcano"
"github.com/kubeflow/spark-operator/internal/scheduler/yunikorn"
"github.com/kubeflow/spark-operator/pkg/common"
Expand All @@ -60,6 +61,8 @@ type Options struct {
IngressURLFormat string
DefaultBatchScheduler string

KubeSchedulerNames []string

SparkApplicationMetrics *metrics.SparkApplicationMetrics
SparkExecutorMetrics *metrics.SparkExecutorMetrics
}
Expand Down Expand Up @@ -1213,6 +1216,16 @@ func (r *Reconciler) shouldDoBatchScheduling(app *v1beta2.SparkApplication) (boo
scheduler, err = r.registry.GetScheduler(schedulerName, nil)
}

for _, name := range r.options.KubeSchedulerNames {
if schedulerName == name {
config := &kubescheduler.Config{
SchedulerName: name,
Client: r.manager.GetClient(),
}
scheduler, err = r.registry.GetScheduler(name, config)
}
}

if err != nil || scheduler == nil {
logger.Error(err, "Failed to get scheduler for SparkApplication", "name", app.Name, "namespace", app.Namespace, "scheduler", schedulerName)
return false, nil
Expand Down
159 changes: 159 additions & 0 deletions internal/scheduler/kubescheduler/scheduler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
/*
Copyright 2024 The Kubeflow authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubescheduler

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
schedulingv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"

"github.com/kubeflow/spark-operator/api/v1beta2"
"github.com/kubeflow/spark-operator/internal/scheduler"
"github.com/kubeflow/spark-operator/pkg/util"
)

const (
Name = "kube-scheduler"
)

var (
logger = log.Log.WithName("")
)

// Scheduler is a scheduler that uses scheduler plugins to schedule Spark pods.
// Ref: https://github.com/kubernetes-sigs/scheduler-plugins.
type Scheduler struct {
name string
client client.Client
}

// Scheduler implements scheduler.Interface.
var _ scheduler.Interface = &Scheduler{}

// Config defines the configurations of kube-scheduler.
type Config struct {
SchedulerName string
Client client.Client
}

// Config implements scheduler.Config.
var _ scheduler.Config = &Config{}

// Factory creates a new Scheduler instance.
func Factory(config scheduler.Config) (scheduler.Interface, error) {
c, ok := config.(*Config)
if !ok {
return nil, fmt.Errorf("failed to get kube-scheduler config")
}

scheduler := &Scheduler{
name: c.SchedulerName,
client: c.Client,
}
return scheduler, nil
}

// Name implements scheduler.Interface.
func (s *Scheduler) Name() string {
return s.name
}

// ShouldSchedule implements scheduler.Interface.
func (s *Scheduler) ShouldSchedule(app *v1beta2.SparkApplication) bool {
// There is no additional requirements for scheduling.
return true
}

// Schedule implements scheduler.Interface.
func (s *Scheduler) Schedule(app *v1beta2.SparkApplication) error {
minResources := util.SumResourceList([]corev1.ResourceList{util.GetDriverRequestResource(app), util.GetExecutorRequestResource(app)})
podGroup := &schedulingv1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: getPodGroupName(app),
Namespace: app.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(app, v1beta2.SchemeGroupVersion.WithKind("SparkApplication")),
},
},
Spec: schedulingv1alpha1.PodGroupSpec{
MinMember: 1,
MinResources: minResources,
},
}

if err := s.syncPodGroup(podGroup); err != nil {
return fmt.Errorf("failed to sync pod group: %v", err)
}

// Add a label `scheduling.x-k8s.io/pod-group` to mark the pod belongs to a group
if app.ObjectMeta.Labels == nil {
app.ObjectMeta.Labels = make(map[string]string)
}
app.ObjectMeta.Labels[schedulingv1alpha1.PodGroupLabel] = podGroup.Name

return nil
}

// Cleanup implements scheduler.Interface.
func (s *Scheduler) Cleanup(app *v1beta2.SparkApplication) error {
podGroup := &schedulingv1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: getPodGroupName(app),
Namespace: app.Namespace,
},
}
if err := s.client.Delete(context.TODO(), podGroup); err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
logger.Info("Deleted PodGroup", "Name", podGroup.Name, "Namespace", podGroup.Namespace)
return nil
}

func (s *Scheduler) syncPodGroup(podGroup *schedulingv1alpha1.PodGroup) error {
key := types.NamespacedName{
Namespace: podGroup.Namespace,
Name: podGroup.Name,
}

if err := s.client.Get(context.TODO(), key, &schedulingv1alpha1.PodGroup{}); err != nil {
if !errors.IsNotFound(err) {
return err
}

if err := s.client.Create(context.TODO(), podGroup); err != nil {
return err
}
logger.Info("Created PodGroup", "Name", podGroup.Name, "Namespace", podGroup.Namespace)
return nil
}

if err := s.client.Update(context.TODO(), podGroup); err != nil {
return err
}
logger.Info("Updated PodGroup", "Name", podGroup.Name, "Namespace", podGroup.Namespace)
return nil
}
27 changes: 27 additions & 0 deletions internal/scheduler/kubescheduler/util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
Copyright 2024 The Kubeflow authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubescheduler

import (
"fmt"

"github.com/kubeflow/spark-operator/api/v1beta2"
)

func getPodGroupName(app *v1beta2.SparkApplication) string {
return fmt.Sprintf("%s-pg", app.Name)
}

0 comments on commit e8d3de9

Please sign in to comment.