diff --git a/cluster-autoscaler/core/autoscaler.go b/cluster-autoscaler/core/autoscaler.go index 966f11147c2e..8c4c5e67acfa 100644 --- a/cluster-autoscaler/core/autoscaler.go +++ b/cluster-autoscaler/core/autoscaler.go @@ -22,6 +22,7 @@ import ( "github.com/golang/glog" "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/simulator" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_record "k8s.io/client-go/tools/record" kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -37,7 +38,7 @@ type AutoscalerOptions struct { // The configuration can be injected at the creation of an autoscaler type Autoscaler interface { // RunOnce represents an iteration in the control-loop of CA - RunOnce(currentTime time.Time) + RunOnce(currentTime time.Time) *errors.AutoscalerError // CleanUp represents a clean-up required before the first invocation of RunOnce CleanUp() // ExitCleanUp is a clean-up performed just before process termination. 
diff --git a/cluster-autoscaler/core/dynamic_autoscaler.go b/cluster-autoscaler/core/dynamic_autoscaler.go index c01380d4c405..815cc86908e3 100644 --- a/cluster-autoscaler/core/dynamic_autoscaler.go +++ b/cluster-autoscaler/core/dynamic_autoscaler.go @@ -24,6 +24,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/simulator" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_record "k8s.io/client-go/tools/record" kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -56,14 +57,14 @@ func (a *DynamicAutoscaler) ExitCleanUp() { } // RunOnce represents a single iteration of a dynamic autoscaler inside the CA's control-loop -func (a *DynamicAutoscaler) RunOnce(currentTime time.Time) { +func (a *DynamicAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerError { reconfigureStart := time.Now() metrics.UpdateLastTime("reconfigure", reconfigureStart) if err := a.Reconfigure(); err != nil { glog.Errorf("Failed to reconfigure : %v", err) } metrics.UpdateDuration("reconfigure", reconfigureStart) - a.autoscaler.RunOnce(currentTime) + return a.autoscaler.RunOnce(currentTime) } // Reconfigure this dynamic autoscaler if the configmap is updated diff --git a/cluster-autoscaler/core/dynamic_autoscaler_test.go b/cluster-autoscaler/core/dynamic_autoscaler_test.go index 2e081933f84b..d86c9b95abc1 100644 --- a/cluster-autoscaler/core/dynamic_autoscaler_test.go +++ b/cluster-autoscaler/core/dynamic_autoscaler_test.go @@ -19,6 +19,7 @@ package core import ( "github.com/stretchr/testify/mock" "k8s.io/autoscaler/cluster-autoscaler/config/dynamic" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "testing" "time" ) @@ -27,8 +28,9 @@ type AutoscalerMock struct { mock.Mock } -func (m *AutoscalerMock) RunOnce(currentTime time.Time) { +func (m *AutoscalerMock) RunOnce(currentTime 
time.Time) *errors.AutoscalerError { m.Called(currentTime) + return nil } func (m *AutoscalerMock) CleanUp() { diff --git a/cluster-autoscaler/core/polling_autoscaler.go b/cluster-autoscaler/core/polling_autoscaler.go index 821705e04bfe..26e51100f1aa 100644 --- a/cluster-autoscaler/core/polling_autoscaler.go +++ b/cluster-autoscaler/core/polling_autoscaler.go @@ -21,6 +21,7 @@ import ( "github.com/golang/glog" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" ) // PollingAutoscaler is a variant of autoscaler which polls the source-of-truth every time RunOnce is invoked @@ -48,14 +49,14 @@ func (a *PollingAutoscaler) ExitCleanUp() { } // RunOnce represents a single iteration of a polling autoscaler inside the CA's control-loop -func (a *PollingAutoscaler) RunOnce(currentTime time.Time) { +func (a *PollingAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerError { reconfigureStart := time.Now() metrics.UpdateLastTime("poll", reconfigureStart) if err := a.Poll(); err != nil { glog.Errorf("Failed to poll : %v", err) } metrics.UpdateDuration("poll", reconfigureStart) - a.autoscaler.RunOnce(currentTime) + return a.autoscaler.RunOnce(currentTime) } // Poll latest data from cloud provider to recreate this autoscaler diff --git a/cluster-autoscaler/core/scale_down.go b/cluster-autoscaler/core/scale_down.go index 5b5aa0796979..4ce4e4c41039 100644 --- a/cluster-autoscaler/core/scale_down.go +++ b/cluster-autoscaler/core/scale_down.go @@ -27,9 +27,10 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" - "k8s.io/apimachinery/pkg/api/errors" + kube_errors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kube_record "k8s.io/client-go/tools/record" apiv1 
"k8s.io/kubernetes/pkg/api/v1" @@ -113,7 +114,7 @@ func (sd *ScaleDown) UpdateUnneededNodes( nodes []*apiv1.Node, pods []*apiv1.Pod, timestamp time.Time, - pdbs []*policyv1.PodDisruptionBudget) error { + pdbs []*policyv1.PodDisruptionBudget) *errors.AutoscalerError { currentlyUnneededNodes := make([]*apiv1.Node, 0) nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, nodes) @@ -152,18 +153,18 @@ func (sd *ScaleDown) UpdateUnneededNodes( } // Phase2 - check which nodes can be probably removed using fast drain. - nodesToRemove, newHints, err := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods, + nodesToRemove, newHints, simulatorErr := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods, nil, sd.context.PredicateChecker, len(currentlyUnneededNodes), true, sd.podLocationHints, sd.usageTracker, timestamp, pdbs) - if err != nil { - glog.Errorf("Error while simulating node drains: %v", err) + if simulatorErr != nil { + glog.Errorf("Error while simulating node drains: %v", simulatorErr) sd.unneededNodesList = make([]*apiv1.Node, 0) sd.unneededNodes = make(map[string]time.Time) sd.nodeUtilizationMap = make(map[string]float64) sd.context.ClusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp) - return fmt.Errorf("error while simulating node drains: %v", err) + return simulatorErr.AddPrefix("error while simulating node drains: ") } // Update the timestamp map. @@ -190,7 +191,7 @@ func (sd *ScaleDown) UpdateUnneededNodes( // TryToScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was // removed and error if such occured. 
-func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (ScaleDownResult, error) { +func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (ScaleDownResult, *errors.AutoscalerError) { now := time.Now() candidates := make([]*apiv1.Node, 0) @@ -248,7 +249,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs // to recreate on other nodes. emptyNodes := getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, sd.context.CloudProvider) if len(emptyNodes) > 0 { - confirmation := make(chan error, len(emptyNodes)) + confirmation := make(chan *errors.AutoscalerError, len(emptyNodes)) for _, node := range emptyNodes { glog.V(0).Infof("Scale-down: removing empty node %s", node.Name) sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %s", node.Name) @@ -267,14 +268,14 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs confirmation <- deleteErr }(node) } - var finalError error + var finalError *errors.AutoscalerError startTime := time.Now() for range emptyNodes { timeElapsed := time.Now().Sub(startTime) timeLeft := MaxCloudProviderNodeDeletionTime - timeElapsed if timeLeft < 0 { - finalError = fmt.Errorf("Failed to delete nodes in time") + finalError = errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time") break } select { @@ -284,13 +285,13 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs finalError = err } case <-time.After(timeLeft): - finalError = fmt.Errorf("Failed to delete nodes in time") + finalError = errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time") } } if finalError == nil { return ScaleDownNodeDeleted, nil } - return ScaleDownError, fmt.Errorf("failed to delete at least one empty node: %v", finalError) + return ScaleDownError, 
finalError.AddPrefix("failed to delete at least one empty node: ") } // We look for only 1 node so new hints may be incomplete. @@ -299,7 +300,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs sd.podLocationHints, sd.usageTracker, time.Now(), pdbs) if err != nil { - return ScaleDownError, fmt.Errorf("Find node to remove failed: %v", err) + return ScaleDownError, err.AddPrefix("Find node to remove failed: ") } if len(nodesToRemove) == 0 { glog.V(1).Infof("No node to remove") @@ -320,7 +321,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs simulator.RemoveNodeFromTracker(sd.usageTracker, toRemove.Node.Name, sd.unneededNodes) err = deleteNode(sd.context, toRemove.Node, toRemove.PodsToReschedule) if err != nil { - return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err) + return ScaleDownError, err.AddPrefix("Failed to delete %s: ", toRemove.Node.Name) } if readinessMap[toRemove.Node.Name] { metrics.RegisterScaleDown(1, metrics.Underutilized) @@ -373,7 +374,7 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele return result[:limit] } -func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) error { +func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) *errors.AutoscalerError { if err := drainNode(node, pods, context.ClientSet, context.Recorder, context.MaxGratefulTerminationSec, MaxPodEvictionTime, EvictionRetryTime); err != nil { return err @@ -410,13 +411,13 @@ func evictPod(podToEvict *apiv1.Pod, client kube_client.Interface, recorder kube // Performs drain logic on the node. Marks the node as unschedulable and later removes all pods, giving // them up to MaxGracefulTerminationTime to finish. 
func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder, - maxGratefulTerminationSec int, maxPodEvictionTime time.Duration, waitBetweenRetries time.Duration) error { + maxGratefulTerminationSec int, maxPodEvictionTime time.Duration, waitBetweenRetries time.Duration) *errors.AutoscalerError { drainSuccessful := false toEvict := len(pods) if err := deletetaint.MarkToBeDeleted(node, client); err != nil { recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err) - return err + return errors.ToAutoscalerError(errors.ApiCallError, err) } // If we fail to evict all the pods from the node we want to remove delete taint @@ -448,11 +449,13 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface metrics.RegisterEvictions(1) } case <-time.After(retryUntil.Sub(time.Now()) + 5*time.Second): - return fmt.Errorf("Failed to drain node %s/%s: timeout when waiting for creating evictions", node.Namespace, node.Name) + return errors.NewAutoscalerError( + errors.ApiCallError, "Failed to drain node %s/%s: timeout when waiting for creating evictions", node.Namespace, node.Name) } } if len(evictionErrs) != 0 { - return fmt.Errorf("Failed to drain node %s/%s, due to following errors: %v", node.Namespace, node.Name, evictionErrs) + return errors.NewAutoscalerError( + errors.ApiCallError, "Failed to drain node %s/%s, due to following errors: %v", node.Namespace, node.Name, evictionErrs) } // Evictions created successfully, wait maxGratefulTerminationSec + PodEvictionHeadroom to see if pods really disappeared. 
@@ -466,7 +469,7 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface allGone = false break } - if !errors.IsNotFound(err) { + if !kube_errors.IsNotFound(err) { glog.Errorf("Failed to check pod %s/%s: %v", pod.Namespace, pod.Name, err) allGone = false } @@ -478,7 +481,8 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface return nil } } - return fmt.Errorf("Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name) + return errors.NewAutoscalerError( + errors.TransientError, "Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name) } // cleanToBeDeleted cleans ToBeDeleted taints. @@ -499,16 +503,17 @@ func cleanToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorde // Removes the given node from cloud provider. No extra pre-deletion actions are executed on // the Kubernetes side. func deleteNodeFromCloudProvider(node *apiv1.Node, cloudProvider cloudprovider.CloudProvider, - recorder kube_record.EventRecorder, registry *clusterstate.ClusterStateRegistry) error { + recorder kube_record.EventRecorder, registry *clusterstate.ClusterStateRegistry) *errors.AutoscalerError { nodeGroup, err := cloudProvider.NodeGroupForNode(node) if err != nil { - return fmt.Errorf("failed to node group for %s: %v", node.Name, err) + return errors.NewAutoscalerError( + errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err) } if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() { - return fmt.Errorf("picked node that doesn't belong to a node group: %s", node.Name) + return errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name) } if err = nodeGroup.DeleteNodes([]*apiv1.Node{node}); err != nil { - return fmt.Errorf("failed to delete %s: %v", node.Name, err) + return errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", node.Name, err) } 
recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler") registry.RegisterScaleDown(&clusterstate.ScaleDownRequest{ diff --git a/cluster-autoscaler/core/scale_up.go b/cluster-autoscaler/core/scale_up.go index e448975d172e..7a6282347bf0 100644 --- a/cluster-autoscaler/core/scale_up.go +++ b/cluster-autoscaler/core/scale_up.go @@ -17,13 +17,13 @@ limitations under the License. package core import ( - "fmt" "time" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/estimator" "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" apiv1 "k8s.io/kubernetes/pkg/api/v1" extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1" "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache" @@ -35,7 +35,7 @@ import ( // false if it didn't and error if an error occured. Assumes that all nodes in the cluster are // ready and in sync with instance groups. func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node, - daemonSets []*extensionsv1.DaemonSet) (bool, error) { + daemonSets []*extensionsv1.DaemonSet) (bool, *errors.AutoscalerError) { // From now on we only care about unschedulable pods that were marked after the newest // node became available for the scheduler. 
if len(unschedulablePods) == 0 { @@ -49,14 +49,17 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ClientSet, daemonSets, context.PredicateChecker) if err != nil { - return false, fmt.Errorf("failed to build node infos for node groups: %v", err) + return false, err.AddPrefix("failed to build node infos for node groups: ") } upcomingNodes := make([]*schedulercache.NodeInfo, 0) for nodeGroup, numberOfNodes := range context.ClusterStateRegistry.GetUpcomingNodes() { nodeTemplate, found := nodeInfos[nodeGroup] if !found { - return false, fmt.Errorf("failed to find template node for node group %s", nodeGroup) + return false, errors.NewAutoscalerError( + errors.InternalError, + "failed to find template node for node group %s", + nodeGroup) } for i := 0; i < numberOfNodes; i++ { upcomingNodes = append(upcomingNodes, nodeTemplate) @@ -153,7 +156,9 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes currentSize, err := bestOption.NodeGroup.TargetSize() if err != nil { - return false, fmt.Errorf("failed to get node group size: %v", err) + return false, errors.NewAutoscalerError( + errors.CloudProviderError, + "failed to get node group size: %v", err) } newSize := currentSize + bestOption.NodeCount if newSize >= bestOption.NodeGroup.MaxSize() { @@ -165,14 +170,17 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes glog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal) newSize = context.MaxNodesTotal - len(nodes) + currentSize if newSize < currentSize { - return false, fmt.Errorf("max node total count already reached") + return false, errors.NewAutoscalerError( + errors.TransientError, + "max node total count already reached") } } glog.V(0).Infof("Scale-up: setting group %s size to %d", bestOption.NodeGroup.Id(), newSize) increase := newSize - currentSize if err := 
bestOption.NodeGroup.IncreaseSize(increase); err != nil { - return false, fmt.Errorf("failed to increase node group size: %v", err) + return false, errors.NewAutoscalerError( + errors.CloudProviderError, "failed to increase node group size: %v", err) } context.ClusterStateRegistry.RegisterScaleUp( &clusterstate.ScaleUpRequest{ diff --git a/cluster-autoscaler/core/static_autoscaler.go b/cluster-autoscaler/core/static_autoscaler.go index 0ecddbcf394e..52ccdf895d64 100644 --- a/cluster-autoscaler/core/static_autoscaler.go +++ b/cluster-autoscaler/core/static_autoscaler.go @@ -21,6 +21,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate/utils" "k8s.io/autoscaler/cluster-autoscaler/metrics" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_record "k8s.io/client-go/tools/record" @@ -72,7 +73,7 @@ func (a *StaticAutoscaler) CleanUp() { } // RunOnce iterates over node groups and scales them up/down if necessary -func (a *StaticAutoscaler) RunOnce(currentTime time.Time) { +func (a *StaticAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerError { readyNodeLister := a.ReadyNodeLister() allNodeLister := a.AllNodeLister() unschedulablePodLister := a.UnschedulablePodLister() @@ -85,30 +86,30 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) { readyNodes, err := readyNodeLister.List() if err != nil { glog.Errorf("Failed to list ready nodes: %v", err) - return + return errors.ToAutoscalerError(errors.ApiCallError, err) } if len(readyNodes) == 0 { - glog.Error("No ready nodes in the cluster") + glog.Warningf("No ready nodes in the cluster") scaleDown.CleanUpUnneededNodes() - return + return nil } allNodes, err := allNodeLister.List() if err != nil { glog.Errorf("Failed to list all nodes: %v", err) - return + return errors.ToAutoscalerError(errors.ApiCallError, err) } if len(allNodes) == 0 { - glog.Error("No nodes in the cluster") + glog.Warningf("No nodes in the cluster") 
scaleDown.CleanUpUnneededNodes() - return + return nil } err = a.ClusterStateRegistry.UpdateNodes(allNodes, currentTime) if err != nil { glog.Errorf("Failed to update node registry: %v", err) scaleDown.CleanUpUnneededNodes() - return + return errors.ToAutoscalerError(errors.CloudProviderError, err) } metrics.UpdateClusterState(a.ClusterStateRegistry) @@ -122,7 +123,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) { if !a.ClusterStateRegistry.IsClusterHealthy() { glog.Warning("Cluster is not ready for autoscaling") scaleDown.CleanUpUnneededNodes() - return + return nil } metrics.UpdateDuration("updateClusterState", runStart) @@ -139,15 +140,15 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) { if removedAny { glog.Warningf("Some unregistered nodes were removed, but got error: %v", err) } else { - glog.Warningf("Failed to remove unregistered nodes: %v", err) + glog.Errorf("Failed to remove unregistered nodes: %v", err) } - return + return errors.ToAutoscalerError(errors.CloudProviderError, err) } // Some nodes were removed. Let's skip this iteration, the next one should be better. if removedAny { glog.V(0).Infof("Some unregistered nodes were removed, skipping iteration") - return + return nil } } @@ -156,25 +157,25 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) { // TODO: andrewskim - add protection for ready AWS nodes. 
fixedSomething, err := fixNodeGroupSize(autoscalingContext, time.Now()) if err != nil { - glog.Warningf("Failed to fix node group sizes: %v", err) - return + glog.Errorf("Failed to fix node group sizes: %v", err) + return errors.ToAutoscalerError(errors.CloudProviderError, err) } if fixedSomething { glog.V(0).Infof("Some node group target size was fixed, skipping the iteration") - return + return nil } allUnschedulablePods, err := unschedulablePodLister.List() if err != nil { glog.Errorf("Failed to list unscheduled pods: %v", err) - return + return errors.ToAutoscalerError(errors.ApiCallError, err) } metrics.UpdateUnschedulablePodsCount(len(allUnschedulablePods)) allScheduled, err := scheduledPodLister.List() if err != nil { glog.Errorf("Failed to list scheduled pods: %v", err) - return + return errors.ToAutoscalerError(errors.ApiCallError, err) } // We need to reset all pods that have been marked as unschedulable not after @@ -224,20 +225,20 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) { daemonsets, err := a.ListerRegistry.DaemonSetLister().List() if err != nil { glog.Errorf("Failed to get daemonset list") - return + return errors.ToAutoscalerError(errors.ApiCallError, err) } - scaledUp, err := ScaleUp(autoscalingContext, unschedulablePodsToHelp, readyNodes, daemonsets) + scaledUp, typedErr := ScaleUp(autoscalingContext, unschedulablePodsToHelp, readyNodes, daemonsets) metrics.UpdateDuration("scaleup", scaleUpStart) - if err != nil { - glog.Errorf("Failed to scale up: %v", err) - return + if typedErr != nil { + glog.Errorf("Failed to scale up: %v", typedErr) + return typedErr } else if scaledUp { a.lastScaleUpTime = time.Now() // No scale down in this iteration. 
- return + return nil } } @@ -247,7 +248,7 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) { pdbs, err := pdbLister.List() if err != nil { glog.Errorf("Failed to list pod disruption budgets: %v", err) - return + return errors.ToAutoscalerError(errors.ApiCallError, err) } // In dry run only utilization is updated @@ -262,10 +263,10 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) { glog.V(4).Infof("Calculating unneeded nodes") scaleDown.CleanUp(time.Now()) - err = scaleDown.UpdateUnneededNodes(allNodes, allScheduled, time.Now(), pdbs) - if err != nil { - glog.Warningf("Failed to scale down: %v", err) - return + typedErr := scaleDown.UpdateUnneededNodes(allNodes, allScheduled, time.Now(), pdbs) + if typedErr != nil { + glog.Errorf("Failed to scale down: %v", typedErr) + return typedErr } metrics.UpdateDuration("findUnneeded", unneededStart) @@ -281,19 +282,20 @@ func (a *StaticAutoscaler) RunOnce(currentTime time.Time) { scaleDownStart := time.Now() metrics.UpdateLastTime("scaleDown", scaleDownStart) - result, err := scaleDown.TryToScaleDown(allNodes, allScheduled, pdbs) + result, typedErr := scaleDown.TryToScaleDown(allNodes, allScheduled, pdbs) metrics.UpdateDuration("scaleDown", scaleDownStart) // TODO: revisit result handling - if err != nil { - glog.Errorf("Failed to scale down: %v", err) + if typedErr != nil { + glog.Errorf("Failed to scale down: %v", typedErr) - } else { - if result == ScaleDownError || result == ScaleDownNoNodeDeleted { - a.lastScaleDownFailedTrial = time.Now() - } + return typedErr + } + if result == ScaleDownError || result == ScaleDownNoNodeDeleted { + a.lastScaleDownFailedTrial = time.Now() } } } + return nil } // ExitCleanUp removes status configmap. 
diff --git a/cluster-autoscaler/core/utils.go b/cluster-autoscaler/core/utils.go index 7abf3b4cf421..3442d303462a 100644 --- a/cluster-autoscaler/core/utils.go +++ b/cluster-autoscaler/core/utils.go @@ -26,6 +26,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/clusterstate" "k8s.io/autoscaler/cluster-autoscaler/simulator" "k8s.io/autoscaler/cluster-autoscaler/utils/daemonset" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -137,14 +138,14 @@ func createNodeNameToInfoMap(pods []*apiv1.Pod, nodes []*apiv1.Node) map[string] // // TODO(mwielgus): Review error policy - sometimes we may continue with partial errors. func GetNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.CloudProvider, kubeClient kube_client.Interface, - daemonsets []*extensionsv1.DaemonSet, predicateChecker *simulator.PredicateChecker) (map[string]*schedulercache.NodeInfo, error) { + daemonsets []*extensionsv1.DaemonSet, predicateChecker *simulator.PredicateChecker) (map[string]*schedulercache.NodeInfo, *errors.AutoscalerError) { result := make(map[string]*schedulercache.NodeInfo) // processNode returns information whether the nodeTemplate was generated and if there was an error. 
- processNode := func(node *apiv1.Node) (bool, error) { + processNode := func(node *apiv1.Node) (bool, *errors.AutoscalerError) { nodeGroup, err := cloudProvider.NodeGroupForNode(node) if err != nil { - return false, err + return false, errors.ToAutoscalerError(errors.CloudProviderError, err) } if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() { return false, nil @@ -171,9 +172,9 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.Clou if !kube_util.IsNodeReadyAndSchedulable(node) { continue } - _, err := processNode(node) - if err != nil { - return map[string]*schedulercache.NodeInfo{}, err + _, typedErr := processNode(node) + if typedErr != nil { + return map[string]*schedulercache.NodeInfo{}, typedErr } } for _, nodeGroup := range cloudProvider.NodeGroups() { @@ -190,16 +191,17 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.Clou continue } else { glog.Errorf("Unable to build proper template node for %s: %v", id, err) - return map[string]*schedulercache.NodeInfo{}, err + return map[string]*schedulercache.NodeInfo{}, errors.ToAutoscalerError( + errors.CloudProviderError, err) } } pods := daemonset.GetDaemonSetPodsForNode(baseNodeInfo, daemonsets, predicateChecker) pods = append(pods, baseNodeInfo.Pods()...) fullNodeInfo := schedulercache.NewNodeInfo(pods...) 
fullNodeInfo.SetNode(baseNodeInfo.Node()) - sanitizedNodeInfo, err := sanitizeNodeInfo(fullNodeInfo, id) - if err != nil { - return map[string]*schedulercache.NodeInfo{}, err + sanitizedNodeInfo, typedErr := sanitizeNodeInfo(fullNodeInfo, id) + if typedErr != nil { + return map[string]*schedulercache.NodeInfo{}, typedErr } result[id] = sanitizedNodeInfo } @@ -208,13 +210,14 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.Clou for _, node := range nodes { // Allowing broken nodes if !kube_util.IsNodeReadyAndSchedulable(node) { - added, err := processNode(node) - if err != nil { - return map[string]*schedulercache.NodeInfo{}, err + added, typedErr := processNode(node) + if typedErr != nil { + return map[string]*schedulercache.NodeInfo{}, typedErr } nodeGroup, err := cloudProvider.NodeGroupForNode(node) if err != nil { - return map[string]*schedulercache.NodeInfo{}, err + return map[string]*schedulercache.NodeInfo{}, errors.ToAutoscalerError( + errors.CloudProviderError, err) } if added { glog.Warningf("Built template for %s based on unready/unschedulable node %s", nodeGroup.Id(), node.Name) @@ -225,7 +228,7 @@ func GetNodeInfosForGroups(nodes []*apiv1.Node, cloudProvider cloudprovider.Clou return result, nil } -func sanitizeNodeInfo(nodeInfo *schedulercache.NodeInfo, nodeGroupName string) (*schedulercache.NodeInfo, error) { +func sanitizeNodeInfo(nodeInfo *schedulercache.NodeInfo, nodeGroupName string) (*schedulercache.NodeInfo, *errors.AutoscalerError) { // Sanitize node name. 
sanitizedNode, err := sanitizeTemplateNode(nodeInfo.Node(), nodeGroupName) if err != nil { @@ -237,7 +240,7 @@ func sanitizeNodeInfo(nodeInfo *schedulercache.NodeInfo, nodeGroupName string) ( for _, pod := range nodeInfo.Pods() { obj, err := api.Scheme.DeepCopy(pod) if err != nil { - return nil, err + return nil, errors.ToAutoscalerError(errors.InternalError, err) } sanitizedPod := obj.(*apiv1.Pod) sanitizedPod.Spec.NodeName = sanitizedNode.Name @@ -247,15 +250,15 @@ func sanitizeNodeInfo(nodeInfo *schedulercache.NodeInfo, nodeGroupName string) ( // Build a new node info. sanitizedNodeInfo := schedulercache.NewNodeInfo(sanitizedPods...) if err := sanitizedNodeInfo.SetNode(sanitizedNode); err != nil { - return nil, err + return nil, errors.ToAutoscalerError(errors.InternalError, err) } return sanitizedNodeInfo, nil } -func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string) (*apiv1.Node, error) { +func sanitizeTemplateNode(node *apiv1.Node, nodeGroup string) (*apiv1.Node, *errors.AutoscalerError) { obj, err := api.Scheme.DeepCopy(node) if err != nil { - return nil, err + return nil, errors.ToAutoscalerError(errors.InternalError, err) } nodeName := fmt.Sprintf("template-node-for-%s-%d", nodeGroup, rand.Int63()) newNode := obj.(*apiv1.Node) diff --git a/cluster-autoscaler/main.go b/cluster-autoscaler/main.go index e2c23a5547e6..aad78f4b1518 100644 --- a/cluster-autoscaler/main.go +++ b/cluster-autoscaler/main.go @@ -35,6 +35,7 @@ import ( "k8s.io/autoscaler/cluster-autoscaler/expander" "k8s.io/autoscaler/cluster-autoscaler/metrics" "k8s.io/autoscaler/cluster-autoscaler/simulator" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes" kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" kube_leaderelection "k8s.io/kubernetes/pkg/client/leaderelection" @@ -191,7 +192,10 @@ func run(_ <-chan struct{}) { loopStart := time.Now() metrics.UpdateLastTime("main", loopStart) - 
autoscaler.RunOnce(loopStart) + err := autoscaler.RunOnce(loopStart) + if err != nil && err.Type() != errors.TransientError { + metrics.RegisterError(err) + } metrics.UpdateDuration("main", loopStart) } diff --git a/cluster-autoscaler/metrics/metrics.go b/cluster-autoscaler/metrics/metrics.go index 2a99dba3b144..8f8137e5f073 100644 --- a/cluster-autoscaler/metrics/metrics.go +++ b/cluster-autoscaler/metrics/metrics.go @@ -21,6 +21,7 @@ import ( "time" "k8s.io/autoscaler/cluster-autoscaler/clusterstate" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" "github.com/prometheus/client_golang/prometheus" ) @@ -86,6 +87,14 @@ var ( ) /**** Metrics related to autoscaler operations ****/ + errorsCount = prometheus.NewCounterVec( + prometheus.CounterOpts{ + Namespace: caNamespace, + Name: "errors_total", + Help: "The number of CA loops failed due to an error.", + }, []string{"type"}, + ) + scaleUpCount = prometheus.NewCounter( prometheus.CounterOpts{ Namespace: caNamespace, @@ -125,6 +134,7 @@ func init() { prometheus.MustRegister(unschedulablePodsCount) prometheus.MustRegister(lastActivity) prometheus.MustRegister(functionDuration) + prometheus.MustRegister(errorsCount) prometheus.MustRegister(scaleUpCount) prometheus.MustRegister(scaleDownCount) prometheus.MustRegister(evictionsCount) @@ -166,6 +176,12 @@ func UpdateUnschedulablePodsCount(podsCount int) { unschedulablePodsCount.Set(float64(podsCount)) } +// RegisterError records any errors preventing Cluster Autoscaler from working. +// No more than one error should be recorded per loop. 
+func RegisterError(err *errors.AutoscalerError) { + errorsCount.WithLabelValues(string(err.Type())).Add(1.0) +} + // RegisterScaleUp records number of nodes added by scale up func RegisterScaleUp(nodesCount int) { scaleUpCount.Add(float64(nodesCount)) } diff --git a/cluster-autoscaler/simulator/cluster.go b/cluster-autoscaler/simulator/cluster.go index 35b57bd2bf1b..da281fe8a1d2 100644 --- a/cluster-autoscaler/simulator/cluster.go +++ b/cluster-autoscaler/simulator/cluster.go @@ -24,6 +24,7 @@ import ( "time" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" apiv1 "k8s.io/kubernetes/pkg/api/v1" policyv1 "k8s.io/kubernetes/pkg/apis/policy/v1beta1" client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -58,7 +59,7 @@ func FindNodesToRemove(candidates []*apiv1.Node, allNodes []*apiv1.Node, pods [] fastCheck bool, oldHints map[string]string, usageTracker *UsageTracker, timestamp time.Time, podDisruptionBudgets []*policyv1.PodDisruptionBudget, -) (nodesToRemove []NodeToBeRemoved, podReschedulingHints map[string]string, finalError error) { +) (nodesToRemove []NodeToBeRemoved, podReschedulingHints map[string]string, finalError *errors.AutoscalerError) { nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, allNodes) result := make([]NodeToBeRemoved, 0) diff --git a/cluster-autoscaler/simulator/nodes.go b/cluster-autoscaler/simulator/nodes.go index 77828151595f..23bdaccd351a 100644 --- a/cluster-autoscaler/simulator/nodes.go +++ b/cluster-autoscaler/simulator/nodes.go @@ -22,6 +22,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/autoscaler/cluster-autoscaler/utils/drain" + "k8s.io/autoscaler/cluster-autoscaler/utils/errors" api "k8s.io/kubernetes/pkg/api" apiv1 "k8s.io/kubernetes/pkg/api/v1" kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -31,12 +32,13 @@ import ( // GetRequiredPodsForNode returns a list of pods that would 
appear on the node if the // node was just created (like daemonset and manifest-run pods). It reuses kubectl // drain command to get the list. -func GetRequiredPodsForNode(nodename string, client kube_client.Interface) ([]*apiv1.Pod, error) { +func GetRequiredPodsForNode(nodename string, client kube_client.Interface) ([]*apiv1.Pod, *errors.AutoscalerError) { + // TODO: we should change this to use informer podListResult, err := client.Core().Pods(apiv1.NamespaceAll).List( metav1.ListOptions{FieldSelector: fields.SelectorFromSet(fields.Set{"spec.nodeName": nodename}).String()}) if err != nil { - return []*apiv1.Pod{}, err + return []*apiv1.Pod{}, errors.ToAutoscalerError(errors.ApiCallError, err) } allPods := make([]*apiv1.Pod, 0) for i := range podListResult.Items { @@ -54,7 +56,7 @@ func GetRequiredPodsForNode(nodename string, client kube_client.Interface) ([]*a 0, time.Now()) if err != nil { - return []*apiv1.Pod{}, err + return []*apiv1.Pod{}, errors.ToAutoscalerError(errors.InternalError, err) } podsToRemoveMap := make(map[string]struct{}) @@ -76,14 +78,14 @@ func GetRequiredPodsForNode(nodename string, client kube_client.Interface) ([]*a } // BuildNodeInfoForNode build a NodeInfo structure for the given node as if the node was just created. -func BuildNodeInfoForNode(node *apiv1.Node, client kube_client.Interface) (*schedulercache.NodeInfo, error) { +func BuildNodeInfoForNode(node *apiv1.Node, client kube_client.Interface) (*schedulercache.NodeInfo, *errors.AutoscalerError) { requiredPods, err := GetRequiredPodsForNode(node.Name, client) if err != nil { return nil, err } result := schedulercache.NewNodeInfo(requiredPods...) 
if err := result.SetNode(node); err != nil { - return nil, err + return nil, errors.ToAutoscalerError(errors.InternalError, err) } return result, nil } diff --git a/cluster-autoscaler/utils/errors/errors.go b/cluster-autoscaler/utils/errors/errors.go new file mode 100644 index 000000000000..c0d91d06313a --- /dev/null +++ b/cluster-autoscaler/utils/errors/errors.go @@ -0,0 +1,80 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package errors + +import ( + "fmt" +) + +// AutoscalerErrorType describes a high-level category of a given error +type AutoscalerErrorType string + +// AutoscalerError contains information about Autoscaler errors +type AutoscalerError struct { + errorType AutoscalerErrorType + msg string +} + +const ( + // CloudProviderError is an error related to underlying infrastructure + CloudProviderError AutoscalerErrorType = "cloudProviderError" + // ApiCallError is an error related to communication with k8s API server + ApiCallError AutoscalerErrorType = "apiCallError" + // InternalError is an error inside Cluster Autoscaler + InternalError AutoscalerErrorType = "internalError" + // TransientError is an error that causes us to skip a single loop, but + // does not require any additional action. 
+ TransientError AutoscalerErrorType = "transientError" +) + +// NewAutoscalerError returns a new autoscaler error with a message constructed from format string +func NewAutoscalerError(errorType AutoscalerErrorType, msg string, args ...interface{}) *AutoscalerError { + return &AutoscalerError{ + errorType: errorType, + msg: fmt.Sprintf(msg, args...), + } +} + +// ToAutoscalerError converts an error to AutoscalerError with given type, +// unless it already is an AutoscalerError (in which case it's not modified). +func ToAutoscalerError(defaultType AutoscalerErrorType, err error) *AutoscalerError { + if e, ok := err.(*AutoscalerError); ok { + return e + } + return NewAutoscalerError(defaultType, err.Error()) +} + +// Error implements the golang error interface +func (e *AutoscalerError) Error() string { + return e.msg +} + +// Type returns the type of AutoscalerError +func (e *AutoscalerError) Type() AutoscalerErrorType { + return e.errorType +} + +// AddPrefix adds a prefix to error message. +// Returns the error it's called on for convenient inline use. +// Example: +// if err := DoSomething(myObject); err != nil { +// return err.AddPrefix("can't do something with %v: ", myObject) +// } +func (e *AutoscalerError) AddPrefix(msg string, args ...interface{}) *AutoscalerError { + e.msg = fmt.Sprintf(msg, args...) + e.msg + return e +}