From 47cd1f2a284cc30b0f2f952bbfa618807e94f8c3 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Wed, 6 Oct 2021 14:50:11 +0200 Subject: [PATCH] filter out nodes waiting for CSI driver When a new node becomes ready, a CSI driver is not going to be running on it immediately. This can cause the cluster autoscaler to scale up once more because of pending pods that can run on that new node once the driver is ready. The actual check is about CSIStorageCapacity. By comparing the published information about the new node against the information for a template node, we can determine whether the CSI driver is done with starting up on the node. The new CSI processor needs information about existing CSIStorageCapacity objects in the cluster, just like the scheduler predicate. Both can share the same informer. For that to work, managing the informer factory must be moved up the call chain so that the setup code for both can use the same factory. --- cluster-autoscaler/core/autoscaler.go | 7 +- cluster-autoscaler/core/scale_test_common.go | 2 +- cluster-autoscaler/core/static_autoscaler.go | 6 + cluster-autoscaler/main.go | 31 +++- .../processors/csi/csi_processor.go | 159 ++++++++++++++++++ cluster-autoscaler/processors/processors.go | 8 +- .../scheduler_based_predicates_checker.go | 38 +++-- .../simulator/test_predicates_checker.go | 2 +- 8 files changed, 227 insertions(+), 26 deletions(-) create mode 100644 cluster-autoscaler/processors/csi/csi_processor.go diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 6fed440793e9..ecc191cd8489 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -33,6 +33,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/backoff" "k8s.io/autoscaler/cluster-autoscaler/utils/errors" + "k8s.io/client-go/informers" kube_client "k8s.io/client-go/kubernetes" ) @@ -40,6 +41,7 @@ import ( type AutoscalerOptions struct { config.AutoscalingOptions KubeClient kube_client.Interface + InformerFactory informers.SharedInformerFactory EventsKubeClient kube_client.Interface AutoscalingKubeClients *context.AutoscalingKubeClients CloudProvider cloudprovider.CloudProvider @@ -85,14 +87,13 @@ func NewAutoscaler(opts AutoscalerOptions) (Autoscaler, errors.AutoscalerError) // Initialize default options if not provided. func initializeDefaultOptions(opts *AutoscalerOptions) error { if opts.Processors == nil { - opts.Processors = ca_processors.DefaultProcessors() + opts.Processors = ca_processors.DefaultProcessors(opts.InformerFactory) } if opts.AutoscalingKubeClients == nil { opts.AutoscalingKubeClients = context.NewAutoscalingKubeClients(opts.AutoscalingOptions, opts.KubeClient, opts.EventsKubeClient) } if opts.PredicateChecker == nil { - predicateCheckerStopChannel := make(chan struct{}) - predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(opts.KubeClient, predicateCheckerStopChannel) + predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(opts.KubeClient, opts.InformerFactory) if err != nil { return err } diff --git a/cluster-autoscaler/core/scale_test_common.go b/cluster-autoscaler/core/scale_test_common.go index 0a5f81d7b2d4..811096e9c578 100644 --- a/cluster-autoscaler/core/scale_test_common.go +++ b/cluster-autoscaler/core/scale_test_common.go @@ -198,7 +198,7 @@ func NewScaleTestAutoscalingContext( // Ignoring error here is safe - if a test doesn't specify valid estimatorName, // it either doesn't need one, or should fail when it turns out to be nil. estimatorBuilder, _ := estimator.NewEstimatorBuilder(options.EstimatorName) - predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(fakeClient, make(chan struct{})) + predicateChecker, err := simulator.NewSchedulerBasedPredicateChecker(fakeClient, nil) if err != nil { return context.AutoscalingContext{}, err } diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 06558f7ade54..02fc936ecfc0 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -762,6 +762,12 @@ func (a *StaticAutoscaler) obtainNodeLists(cp cloudprovider.CloudProvider) ([]*a // TODO: Remove this call when we handle dynamically provisioned resources. allNodes, readyNodes = a.processors.CustomResourcesProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes) allNodes, readyNodes = taints.FilterOutNodesWithIgnoredTaints(a.nodeTransformation.IgnoredTaints, allNodes, readyNodes) + + // Filter out nodes that aren't ready because of a missing CSI driver. + if a.processors.CSIProcessor != nil { + allNodes, readyNodes = a.processors.CSIProcessor.FilterOutNodesWithUnreadyResources(a.AutoscalingContext, allNodes, readyNodes) + } + return allNodes, readyNodes, nil } diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index 2ce6353e9492..d972410204fd 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -52,6 +52,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/utils/replace" "k8s.io/autoscaler/cluster-autoscaler/utils/units" "k8s.io/autoscaler/cluster-autoscaler/version" + "k8s.io/client-go/informers" kube_client "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -320,16 +321,22 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter autoscalingOptions := createAutoscalingOptions() kubeClient := createKubeClient(getKubeConfig()) eventsKubeClient := createKubeClient(getKubeConfig()) + informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) + + // This isn't particularly useful at the moment. Perhaps we can accept + // a context and then our caller can decide about a suitable deadline. + stopChannel := make(chan struct{}) opts := core.AutoscalerOptions{ AutoscalingOptions: autoscalingOptions, ClusterSnapshot: simulator.NewDeltaClusterSnapshot(), KubeClient: kubeClient, + InformerFactory: informerFactory, EventsKubeClient: eventsKubeClient, DebuggingSnapshotter: debuggingSnapshotter, } - opts.Processors = ca_processors.DefaultProcessors() + opts.Processors = ca_processors.DefaultProcessors(informerFactory) opts.Processors.PodListProcessor = core.NewFilterOutSchedulablePodListProcessor() nodeInfoComparatorBuilder := nodegroupset.CreateGenericNodeInfoComparator @@ -354,7 +361,27 @@ func buildAutoscaler(debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter metrics.UpdateMemoryLimitsBytes(autoscalingOptions.MinMemoryTotal, autoscalingOptions.MaxMemoryTotal) // Create autoscaler. - return core.NewAutoscaler(opts) + autoscaler, err := core.NewAutoscaler(opts) + if err != nil { + return nil, err + } + + // this MUST be called after all the informers/listers are acquired via the + // informerFactory....Lister()/informerFactory....Informer() methods + informerFactory.Start(stopChannel) + + // Also wait for all informers to be up-to-date. This is necessary for + // objects that were added when creating the fake client. Without + // this wait, those objects won't be visible via the informers when + // the test runs. + synced := informerFactory.WaitForCacheSync(stopChannel) + for k, v := range synced { + if !v { + return nil, fmt.Errorf("failed to sync informer %v", k) + } + } + + return autoscaler, err } func run(healthCheck *metrics.HealthCheck, debuggingSnapshotter debuggingsnapshot.DebuggingSnapshotter) { diff --git a/cluster-autoscaler/processors/csi/csi_processor.go b/cluster-autoscaler/processors/csi/csi_processor.go new file mode 100644 index 000000000000..fadbc96b3fd3 --- /dev/null +++ b/cluster-autoscaler/processors/csi/csi_processor.go @@ -0,0 +1,159 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package csi + +import ( + apiv1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/autoscaler/cluster-autoscaler/context" + "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" + "k8s.io/client-go/informers" + storagev1beta1listers "k8s.io/client-go/listers/storage/v1beta1" + "k8s.io/klog/v2" +) + +// CSIProcessor checks whether a node is ready for applications using volumes +// provided by a CSI driver. This is relevant when the autoscaler has been +// configured to check storage capacity. +// +// Without this processor, the following happens: +// - autoscaler determines that it needs a new node to get volumes +// for a pending pod created +// - the new node starts and is ready to run pods, but the CSI driver +// itself hasn't started running on it yet +// - autoscaler checks for pending pods, finds that the pod still +// cannot run and asks for another node +// - the CSI driver starts, creates volumes and the pod runs +// => the extra node is redundant +// +// To determine whether a node will have a CSI driver, a heuristic is used: if +// a template node derived from the node has a CSIStorageCapacity object, then +// the node itself should also have one, otherwise it is not ready. +type CSIProcessor interface { + // FilterOutNodesWithUnreadyResources removes nodes that should have a CSI + // driver, but don't have CSIStorageCapacity information yet. + FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) + + // CleanUp frees resources. + CleanUp() +} + +type csiProcessor struct { + csiStorageCapacityLister storagev1beta1listers.CSIStorageCapacityLister +} + +// FilterOutNodesWithUnreadyResources removes nodes that should have a CSI +// driver, but don't have CSIStorageCapacity information yet. +func (p csiProcessor) FilterOutNodesWithUnreadyResources(context *context.AutoscalingContext, allNodes, readyNodes []*apiv1.Node) ([]*apiv1.Node, []*apiv1.Node) { + newAllNodes := make([]*apiv1.Node, 0, len(allNodes)) + newReadyNodes := make([]*apiv1.Node, 0, len(readyNodes)) + nodesWithUnreadyCSI := make(map[string]*apiv1.Node) + for _, node := range readyNodes { + // TODO: short-circuit this check if the node has been in the + // ready state long enough? If all of the tests below hit the + // API server to query CSIDriver and CSIStorageCapacity objects + // and all nodes in a cluster get checked again during each + // scale up run, then this might create a lot of additional + // load. + klog.V(3).Infof("checking CSIStorageCapacity of node %s", node.Name) + if p.isReady(context, node) { + newReadyNodes = append(newReadyNodes, node) + } else { + nodesWithUnreadyCSI[node.Name] = kubernetes.GetUnreadyNodeCopy(node) + } + } + // Override any node with unready CSI with its "unready" copy + for _, node := range allNodes { + if newNode, found := nodesWithUnreadyCSI[node.Name]; found { + newAllNodes = append(newAllNodes, newNode) + } else { + newAllNodes = append(newAllNodes, node) + } + } + return newAllNodes, newReadyNodes +} + +func (p csiProcessor) isReady(context *context.AutoscalingContext, node *v1.Node) bool { + cloudProvider := context.CloudProvider + nodeGroup, err := cloudProvider.NodeGroupForNode(node) + if err != nil || nodeGroup == nil { + // Not a node that is part of a node group? Assume that the normal + // ready state applies and continue. + klog.V(3).Infof("node %s has no node group, skip CSI check (error: %v)", node.Name, err) + return true + } + nodeInfo, err := nodeGroup.TemplateNodeInfo() + if err != nil { + // Again, ignore the node. + klog.V(3).Infof("node %s has no node info, skip CSI check: %v", node.Name, err) + return true + } + templateNode := nodeInfo.Node() + expected := p.numStorageCapacityObjects(templateNode) + if expected == 0 { + // Node cannot be unready because no CSI storage is expected. + klog.V(3).Infof("node %s is not expected to have CSI storage: %v", node.Name) + return true + } + actual := p.numStorageCapacityObjects(node) + if expected <= actual { + klog.V(3).Infof("node %s has enough CSIStorageCapacity objects (expected %d, have %d)", + node.Name, expected, actual) + return true + } + + // CSI driver should have published capacity information and + // hasn't done it yet -> treat the node as not ready yet. + klog.V(3).Infof("node %s is expected to have %d CSIStorageCapacity objects, only has %d -> treat it as unready", + node.Name, expected, actual) + return false +} + +func (p csiProcessor) numStorageCapacityObjects(node *v1.Node) int { + count := 0 + capacities, err := p.csiStorageCapacityLister.List(labels.Everything()) + if err != nil { + klog.Error(err, "list CSIStorageCapacity") + return 0 + } + for _, capacity := range capacities { + // match labels + if capacity.NodeTopology == nil { + continue + } + selector, err := metav1.LabelSelectorAsSelector(capacity.NodeTopology) + if err != nil { + // Invalid capacity object? Ignore it. + continue + } + if selector.Matches(labels.Set(node.Labels)) { + count++ + } + } + return count +} + +func (p csiProcessor) CleanUp() {} + +// NewDefaultCSIProcessor returns a default instance of CSIProcessor. +func NewDefaultCSIProcessor(informerFactory informers.SharedInformerFactory) CSIProcessor { + return csiProcessor{ + csiStorageCapacityLister: informerFactory.Storage().V1beta1().CSIStorageCapacities().Lister(), + } +} diff --git a/cluster-autoscaler/processors/processors.go b/cluster-autoscaler/processors/processors.go index 36e204f5101f..985c503d824a 100644 --- a/cluster-autoscaler/processors/processors.go +++ b/cluster-autoscaler/processors/processors.go @@ -18,6 +18,7 @@ package processors import ( "k8s.io/autoscaler/cluster-autoscaler/processors/actionablecluster" + "k8s.io/autoscaler/cluster-autoscaler/processors/csi" "k8s.io/autoscaler/cluster-autoscaler/processors/customresources" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroupconfig" "k8s.io/autoscaler/cluster-autoscaler/processors/nodegroups" @@ -27,6 +28,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/processors/nodes" "k8s.io/autoscaler/cluster-autoscaler/processors/pods" "k8s.io/autoscaler/cluster-autoscaler/processors/status" + "k8s.io/client-go/informers" ) // AutoscalingProcessors are a set of customizable processors used for encapsulating @@ -60,10 +62,12 @@ type AutoscalingProcessors struct { CustomResourcesProcessor customresources.CustomResourcesProcessor // ActionableClusterProcessor is interface defining whether the cluster is in an actionable state ActionableClusterProcessor actionablecluster.ActionableClusterProcessor + // CSIProcessor checks for nodes that are not ready because the CSI driver is still starting up. + CSIProcessor csi.CSIProcessor } // DefaultProcessors returns default set of processors. -func DefaultProcessors() *AutoscalingProcessors { +func DefaultProcessors(informerFactory informers.SharedInformerFactory) *AutoscalingProcessors { return &AutoscalingProcessors{ PodListProcessor: pods.NewDefaultPodListProcessor(), NodeGroupListProcessor: nodegroups.NewDefaultNodeGroupListProcessor(), @@ -77,6 +81,7 @@ func DefaultProcessors() *AutoscalingProcessors { NodeInfoProcessor: nodeinfos.NewDefaultNodeInfoProcessor(), NodeGroupConfigProcessor: nodegroupconfig.NewDefaultNodeGroupConfigProcessor(), CustomResourcesProcessor: customresources.NewDefaultCustomResourcesProcessor(), + CSIProcessor: csi.NewDefaultCSIProcessor(informerFactory), TemplateNodeInfoProvider: nodeinfosprovider.NewDefaultTemplateNodeInfoProvider(), ActionableClusterProcessor: actionablecluster.NewDefaultActionableClusterProcessor(), } @@ -96,6 +101,7 @@ func (ap *AutoscalingProcessors) CleanUp() { ap.NodeInfoProcessor.CleanUp() ap.NodeGroupConfigProcessor.CleanUp() ap.CustomResourcesProcessor.CleanUp() + ap.CSIProcessor.CleanUp() ap.TemplateNodeInfoProvider.CleanUp() ap.ActionableClusterProcessor.CleanUp() } diff --git a/cluster-autoscaler/simulator/scheduler_based_predicates_checker.go b/cluster-autoscaler/simulator/scheduler_based_predicates_checker.go index d9217cee0f5e..f0d4080eb694 100644 --- a/cluster-autoscaler/simulator/scheduler_based_predicates_checker.go +++ b/cluster-autoscaler/simulator/scheduler_based_predicates_checker.go @@ -42,8 +42,15 @@ type SchedulerBasedPredicateChecker struct { } // NewSchedulerBasedPredicateChecker builds scheduler based PredicateChecker. -func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, stop <-chan struct{}) (*SchedulerBasedPredicateChecker, error) { - informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) +// The informer factory is optional. If nil, then this function will create one, +// start it and wait for cache syncing. +func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, informerFactory informers.SharedInformerFactory) (*SchedulerBasedPredicateChecker, error) { + startInformer := false + if informerFactory == nil { + informerFactory = informers.NewSharedInformerFactory(kubeClient, 0) + startInformer = true + } + config, err := scheduler_config.Default() if err != nil { return nil, fmt.Errorf("couldn't create scheduler config: %v", err) @@ -64,26 +71,21 @@ func NewSchedulerBasedPredicateChecker(kubeClient kube_client.Interface, stop <- return nil, fmt.Errorf("couldn't create scheduler framework; %v", err) } + if startInformer { + stopChannel := make(chan struct{}) + informerFactory.Start(stopChannel) + synced := informerFactory.WaitForCacheSync(stopChannel) + for k, v := range synced { + if !v { + return nil, fmt.Errorf("failed to sync informer %v", k) + } + } + } + checker := &SchedulerBasedPredicateChecker{ framework: framework, delegatingSharedLister: sharedLister, } - - // this MUST be called after all the informers/listers are acquired via the - // informerFactory....Lister()/informerFactory....Informer() methods - informerFactory.Start(stop) - - // Also wait for all informers to be up-to-date. This is necessary for - // objects that were added when creating the fake client. Without - // this wait, those objects won't be visible via the informers when - // the test runs. - synced := informerFactory.WaitForCacheSync(stop) - for k, v := range synced { - if !v { - return nil, fmt.Errorf("failed to sync informer %v", k) - } - } - return checker, nil } diff --git a/cluster-autoscaler/simulator/test_predicates_checker.go b/cluster-autoscaler/simulator/test_predicates_checker.go index 43ccb1233208..5cd26fba2a79 100644 --- a/cluster-autoscaler/simulator/test_predicates_checker.go +++ b/cluster-autoscaler/simulator/test_predicates_checker.go @@ -23,5 +23,5 @@ import ( // NewTestPredicateChecker builds test version of PredicateChecker. func NewTestPredicateChecker() (PredicateChecker, error) { // just call out to NewSchedulerBasedPredicateChecker but use fake kubeClient - return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), make(chan struct{})) + return NewSchedulerBasedPredicateChecker(clientsetfake.NewSimpleClientset(), nil) }