Resource quota enforcement webhook #544

Merged · 22 commits · Aug 28, 2019
1 change: 1 addition & 0 deletions Gopkg.lock


7 changes: 7 additions & 0 deletions docs/user-guide.md
@@ -38,6 +38,7 @@ The Kubernetes Operator for Apache Spark ships with a command-line tool called `
* [Configuring Automatic Application Re-submission on Submission Failures](#configuring-automatic-application-re-submission-on-submission-failures)
* [Running Spark Applications on a Schedule using a ScheduledSparkApplication](#running-spark-applications-on-a-schedule-using-a-scheduledsparkapplication)
* [Enabling Leader Election for High Availability](#enabling-leader-election-for-high-availability)
* [Enabling Resource Quota Enforcement](#enabling-resource-quota-enforcement)
* [Customizing the Operator](#customizing-the-operator)

## Using a SparkApplication
@@ -564,6 +565,12 @@ The operator supports a high-availability (HA) mode, in which there can be more
| `leader-election-renew-deadline` | 14 seconds | Leader election renew deadline. |
| `leader-election-retry-period` | 4 seconds | Leader election retry period. |

## Enabling Resource Quota Enforcement

The Spark Operator provides limited support for resource quota enforcement using a validating webhook. It counts the resources of non-terminal-phase SparkApplications and Pods, and determines whether a requested SparkApplication fits within the remaining resources. For example, if a namespace's quota allows 10 CPU cores and already-admitted applications and pods account for 8, a new SparkApplication requesting 4 cores will be rejected. ResourceQuota scope selectors are not supported; any ResourceQuota object that does not apply to the entire namespace is ignored. As with native Pod quota enforcement, current usage is updated asynchronously, so some overscheduling is possible.

If you are running Spark applications in namespaces that are subject to resource quota constraints, consider enabling this feature to avoid driver resource starvation. Quota enforcement can be enabled with the command-line flag `-enable-resource-quota-enforcement=true`. It is recommended to also set `-webhook-fail-on-error=true`.
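
For illustration, the sketch below shows the kind of ResourceQuota object the webhook will evaluate; the namespace, name, and limits are hypothetical. The enforcer reads the plain `cpu` and `memory` keys of `spec.hard`, and any quota that uses `scopes` or `scopeSelector` is skipped:

```yaml
# Hypothetical namespace-wide quota; names and values are illustrative only.
apiVersion: v1
kind: ResourceQuota
metadata:
  name: spark-quota
  namespace: spark-jobs
spec:
  hard:
    cpu: "20"      # total cores counted across SparkApplications and non-operator Pods
    memory: 64Gi   # total memory counted across the namespace
```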

## Customizing the Operator

To customize the operator, you can follow the steps below:
62 changes: 41 additions & 21 deletions main.go
@@ -54,25 +54,26 @@ import (
)

var (
master = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
installCRDs = flag.Bool("install-crds", true, "Whether to install CRDs")
controllerThreads = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
enableMetrics = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressURLFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable Spark operator leader election.")
leaderElectionLockNamespace = flag.String("leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.")
leaderElectionLockName = flag.String("leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.")
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Leader election lease duration.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 4*time.Second, "Leader election retry period.")
batchSchedulerName = flag.String("batch-scheduler-name", "", "Use specified scheduler for pods' batch scheduling.")
master = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
installCRDs = flag.Bool("install-crds", true, "Whether to install CRDs")
controllerThreads = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
enableResourceQuotaEnforcement = flag.Bool("enable-resource-quota-enforcement", false, "Whether to enable ResourceQuota enforcement for SparkApplication resources. Requires the webhook to be enabled.")
enableMetrics = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressURLFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable Spark operator leader election.")
leaderElectionLockNamespace = flag.String("leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.")
leaderElectionLockName = flag.String("leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.")
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Leader election lease duration.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 4*time.Second, "Leader election retry period.")
batchSchedulerName = flag.String("batch-scheduler-name", "", "Use specified scheduler for pods' batch scheduling.")
)

func main() {
@@ -172,15 +173,26 @@ func main() {

var hook *webhook.WebHook
if *enableWebhook {
var coreV1InformerFactory informers.SharedInformerFactory
if *enableResourceQuotaEnforcement {
coreV1InformerFactory = buildCoreV1InformerFactory(kubeClient)
}
var err error
// Don't deregister webhook on exit if leader election enabled (i.e. multiple webhooks running)
hook, err = webhook.New(kubeClient, crInformerFactory, *namespace, !*enableLeaderElection)
hook, err = webhook.New(kubeClient, crInformerFactory, *namespace, !*enableLeaderElection, *enableResourceQuotaEnforcement, coreV1InformerFactory)
if err != nil {
glog.Fatal(err)
}
if err = hook.Start(); err != nil {

if *enableResourceQuotaEnforcement {
go coreV1InformerFactory.Start(stopCh)
}

if err = hook.Start(stopCh); err != nil {
glog.Fatal(err)
}
} else if *enableResourceQuotaEnforcement {
glog.Fatal("Webhook must be enabled to use resource quota enforcement.")
}

if *enableLeaderElection {
@@ -261,3 +273,11 @@ func buildPodInformerFactory(kubeClient clientset.Interface) informers.SharedInf
podFactoryOpts = append(podFactoryOpts, informers.WithTweakListOptions(tweakListOptionsFunc))
return informers.NewSharedInformerFactoryWithOptions(kubeClient, time.Duration(*resyncInterval)*time.Second, podFactoryOpts...)
}

func buildCoreV1InformerFactory(kubeClient clientset.Interface) informers.SharedInformerFactory {
var coreV1FactoryOpts []informers.SharedInformerOption
if *namespace != apiv1.NamespaceAll {
coreV1FactoryOpts = append(coreV1FactoryOpts, informers.WithNamespace(*namespace))
}
return informers.NewSharedInformerFactoryWithOptions(kubeClient, time.Duration(*resyncInterval)*time.Second, coreV1FactoryOpts...)
}
5 changes: 4 additions & 1 deletion manifest/spark-operator-rbac.yaml
@@ -45,14 +45,17 @@ rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get"]
- apiGroups: [""]
resources: ["resourcequotas"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "update", "patch"]
- apiGroups: ["apiextensions.k8s.io"]
resources: ["customresourcedefinitions"]
verbs: ["create", "get", "update", "delete"]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["mutatingwebhookconfigurations"]
resources: ["mutatingwebhookconfigurations", "validatingwebhookconfigurations"]
verbs: ["create", "get", "update", "delete"]
- apiGroups: ["sparkoperator.k8s.io"]
resources: ["sparkapplications", "scheduledsparkapplications"]
97 changes: 97 additions & 0 deletions pkg/webhook/resourceusage/enforcer.go
@@ -0,0 +1,97 @@
package resourceusage

import (
"fmt"
so "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta1"
crdinformers "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/informers/externalversions"
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
)

type ResourceQuotaEnforcer struct {
watcher ResourceUsageWatcher
resourceQuotaInformer corev1informers.ResourceQuotaInformer
}

func NewResourceQuotaEnforcer(crdInformerFactory crdinformers.SharedInformerFactory, coreV1InformerFactory informers.SharedInformerFactory) ResourceQuotaEnforcer {
resourceUsageWatcher := newResourceUsageWatcher(crdInformerFactory, coreV1InformerFactory)
informer := coreV1InformerFactory.Core().V1().ResourceQuotas()
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
return ResourceQuotaEnforcer{
watcher: resourceUsageWatcher,
resourceQuotaInformer: informer,
}
}

// TODO: There appears to be a deadlock in cache.WaitForCacheSync. Possibly related? https://github.com/kubernetes/kubernetes/issues/71450
// For now, return immediately. There will be a short window after startup where quota calculation is incorrect.
[Review comment — Contributor Author] I'll try to hunt this down and file an issue against k8s (or determine if it's been fixed in a later version).

func (r ResourceQuotaEnforcer) WaitForCacheSync(stopCh <-chan struct{}) error {
/*if !cache.WaitForCacheSync(stopCh, func() bool {
return r.resourceQuotaInformer.Informer().HasSynced()
}) {
return fmt.Errorf("cache sync canceled")
}*/
return nil
}

func (r *ResourceQuotaEnforcer) admitResource(kind, namespace, name string, requestedResources ResourceList) (string, error) {
glog.V(2).Infof("Processing admission request for %s %s/%s, requesting: %s", kind, namespace, name, requestedResources)
resourceQuotas, err := r.resourceQuotaInformer.Lister().ResourceQuotas(namespace).List(labels.Everything())
if err != nil {
return "", err
}
if (requestedResources.cpu.IsZero() && requestedResources.memory.IsZero()) || len(resourceQuotas) == 0 {
return "", nil
}

currentNamespaceUsage, currentApplicationUsage := r.watcher.GetCurrentResourceUsageWithApplication(namespace, kind, name)

for _, quota := range resourceQuotas {
// Scope selectors not currently supported, ignore any ResourceQuota that does not match everything.
if quota.Spec.ScopeSelector != nil || len(quota.Spec.Scopes) > 0 {
continue
}

// If an existing application has increased its usage, check it against the quota again. If its usage hasn't increased, always allow it.
if requestedResources.cpu.Cmp(currentApplicationUsage.cpu) == 1 {
if cpuLimit, present := quota.Spec.Hard[corev1.ResourceCPU]; present {
availableCpu := cpuLimit
availableCpu.Sub(currentNamespaceUsage.cpu)
if requestedResources.cpu.Cmp(availableCpu) == 1 {
return fmt.Sprintf("%s %s/%s requests too many cores (%.3f cores requested, %.3f available).", kind, namespace, name, float64(requestedResources.cpu.MilliValue())/1000.0, float64(availableCpu.MilliValue())/1000.0), nil
}
}
}

if requestedResources.memory.Cmp(currentApplicationUsage.memory) == 1 {
if memoryLimit, present := quota.Spec.Hard[corev1.ResourceMemory]; present {
availableMemory := memoryLimit
availableMemory.Sub(currentNamespaceUsage.memory)
if requestedResources.memory.Cmp(availableMemory) == 1 {
return fmt.Sprintf("%s %s/%s requests too much memory (%dMi requested, %dMi available).", kind, namespace, name, requestedResources.memory.Value()/(1<<20), availableMemory.Value()/(1<<20)), nil
}
}
}
}
return "", nil
}

func (r *ResourceQuotaEnforcer) AdmitSparkApplication(app so.SparkApplication) (string, error) {
resourceUsage, err := sparkApplicationResourceUsage(app)
if err != nil {
return "", err
}
return r.admitResource(KindSparkApplication, app.ObjectMeta.Namespace, app.ObjectMeta.Name, resourceUsage)
}

func (r *ResourceQuotaEnforcer) AdmitScheduledSparkApplication(app so.ScheduledSparkApplication) (string, error) {
resourceUsage, err := scheduledSparkApplicationResourceUsage(app)
if err != nil {
return "", err
}
return r.admitResource(KindScheduledSparkApplication, app.ObjectMeta.Namespace, app.ObjectMeta.Name, resourceUsage)
}
119 changes: 119 additions & 0 deletions pkg/webhook/resourceusage/handlers.go
@@ -0,0 +1,119 @@
package resourceusage

import (
so "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta1"

"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)

func (r *ResourceUsageWatcher) onPodAdded(obj interface{}) {
pod := obj.(*corev1.Pod)
// A pod launched by the Spark operator will already be accounted for by the CRD informer callback
if !launchedBySparkOperator(pod.ObjectMeta) {
r.setResources("Pod", namespaceOrDefault(pod.ObjectMeta), pod.ObjectMeta.Name, podResourceUsage(pod), r.usageByNamespacePod)
}
}

func (r *ResourceUsageWatcher) onPodUpdated(oldObj, newObj interface{}) {
newPod := newObj.(*corev1.Pod)
if !launchedBySparkOperator(newPod.ObjectMeta) {
if newPod.Status.Phase == corev1.PodFailed || newPod.Status.Phase == corev1.PodSucceeded {
r.deleteResources("Pod", namespaceOrDefault(newPod.ObjectMeta), newPod.ObjectMeta.Name, r.usageByNamespacePod)
} else {
r.setResources("Pod", namespaceOrDefault(newPod.ObjectMeta), newPod.ObjectMeta.Name, podResourceUsage(newPod), r.usageByNamespacePod)
}
}
}

func (r *ResourceUsageWatcher) onPodDeleted(obj interface{}) {
var pod *corev1.Pod
switch o := obj.(type) {
case *corev1.Pod:
pod = o
case cache.DeletedFinalStateUnknown:
pod = o.Obj.(*corev1.Pod)
default:
return
}
if !launchedBySparkOperator(pod.ObjectMeta) {
r.deleteResources("Pod", namespaceOrDefault(pod.ObjectMeta), pod.ObjectMeta.Name, r.usageByNamespacePod)
}
}

func (r *ResourceUsageWatcher) onSparkApplicationAdded(obj interface{}) {
app := obj.(*so.SparkApplication)
namespace := namespaceOrDefault(app.ObjectMeta)
resources, err := sparkApplicationResourceUsage(*app)
if err != nil {
glog.Errorf("failed to determine resource usage of SparkApplication %s/%s: %v", namespace, app.ObjectMeta.Name, err)
} else {
r.setResources(KindSparkApplication, namespace, app.ObjectMeta.Name, resources, r.usageByNamespaceApplication)
}
}

func (r *ResourceUsageWatcher) onSparkApplicationUpdated(oldObj, newObj interface{}) {
oldApp := oldObj.(*so.SparkApplication)
newApp := newObj.(*so.SparkApplication)
if oldApp.ResourceVersion == newApp.ResourceVersion {
return
}
namespace := namespaceOrDefault(newApp.ObjectMeta)
newResources, err := sparkApplicationResourceUsage(*newApp)
if err != nil {
glog.Errorf("failed to determine resource useage of SparkApplication %s/%s: %v", namespace, newApp.ObjectMeta.Name, err)
} else {
r.setResources(KindSparkApplication, namespace, newApp.ObjectMeta.Name, newResources, r.usageByNamespaceApplication)
}
}

func (r *ResourceUsageWatcher) onSparkApplicationDeleted(obj interface{}) {
var app *so.SparkApplication
switch o := obj.(type) {
case *so.SparkApplication:
app = o
case cache.DeletedFinalStateUnknown:
app = o.Obj.(*so.SparkApplication)
default:
return
}
namespace := namespaceOrDefault(app.ObjectMeta)
r.deleteResources(KindSparkApplication, namespace, app.ObjectMeta.Name, r.usageByNamespaceApplication)
}

func (r *ResourceUsageWatcher) onScheduledSparkApplicationAdded(obj interface{}) {
app := obj.(*so.ScheduledSparkApplication)
namespace := namespaceOrDefault(app.ObjectMeta)
resources, err := scheduledSparkApplicationResourceUsage(*app)
if err != nil {
glog.Errorf("failed to determine resource usage of ScheduledSparkApplication %s/%s: %v", namespace, app.ObjectMeta.Name, err)
} else {
r.setResources(KindScheduledSparkApplication, namespace, app.ObjectMeta.Name, resources, r.usageByNamespaceScheduledApplication)
}
}

func (r *ResourceUsageWatcher) onScheduledSparkApplicationUpdated(oldObj, newObj interface{}) {
newApp := newObj.(*so.ScheduledSparkApplication)
namespace := namespaceOrDefault(newApp.ObjectMeta)
newResources, err := scheduledSparkApplicationResourceUsage(*newApp)
if err != nil {
glog.Errorf("failed to determine resource usage of ScheduledSparkApplication %s/%s: %v", namespace, newApp.ObjectMeta.Name, err)
} else {
r.setResources(KindScheduledSparkApplication, namespace, newApp.ObjectMeta.Name, newResources, r.usageByNamespaceScheduledApplication)
}
}

func (r *ResourceUsageWatcher) onScheduledSparkApplicationDeleted(obj interface{}) {
var app *so.ScheduledSparkApplication
switch o := obj.(type) {
case *so.ScheduledSparkApplication:
app = o
case cache.DeletedFinalStateUnknown:
app = o.Obj.(*so.ScheduledSparkApplication)
default:
return
}
namespace := namespaceOrDefault(app.ObjectMeta)
r.deleteResources(KindScheduledSparkApplication, namespace, app.ObjectMeta.Name, r.usageByNamespaceScheduledApplication)
}