Resource quota enforcement webhook (#544)
* Cert configuration and reloading

* Add support for strict webhook error handling

* Improve webhook error handling

* Don't deregister the webhook when failure policy is strict

* standard error message capitalization

* have the webhook parse its own configuration from flags

* clean up cert provider code

* Add explanation for skipping deregistration

* Resource Quota enforcement webhook

* Fix bad merge

* Cleanup, fixes

* Cleanup

* Document the quota enforcer
kevin hogeland authored and liyinan926 committed Aug 28, 2019
1 parent dc27045 commit edcf4cd
Showing 10 changed files with 919 additions and 79 deletions.
1 change: 1 addition & 0 deletions Gopkg.lock

Some generated files are not rendered by default.

7 changes: 7 additions & 0 deletions docs/user-guide.md
@@ -38,6 +38,7 @@ The Kubernetes Operator for Apache Spark ships with a command-line tool called `
* [Configuring Automatic Application Re-submission on Submission Failures](#configuring-automatic-application-re-submission-on-submission-failures)
* [Running Spark Applications on a Schedule using a ScheduledSparkApplication](#running-spark-applications-on-a-schedule-using-a-scheduledsparkapplication)
* [Enabling Leader Election for High Availability](#enabling-leader-election-for-high-availability)
* [Enabling Resource Quota Enforcement](#enabling-resource-quota-enforcement)
* [Customizing the Operator](#customizing-the-operator)

## Using a SparkApplication
@@ -564,6 +565,12 @@ The operator supports a high-availability (HA) mode, in which there can be more
| `leader-election-renew-deadline` | 14 seconds | Leader election renew deadline. |
| `leader-election-retry-period` | 4 seconds | Leader election retry period. |

## Enabling Resource Quota Enforcement

The operator provides limited support for resource quota enforcement using a validating webhook. It counts the resources of non-terminal-phase SparkApplications and Pods and determines whether a requested SparkApplication fits within the namespace's remaining quota. ResourceQuota scope selectors are not supported; any ResourceQuota object that does not match the entire namespace is ignored. As with native Pod quota enforcement, current usage is updated asynchronously, so some overscheduling is possible.
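
Conceptually, the per-quota check (implemented in `pkg/webhook/resourceusage/enforcer.go` below) compares the requested amount against the quota's hard limit minus current namespace usage. A minimal sketch of that comparison, with illustrative names:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

// fits reports whether a requested amount fits under a quota's hard limit
// given current namespace usage. Illustrative only: the real check also
// skips quotas with scope selectors and only re-checks applications whose
// usage has increased.
func fits(requested, used, hard resource.Quantity) bool {
	available := hard.DeepCopy()
	available.Sub(used)
	return requested.Cmp(available) <= 0
}

func main() {
	fmt.Println(fits(
		resource.MustParse("2"),     // requesting 2 cores
		resource.MustParse("1500m"), // 1.5 cores already used in the namespace
		resource.MustParse("4"),     // quota hard limit of 4 cores
	)) // true: 2 <= 4 - 1.5
}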

If you are running Spark applications in namespaces that are subject to resource quota constraints, consider enabling this feature to avoid driver resource starvation. Quota enforcement can be enabled with the command-line flag `-enable-resource-quota-enforcement=true`. It is recommended to also set `-webhook-fail-on-error=true`.

## Customizing the Operator

To customize the operator, you can follow the steps below:
62 changes: 41 additions & 21 deletions main.go
@@ -54,25 +54,26 @@ import (
)

var (
master = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
installCRDs = flag.Bool("install-crds", true, "Whether to install CRDs")
controllerThreads = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
enableMetrics = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressURLFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable Spark operator leader election.")
leaderElectionLockNamespace = flag.String("leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.")
leaderElectionLockName = flag.String("leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.")
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Leader election lease duration.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 4*time.Second, "Leader election retry period.")
batchSchedulerName = flag.String("batch-scheduler-name", "", "Use specified scheduler for pods' batch scheduling.")
master = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeConfig = flag.String("kubeConfig", "", "Path to a kube config. Only required if out-of-cluster.")
installCRDs = flag.Bool("install-crds", true, "Whether to install CRDs")
controllerThreads = flag.Int("controller-threads", 10, "Number of worker threads used by the SparkApplication controller.")
resyncInterval = flag.Int("resync-interval", 30, "Informer resync interval in seconds.")
namespace = flag.String("namespace", apiv1.NamespaceAll, "The Kubernetes namespace to manage. Will manage custom resource objects of the managed CRD types for the whole cluster if unset.")
enableWebhook = flag.Bool("enable-webhook", false, "Whether to enable the mutating admission webhook for admitting and patching Spark pods.")
enableResourceQuotaEnforcement = flag.Bool("enable-resource-quota-enforcement", false, "Whether to enable ResourceQuota enforcement for SparkApplication resources. Requires the webhook to be enabled.")
enableMetrics = flag.Bool("enable-metrics", false, "Whether to enable the metrics endpoint.")
metricsPort = flag.String("metrics-port", "10254", "Port for the metrics endpoint.")
metricsEndpoint = flag.String("metrics-endpoint", "/metrics", "Metrics endpoint.")
metricsPrefix = flag.String("metrics-prefix", "", "Prefix for the metrics.")
ingressURLFormat = flag.String("ingress-url-format", "", "Ingress URL format.")
enableLeaderElection = flag.Bool("leader-election", false, "Enable Spark operator leader election.")
leaderElectionLockNamespace = flag.String("leader-election-lock-namespace", "spark-operator", "Namespace in which to create the ConfigMap for leader election.")
leaderElectionLockName = flag.String("leader-election-lock-name", "spark-operator-lock", "Name of the ConfigMap for leader election.")
leaderElectionLeaseDuration = flag.Duration("leader-election-lease-duration", 15*time.Second, "Leader election lease duration.")
leaderElectionRenewDeadline = flag.Duration("leader-election-renew-deadline", 14*time.Second, "Leader election renew deadline.")
leaderElectionRetryPeriod = flag.Duration("leader-election-retry-period", 4*time.Second, "Leader election retry period.")
batchSchedulerName = flag.String("batch-scheduler-name", "", "Use specified scheduler for pods' batch scheduling.")
)

func main() {
@@ -172,15 +173,26 @@ func main() {

var hook *webhook.WebHook
if *enableWebhook {
var coreV1InformerFactory informers.SharedInformerFactory
if *enableResourceQuotaEnforcement {
coreV1InformerFactory = buildCoreV1InformerFactory(kubeClient)
}
var err error
// Don't deregister webhook on exit if leader election enabled (i.e. multiple webhooks running)
hook, err = webhook.New(kubeClient, crInformerFactory, *namespace, !*enableLeaderElection)
hook, err = webhook.New(kubeClient, crInformerFactory, *namespace, !*enableLeaderElection, *enableResourceQuotaEnforcement, coreV1InformerFactory)
if err != nil {
glog.Fatal(err)
}
if err = hook.Start(); err != nil {

if *enableResourceQuotaEnforcement {
go coreV1InformerFactory.Start(stopCh)
}

if err = hook.Start(stopCh); err != nil {
glog.Fatal(err)
}
} else if *enableResourceQuotaEnforcement {
glog.Fatal("Webhook must be enabled to use resource quota enforcement.")
}

if *enableLeaderElection {
@@ -261,3 +273,11 @@ func buildPodInformerFactory(kubeClient clientset.Interface) informers.SharedInf
podFactoryOpts = append(podFactoryOpts, informers.WithTweakListOptions(tweakListOptionsFunc))
return informers.NewSharedInformerFactoryWithOptions(kubeClient, time.Duration(*resyncInterval)*time.Second, podFactoryOpts...)
}

func buildCoreV1InformerFactory(kubeClient clientset.Interface) informers.SharedInformerFactory {
var coreV1FactoryOpts []informers.SharedInformerOption
if *namespace != apiv1.NamespaceAll {
coreV1FactoryOpts = append(coreV1FactoryOpts, informers.WithNamespace(*namespace))
}
return informers.NewSharedInformerFactoryWithOptions(kubeClient, time.Duration(*resyncInterval)*time.Second, coreV1FactoryOpts...)
}
5 changes: 4 additions & 1 deletion manifest/spark-operator-rbac.yaml
@@ -45,14 +45,17 @@ rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get"]
- apiGroups: [""]
resources: ["resourcequotas"]
verbs: ["get", "list", "watch"]
- apiGroups: [""]
resources: ["events"]
verbs: ["create", "update", "patch"]
- apiGroups: ["apiextensions.k8s.io"]
resources: ["customresourcedefinitions"]
verbs: ["create", "get", "update", "delete"]
- apiGroups: ["admissionregistration.k8s.io"]
resources: ["mutatingwebhookconfigurations"]
resources: ["mutatingwebhookconfigurations", "validatingwebhookconfigurations"]
verbs: ["create", "get", "update", "delete"]
- apiGroups: ["sparkoperator.k8s.io"]
resources: ["sparkapplications", "scheduledsparkapplications"]
97 changes: 97 additions & 0 deletions pkg/webhook/resourceusage/enforcer.go
@@ -0,0 +1,97 @@
package resourceusage

import (
"fmt"
so "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta1"
crdinformers "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/informers/externalversions"
"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
corev1informers "k8s.io/client-go/informers/core/v1"
"k8s.io/client-go/tools/cache"
)

type ResourceQuotaEnforcer struct {
watcher ResourceUsageWatcher
resourceQuotaInformer corev1informers.ResourceQuotaInformer
}

func NewResourceQuotaEnforcer(crdInformerFactory crdinformers.SharedInformerFactory, coreV1InformerFactory informers.SharedInformerFactory) ResourceQuotaEnforcer {
resourceUsageWatcher := newResourceUsageWatcher(crdInformerFactory, coreV1InformerFactory)
informer := coreV1InformerFactory.Core().V1().ResourceQuotas()
informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{})
return ResourceQuotaEnforcer{
watcher: resourceUsageWatcher,
resourceQuotaInformer: informer,
}
}

// TODO: There appears to be a deadlock in cache.WaitForCacheSync. Possibly related? https://github.com/kubernetes/kubernetes/issues/71450
// For now, return immediately. There will be a short window after startup where quota calculation is incorrect.
func (r ResourceQuotaEnforcer) WaitForCacheSync(stopCh <-chan struct{}) error {
/*if !cache.WaitForCacheSync(stopCh, func() bool {
return r.resourceQuotaInformer.Informer().HasSynced()
}) {
return fmt.Errorf("cache sync canceled")
}*/
return nil
}

func (r *ResourceQuotaEnforcer) admitResource(kind, namespace, name string, requestedResources ResourceList) (string, error) {
glog.V(2).Infof("Processing admission request for %s %s/%s, requesting: %s", kind, namespace, name, requestedResources)
resourceQuotas, err := r.resourceQuotaInformer.Lister().ResourceQuotas(namespace).List(labels.Everything())
if err != nil {
return "", err
}
if (requestedResources.cpu.IsZero() && requestedResources.memory.IsZero()) || len(resourceQuotas) == 0 {
return "", nil
}

currentNamespaceUsage, currentApplicationUsage := r.watcher.GetCurrentResourceUsageWithApplication(namespace, kind, name)

for _, quota := range resourceQuotas {
// Scope selectors not currently supported, ignore any ResourceQuota that does not match everything.
if quota.Spec.ScopeSelector != nil || len(quota.Spec.Scopes) > 0 {
continue
}

// If an existing application has increased its usage, check it against the quota again. If its usage hasn't increased, always allow it.
if requestedResources.cpu.Cmp(currentApplicationUsage.cpu) == 1 {
if cpuLimit, present := quota.Spec.Hard[corev1.ResourceCPU]; present {
availableCpu := cpuLimit
availableCpu.Sub(currentNamespaceUsage.cpu)
if requestedResources.cpu.Cmp(availableCpu) == 1 {
return fmt.Sprintf("%s %s/%s requests too many cores (%.3f cores requested, %.3f available).", kind, namespace, name, float64(requestedResources.cpu.MilliValue())/1000.0, float64(availableCpu.MilliValue())/1000.0), nil
}
}
}

if requestedResources.memory.Cmp(currentApplicationUsage.memory) == 1 {
if memoryLimit, present := quota.Spec.Hard[corev1.ResourceMemory]; present {
availableMemory := memoryLimit
availableMemory.Sub(currentNamespaceUsage.memory)
if requestedResources.memory.Cmp(availableMemory) == 1 {
return fmt.Sprintf("%s %s/%s requests too much memory (%dMi requested, %dMi available).", kind, namespace, name, requestedResources.memory.Value()/(1<<20), availableMemory.Value()/(1<<20)), nil
}
}
}
}
return "", nil
}

func (r *ResourceQuotaEnforcer) AdmitSparkApplication(app so.SparkApplication) (string, error) {
resourceUsage, err := sparkApplicationResourceUsage(app)
if err != nil {
return "", err
}
return r.admitResource(KindSparkApplication, app.ObjectMeta.Namespace, app.ObjectMeta.Name, resourceUsage)
}

func (r *ResourceQuotaEnforcer) AdmitScheduledSparkApplication(app so.ScheduledSparkApplication) (string, error) {
resourceUsage, err := scheduledSparkApplicationResourceUsage(app)
if err != nil {
return "", err
}
return r.admitResource(KindScheduledSparkApplication, app.ObjectMeta.Namespace, app.ObjectMeta.Name, resourceUsage)
}
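
The webhook server itself (in `pkg/webhook`, not included in this excerpt) is what calls these Admit methods. A hypothetical sketch of that call site, assuming the v1beta1 admission API; the handler name and wiring here are illustrative, not the actual implementation:

package webhook

import (
	"encoding/json"

	so "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta1"
	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/webhook/resourceusage"
	admissionv1beta1 "k8s.io/api/admission/v1beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// admitSparkApplication decodes the SparkApplication under review, asks the
// enforcer whether it fits the remaining quota, and denies admission with the
// enforcer's reason if it does not. Hypothetical; the actual handler lives in
// pkg/webhook.
func admitSparkApplication(enforcer resourceusage.ResourceQuotaEnforcer, req *admissionv1beta1.AdmissionRequest) *admissionv1beta1.AdmissionResponse {
	var app so.SparkApplication
	if err := json.Unmarshal(req.Object.Raw, &app); err != nil {
		return &admissionv1beta1.AdmissionResponse{Result: &metav1.Status{Message: err.Error()}}
	}
	reason, err := enforcer.AdmitSparkApplication(app)
	if err != nil {
		return &admissionv1beta1.AdmissionResponse{Result: &metav1.Status{Message: err.Error()}}
	}
	if reason != "" {
		// Over quota: reject with the human-readable explanation.
		return &admissionv1beta1.AdmissionResponse{Allowed: false, Result: &metav1.Status{Message: reason}}
	}
	return &admissionv1beta1.AdmissionResponse{Allowed: true}
}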
119 changes: 119 additions & 0 deletions pkg/webhook/resourceusage/handlers.go
@@ -0,0 +1,119 @@
package resourceusage

import (
so "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta1"

"github.com/golang/glog"
corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)

func (r *ResourceUsageWatcher) onPodAdded(obj interface{}) {
pod := obj.(*corev1.Pod)
// A pod launched by the Spark operator will already be accounted for by the CRD informer callback
if !launchedBySparkOperator(pod.ObjectMeta) {
r.setResources("Pod", namespaceOrDefault(pod.ObjectMeta), pod.ObjectMeta.Name, podResourceUsage(pod), r.usageByNamespacePod)
}
}

func (r *ResourceUsageWatcher) onPodUpdated(oldObj, newObj interface{}) {
newPod := newObj.(*corev1.Pod)
if !launchedBySparkOperator(newPod.ObjectMeta) {
if newPod.Status.Phase == corev1.PodFailed || newPod.Status.Phase == corev1.PodSucceeded {
r.deleteResources("Pod", namespaceOrDefault(newPod.ObjectMeta), newPod.ObjectMeta.Name, r.usageByNamespacePod)
} else {
r.setResources("Pod", namespaceOrDefault(newPod.ObjectMeta), newPod.ObjectMeta.Name, podResourceUsage(newPod), r.usageByNamespacePod)
}
}
}

func (r *ResourceUsageWatcher) onPodDeleted(obj interface{}) {
var pod *corev1.Pod
switch o := obj.(type) {
case *corev1.Pod:
pod = o
case cache.DeletedFinalStateUnknown:
pod = o.Obj.(*corev1.Pod)
default:
return
}
if !launchedBySparkOperator(pod.ObjectMeta) {
r.deleteResources("Pod", namespaceOrDefault(pod.ObjectMeta), pod.ObjectMeta.Name, r.usageByNamespacePod)
}
}

func (r *ResourceUsageWatcher) onSparkApplicationAdded(obj interface{}) {
app := obj.(*so.SparkApplication)
namespace := namespaceOrDefault(app.ObjectMeta)
resources, err := sparkApplicationResourceUsage(*app)
if err != nil {
glog.Errorf("failed to determine resource usage of SparkApplication %s/%s: %v", namespace, app.ObjectMeta.Name, err)
} else {
r.setResources(KindSparkApplication, namespace, app.ObjectMeta.Name, resources, r.usageByNamespaceApplication)
}
}

func (r *ResourceUsageWatcher) onSparkApplicationUpdated(oldObj, newObj interface{}) {
oldApp := oldObj.(*so.SparkApplication)
newApp := newObj.(*so.SparkApplication)
if oldApp.ResourceVersion == newApp.ResourceVersion {
return
}
namespace := namespaceOrDefault(newApp.ObjectMeta)
newResources, err := sparkApplicationResourceUsage(*newApp)
if err != nil {
glog.Errorf("failed to determine resource usage of SparkApplication %s/%s: %v", namespace, newApp.ObjectMeta.Name, err)
} else {
r.setResources(KindSparkApplication, namespace, newApp.ObjectMeta.Name, newResources, r.usageByNamespaceApplication)
}
}

func (r *ResourceUsageWatcher) onSparkApplicationDeleted(obj interface{}) {
var app *so.SparkApplication
switch o := obj.(type) {
case *so.SparkApplication:
app = o
case cache.DeletedFinalStateUnknown:
app = o.Obj.(*so.SparkApplication)
default:
return
}
namespace := namespaceOrDefault(app.ObjectMeta)
r.deleteResources(KindSparkApplication, namespace, app.ObjectMeta.Name, r.usageByNamespaceApplication)
}

func (r *ResourceUsageWatcher) onScheduledSparkApplicationAdded(obj interface{}) {
app := obj.(*so.ScheduledSparkApplication)
namespace := namespaceOrDefault(app.ObjectMeta)
resources, err := scheduledSparkApplicationResourceUsage(*app)
if err != nil {
glog.Errorf("failed to determine resource usage of ScheduledSparkApplication %s/%s: %v", namespace, app.ObjectMeta.Name, err)
} else {
r.setResources(KindScheduledSparkApplication, namespace, app.ObjectMeta.Name, resources, r.usageByNamespaceScheduledApplication)
}
}

func (r *ResourceUsageWatcher) onScheduledSparkApplicationUpdated(oldObj, newObj interface{}) {
newApp := newObj.(*so.ScheduledSparkApplication)
namespace := namespaceOrDefault(newApp.ObjectMeta)
newResources, err := scheduledSparkApplicationResourceUsage(*newApp)
if err != nil {
glog.Errorf("failed to determine resource usage of ScheduledSparkApplication %s/%s: %v", namespace, newApp.ObjectMeta.Name, err)
} else {
r.setResources(KindScheduledSparkApplication, namespace, newApp.ObjectMeta.Name, newResources, r.usageByNamespaceScheduledApplication)
}
}

func (r *ResourceUsageWatcher) onScheduledSparkApplicationDeleted(obj interface{}) {
var app *so.ScheduledSparkApplication
switch o := obj.(type) {
case *so.ScheduledSparkApplication:
app = o
case cache.DeletedFinalStateUnknown:
app = o.Obj.(*so.ScheduledSparkApplication)
default:
return
}
namespace := namespaceOrDefault(app.ObjectMeta)
r.deleteResources(KindScheduledSparkApplication, namespace, app.ObjectMeta.Name, r.usageByNamespaceScheduledApplication)
}
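
These callbacks are attached to shared informers by the `ResourceUsageWatcher`, which is defined in `watcher.go` among the files not shown above. The registration presumably follows the standard client-go pattern; an illustrative sketch (same-package code, since the handlers are unexported):

// Illustrative wiring of the handlers onto shared informers; the actual
// registration lives in watcher.go (not shown in this excerpt).
podInformer := coreV1InformerFactory.Core().V1().Pods().Informer()
podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    r.onPodAdded,
	UpdateFunc: r.onPodUpdated,
	DeleteFunc: r.onPodDeleted,
})

appInformer := crdInformerFactory.Sparkoperator().V1beta1().SparkApplications().Informer()
appInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
	AddFunc:    r.onSparkApplicationAdded,
	UpdateFunc: r.onSparkApplicationUpdated,
	DeleteFunc: r.onSparkApplicationDeleted,
})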