Support extended kube-scheduler as batch scheduler #2136

Merged · 2 commits · Sep 3, 2024
1 change: 1 addition & 0 deletions charts/spark-operator-chart/README.md
@@ -90,6 +90,7 @@ See [helm uninstall](https://helm.sh/docs/helm/helm_uninstall) for command documentation.
| controller.uiIngress.enable | bool | `false` | Specifies whether to create ingress for Spark web UI. `controller.uiService.enable` must be `true` to enable ingress. |
| controller.uiIngress.urlFormat | string | `""` | Ingress URL format. Required if `controller.uiIngress.enable` is true. |
| controller.batchScheduler.enable | bool | `false` | Specifies whether to enable batch scheduler for spark jobs scheduling. If enabled, users can specify batch scheduler name in spark application. |
| controller.batchScheduler.kubeSchedulerNames | list | `[]` | Specifies a list of kube-scheduler names for scheduling Spark pods. |
| controller.batchScheduler.default | string | `""` | Default batch scheduler to be used if not specified by the user. If specified, this value must be either "volcano" or "yunikorn". Specifying any other value will cause the controller to error on startup. |
| controller.serviceAccount.create | bool | `true` | Specifies whether to create a service account for the controller. |
| controller.serviceAccount.name | string | `""` | Optional name for the controller service account. |
@@ -74,7 +74,12 @@ spec:
{{- end }}
{{- if .Values.controller.batchScheduler.enable }}
- --enable-batch-scheduler=true
- --default-batch-scheduler={{ .Values.controller.batchScheduler.default }}
{{- with .Values.controller.batchScheduler.kubeSchedulerNames }}
- --kube-scheduler-names={{ . | join "," }}
{{- end }}
{{- with .Values.controller.batchScheduler.default }}
- --default-batch-scheduler={{ . }}
{{- end }}
{{- end }}
{{- if .Values.prometheus.metrics.enable }}
- --enable-metrics=true
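To make the change concrete, here is a sketch of the args this template now renders with batch scheduling enabled and two scheduler names configured (`custom-scheduler` is hypothetical):

```yaml
# Rendered controller container args (illustrative):
- --enable-batch-scheduler=true
- --kube-scheduler-names=kube-scheduler,custom-scheduler
```

Note that wrapping `--default-batch-scheduler` in `{{- with }}` also means the flag is now omitted entirely when `controller.batchScheduler.default` is empty, instead of being rendered with an empty value.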
11 changes: 11 additions & 0 deletions charts/spark-operator-chart/templates/controller/rbac.yaml
@@ -129,6 +129,17 @@ rules:
- podgroups
verbs:
- "*"
- apiGroups:
- scheduling.x-k8s.io
resources:
- podgroups
verbs:
- get
- list
- watch
- create
- update
- delete
{{- end }}
---

3 changes: 3 additions & 0 deletions charts/spark-operator-chart/values.yaml
@@ -67,6 +67,9 @@ controller:
# -- Specifies whether to enable batch scheduler for spark jobs scheduling.
# If enabled, users can specify batch scheduler name in spark application.
enable: false
# -- Specifies a list of kube-scheduler names for scheduling Spark pods.
kubeSchedulerNames: []
# - default-scheduler
# -- Default batch scheduler to be used if not specified by the user.
# If specified, this value must be either "volcano" or "yunikorn". Specifying any other
# value will cause the controller to error on startup.
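For reference, a minimal values override enabling this feature (a sketch, assuming a scheduler named `kube-scheduler` with scheduler-plugins coscheduling support runs in the cluster):

```yaml
controller:
  batchScheduler:
    # Enable batch scheduling support in the controller.
    enable: true
    # Register these scheduler names as PodGroup-backed kube-schedulers.
    kubeSchedulerNames:
      - kube-scheduler
```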
13 changes: 13 additions & 0 deletions cmd/operator/controller/start.go
@@ -45,6 +45,7 @@ import (
logzap "sigs.k8s.io/controller-runtime/pkg/log/zap"
metricsserver "sigs.k8s.io/controller-runtime/pkg/metrics/server"
ctrlwebhook "sigs.k8s.io/controller-runtime/pkg/webhook"
schedulingv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"

sparkoperator "github.com/kubeflow/spark-operator"
"github.com/kubeflow/spark-operator/api/v1beta1"
@@ -53,6 +54,7 @@ import (
"github.com/kubeflow/spark-operator/internal/controller/sparkapplication"
"github.com/kubeflow/spark-operator/internal/metrics"
"github.com/kubeflow/spark-operator/internal/scheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/kubescheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/volcano"
"github.com/kubeflow/spark-operator/internal/scheduler/yunikorn"
"github.com/kubeflow/spark-operator/pkg/common"
@@ -74,6 +76,7 @@ var (

// Batch scheduler
enableBatchScheduler bool
kubeSchedulerNames []string
defaultBatchScheduler string

// Spark web UI service and ingress
@@ -106,6 +109,7 @@ var (

func init() {
utilruntime.Must(clientgoscheme.AddToScheme(scheme))
utilruntime.Must(schedulingv1alpha1.AddToScheme(scheme))

utilruntime.Must(v1beta1.AddToScheme(scheme))
utilruntime.Must(v1beta2.AddToScheme(scheme))
@@ -130,6 +134,7 @@ func NewStartCommand() *cobra.Command {
command.Flags().DurationVar(&cacheSyncTimeout, "cache-sync-timeout", 30*time.Second, "Informer cache sync timeout.")

command.Flags().BoolVar(&enableBatchScheduler, "enable-batch-scheduler", false, "Enable batch schedulers.")
command.Flags().StringSliceVar(&kubeSchedulerNames, "kube-scheduler-names", []string{}, "The kube-scheduler names for scheduling Spark applications.")
command.Flags().StringVar(&defaultBatchScheduler, "default-batch-scheduler", "", "Default batch scheduler.")

command.Flags().BoolVar(&enableUIService, "enable-ui-service", true, "Enable Spark Web UI service.")
@@ -214,6 +219,11 @@ func start() {
_ = registry.Register(common.VolcanoSchedulerName, volcano.Factory)
_ = registry.Register(yunikorn.SchedulerName, yunikorn.Factory)

// Register kube-schedulers.
for _, name := range kubeSchedulerNames {
		_ = registry.Register(name, kubescheduler.Factory)
}

schedulerNames := registry.GetRegisteredSchedulerNames()
if defaultBatchScheduler != "" && !slices.Contains(schedulerNames, defaultBatchScheduler) {
logger.Error(nil, "Failed to find default batch scheduler in registered schedulers")
@@ -362,6 +372,9 @@ func newSparkApplicationReconcilerOptions() sparkapplication.Options {
SparkApplicationMetrics: sparkApplicationMetrics,
SparkExecutorMetrics: sparkExecutorMetrics,
}
if enableBatchScheduler {
options.KubeSchedulerNames = kubeSchedulerNames
}
return options
}

43 changes: 43 additions & 0 deletions examples/spark-pi-kube-scheduler.yaml
@@ -0,0 +1,43 @@
#
# Copyright 2024 The Kubeflow authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi-kube-scheduler
namespace: default
spec:
type: Scala
mode: cluster
image: spark:3.5.0
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples_2.12-3.5.0.jar
sparkVersion: 3.5.0
driver:
labels:
version: 3.5.0
cores: 1
coreLimit: 1200m
memory: 512m
serviceAccount: spark-operator-spark
executor:
labels:
version: 3.5.0
instances: 2
cores: 1
coreLimit: 1200m
memory: 512m
batchScheduler: kube-scheduler
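When this application is submitted, the kube-scheduler backend introduced below creates a companion PodGroup and labels the Spark pods with `scheduling.x-k8s.io/pod-group`. A sketch of the object it would create for this example (resource values are illustrative; the controller sums the driver and executor requests, including memory overhead):

```yaml
apiVersion: scheduling.x-k8s.io/v1alpha1
kind: PodGroup
metadata:
  # "<application name>-pg", see getPodGroupName below
  name: spark-pi-kube-scheduler-pg
  namespace: default
spec:
  minMember: 1
  minResources:
    cpu: "3"        # illustrative: 1 driver core + 2 executor cores
    memory: 1536Mi  # illustrative: requests plus JVM memory overhead
```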
1 change: 1 addition & 0 deletions go.mod
@@ -28,6 +28,7 @@ require (
k8s.io/kubernetes v1.30.2
k8s.io/utils v0.0.0-20240710235135-d4aae2beeffc
sigs.k8s.io/controller-runtime v0.17.5
sigs.k8s.io/scheduler-plugins v0.29.7
volcano.sh/apis v1.9.0
)

2 changes: 2 additions & 0 deletions go.sum
@@ -721,6 +721,8 @@ sigs.k8s.io/kustomize/api v0.17.2 h1:E7/Fjk7V5fboiuijoZHgs4aHuexi5Y2loXlVOAVAG5g
sigs.k8s.io/kustomize/api v0.17.2/go.mod h1:UWTz9Ct+MvoeQsHcJ5e+vziRRkwimm3HytpZgIYqye0=
sigs.k8s.io/kustomize/kyaml v0.17.1 h1:TnxYQxFXzbmNG6gOINgGWQt09GghzgTP6mIurOgrLCQ=
sigs.k8s.io/kustomize/kyaml v0.17.1/go.mod h1:9V0mCjIEYjlXuCdYsSXvyoy2BTsLESH7TlGV81S282U=
sigs.k8s.io/scheduler-plugins v0.29.7 h1:FSV/uGoU1shHoCoHDiXYHREI1ZLj/VaOkAWyRWXSZzs=
sigs.k8s.io/scheduler-plugins v0.29.7/go.mod h1:fin+Wv9sMnkcDUtXcBRoR9S4vYVCkhyY4ZMi9mJyzLY=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4=
sigs.k8s.io/structured-merge-diff/v4 v4.4.1/go.mod h1:N8hJocpFajUSSeSJ9bOZ77VzejKZaXsTtZo4/u7Io08=
sigs.k8s.io/yaml v1.4.0 h1:Mk1wCc2gy/F0THH0TAp1QYyJNzRm2KCLy3o5ASXVI5E=
13 changes: 13 additions & 0 deletions internal/controller/sparkapplication/controller.go
@@ -42,6 +42,7 @@ import (
"github.com/kubeflow/spark-operator/api/v1beta2"
"github.com/kubeflow/spark-operator/internal/metrics"
"github.com/kubeflow/spark-operator/internal/scheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/kubescheduler"
"github.com/kubeflow/spark-operator/internal/scheduler/volcano"
"github.com/kubeflow/spark-operator/internal/scheduler/yunikorn"
"github.com/kubeflow/spark-operator/pkg/common"
@@ -60,6 +61,8 @@ type Options struct {
IngressURLFormat string
DefaultBatchScheduler string

KubeSchedulerNames []string

SparkApplicationMetrics *metrics.SparkApplicationMetrics
SparkExecutorMetrics *metrics.SparkExecutorMetrics
}
@@ -1213,6 +1216,16 @@ func (r *Reconciler) shouldDoBatchScheduling(app *v1beta2.SparkApplication) (boo
scheduler, err = r.registry.GetScheduler(schedulerName, nil)
}

for _, name := range r.options.KubeSchedulerNames {
if schedulerName == name {
config := &kubescheduler.Config{
SchedulerName: name,
Client: r.manager.GetClient(),
}
scheduler, err = r.registry.GetScheduler(name, config)
}
}

if err != nil || scheduler == nil {
logger.Error(err, "Failed to get scheduler for SparkApplication", "name", app.Name, "namespace", app.Namespace, "scheduler", schedulerName)
return false, nil
159 changes: 159 additions & 0 deletions internal/scheduler/kubescheduler/scheduler.go
@@ -0,0 +1,159 @@
/*
Copyright 2024 The Kubeflow authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubescheduler

import (
"context"
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
schedulingv1alpha1 "sigs.k8s.io/scheduler-plugins/apis/scheduling/v1alpha1"

"github.com/kubeflow/spark-operator/api/v1beta2"
"github.com/kubeflow/spark-operator/internal/scheduler"
"github.com/kubeflow/spark-operator/pkg/util"
)

const (
Name = "kube-scheduler"
)

var (
logger = log.Log.WithName("")
)

// Scheduler is a scheduler that uses scheduler plugins to schedule Spark pods.
// Ref: https://github.com/kubernetes-sigs/scheduler-plugins.
type Scheduler struct {
name string
client client.Client
}

// Scheduler implements scheduler.Interface.
var _ scheduler.Interface = &Scheduler{}

// Config defines the configurations of kube-scheduler.
type Config struct {
SchedulerName string
Client client.Client
}

// Config implements scheduler.Config.
var _ scheduler.Config = &Config{}

// Factory creates a new Scheduler instance.
func Factory(config scheduler.Config) (scheduler.Interface, error) {
c, ok := config.(*Config)
if !ok {
return nil, fmt.Errorf("failed to get kube-scheduler config")
}

scheduler := &Scheduler{
name: c.SchedulerName,
client: c.Client,
}
return scheduler, nil
}

// Name implements scheduler.Interface.
func (s *Scheduler) Name() string {
return s.name
}

// ShouldSchedule implements scheduler.Interface.
func (s *Scheduler) ShouldSchedule(app *v1beta2.SparkApplication) bool {
	// There are no additional requirements for scheduling.
return true
}

// Schedule implements scheduler.Interface.
func (s *Scheduler) Schedule(app *v1beta2.SparkApplication) error {
minResources := util.SumResourceList([]corev1.ResourceList{util.GetDriverRequestResource(app), util.GetExecutorRequestResource(app)})
podGroup := &schedulingv1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: getPodGroupName(app),
Namespace: app.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(app, v1beta2.SchemeGroupVersion.WithKind("SparkApplication")),
},
},
Spec: schedulingv1alpha1.PodGroupSpec{
MinMember: 1,
MinResources: minResources,
},
}

if err := s.syncPodGroup(podGroup); err != nil {
return fmt.Errorf("failed to sync pod group: %v", err)
}

	// Add the label `scheduling.x-k8s.io/pod-group` to mark that the pod belongs to a pod group.
if app.ObjectMeta.Labels == nil {
app.ObjectMeta.Labels = make(map[string]string)
}
app.ObjectMeta.Labels[schedulingv1alpha1.PodGroupLabel] = podGroup.Name

return nil
}

// Cleanup implements scheduler.Interface.
func (s *Scheduler) Cleanup(app *v1beta2.SparkApplication) error {
podGroup := &schedulingv1alpha1.PodGroup{
ObjectMeta: metav1.ObjectMeta{
Name: getPodGroupName(app),
Namespace: app.Namespace,
},
}
if err := s.client.Delete(context.TODO(), podGroup); err != nil {
if errors.IsNotFound(err) {
return nil
}
return err
}
logger.Info("Deleted PodGroup", "Name", podGroup.Name, "Namespace", podGroup.Namespace)
return nil
}

func (s *Scheduler) syncPodGroup(podGroup *schedulingv1alpha1.PodGroup) error {
key := types.NamespacedName{
Namespace: podGroup.Namespace,
Name: podGroup.Name,
}

if err := s.client.Get(context.TODO(), key, &schedulingv1alpha1.PodGroup{}); err != nil {
if !errors.IsNotFound(err) {
return err
}

if err := s.client.Create(context.TODO(), podGroup); err != nil {
return err
}
logger.Info("Created PodGroup", "Name", podGroup.Name, "Namespace", podGroup.Namespace)
return nil
}

if err := s.client.Update(context.TODO(), podGroup); err != nil {
return err
}
logger.Info("Updated PodGroup", "Name", podGroup.Name, "Namespace", podGroup.Namespace)
return nil
}
27 changes: 27 additions & 0 deletions internal/scheduler/kubescheduler/util.go
@@ -0,0 +1,27 @@
/*
Copyright 2024 The Kubeflow authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

https://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubescheduler

import (
"fmt"

"github.com/kubeflow/spark-operator/api/v1beta2"
)

func getPodGroupName(app *v1beta2.SparkApplication) string {
return fmt.Sprintf("%s-pg", app.Name)
}