Merge pull request #74 from MaciekPytel/autoscaler_errors
Add typed errors, add errors_total metric
mwielgus authored May 19, 2017
2 parents 0d8bd3d + 58cdfa1 commit 9083c13
Showing 13 changed files with 224 additions and 98 deletions.
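Only the first six changed files are rendered below; the new utils/errors package itself, and the metrics change behind the errors_total counter, are among the files not shown in this view. Inferred purely from how the diff below uses it (NewAutoscalerError, ToAutoscalerError, AddPrefix, and the CloudProviderError, ApiCallError, InternalError and TransientError categories), here is a minimal sketch of the shape that package plausibly has; the struct layout, string values and exact signatures are assumptions, not the committed code.

// Sketch only, not part of this commit: a plausible utils/errors package inferred from its call sites below.
package errors

import "fmt"

// AutoscalerErrorType labels an error category, e.g. for the errors_total metric.
type AutoscalerErrorType string

const (
    // CloudProviderError is an error coming from the cloud provider.
    CloudProviderError AutoscalerErrorType = "cloudProviderError"
    // ApiCallError is an error coming from a Kubernetes API call.
    ApiCallError AutoscalerErrorType = "apiCallError"
    // InternalError is an error inside Cluster Autoscaler.
    InternalError AutoscalerErrorType = "internalError"
    // TransientError is an error that should resolve itself on retry.
    TransientError AutoscalerErrorType = "transientError"
)

// AutoscalerError is an error with an attached category.
type AutoscalerError struct {
    errorType AutoscalerErrorType
    msg       string
}

// NewAutoscalerError builds a typed error from a format string.
func NewAutoscalerError(errorType AutoscalerErrorType, msg string, args ...interface{}) *AutoscalerError {
    return &AutoscalerError{errorType: errorType, msg: fmt.Sprintf(msg, args...)}
}

// ToAutoscalerError wraps a plain error, keeping an already-typed error unchanged.
func ToAutoscalerError(defaultType AutoscalerErrorType, err error) *AutoscalerError {
    if err == nil {
        return nil
    }
    if typed, ok := err.(*AutoscalerError); ok {
        return typed
    }
    return NewAutoscalerError(defaultType, "%v", err)
}

// Error implements the error interface.
func (e *AutoscalerError) Error() string {
    return e.msg
}

// Type returns the error category.
func (e *AutoscalerError) Type() AutoscalerErrorType {
    return e.errorType
}

// AddPrefix prepends context to the message and keeps the original category.
func (e *AutoscalerError) AddPrefix(msg string, args ...interface{}) *AutoscalerError {
    e.msg = fmt.Sprintf(msg, args...) + e.msg
    return e
}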
3 changes: 2 additions & 1 deletion cluster-autoscaler/core/autoscaler.go
@@ -22,6 +22,7 @@ import (
"github.com/golang/glog"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
+ "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_record "k8s.io/client-go/tools/record"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@@ -37,7 +38,7 @@ type AutoscalerOptions struct {
// The configuration can be injected at the creation of an autoscaler
type Autoscaler interface {
// RunOnce represents an iteration in the control-loop of CA
- RunOnce(currentTime time.Time)
+ RunOnce(currentTime time.Time) *errors.AutoscalerError
// CleanUp represents a clean-up required before the first invocation of RunOnce
CleanUp()
// ExitCleanUp is a clean-up performed just before process termination.
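With RunOnce now returning *errors.AutoscalerError, the loop that drives the Autoscaler interface can report failures per category. A minimal sketch of such a caller follows; the main-loop and metrics changes are not visible in this view, so metrics.RegisterError is an assumed helper name standing in for whatever increments the new errors_total counter.

// Sketch only, not part of this commit; metrics.RegisterError is an assumed helper that
// would increment the errors_total counter by error type.
package core

import (
    "time"

    "github.com/golang/glog"
    "k8s.io/autoscaler/cluster-autoscaler/metrics"
)

func runAutoscalerOnce(a Autoscaler, currentTime time.Time) {
    if err := a.RunOnce(currentTime); err != nil {
        glog.Errorf("Failed CA loop iteration: %v", err)
        metrics.RegisterError(err) // assumed name; bumps errors_total{type=...}
    }
}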
5 changes: 3 additions & 2 deletions cluster-autoscaler/core/dynamic_autoscaler.go
@@ -24,6 +24,7 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
+ "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"
kube_record "k8s.io/client-go/tools/record"
kube_client "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
@@ -56,14 +57,14 @@ func (a *DynamicAutoscaler) ExitCleanUp() {
}

// RunOnce represents a single iteration of a dynamic autoscaler inside the CA's control-loop
- func (a *DynamicAutoscaler) RunOnce(currentTime time.Time) {
+ func (a *DynamicAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerError {
reconfigureStart := time.Now()
metrics.UpdateLastTime("reconfigure", reconfigureStart)
if err := a.Reconfigure(); err != nil {
glog.Errorf("Failed to reconfigure : %v", err)
}
metrics.UpdateDuration("reconfigure", reconfigureStart)
- a.autoscaler.RunOnce(currentTime)
+ return a.autoscaler.RunOnce(currentTime)
}

// Reconfigure this dynamic autoscaler if the configmap is updated
4 changes: 3 additions & 1 deletion cluster-autoscaler/core/dynamic_autoscaler_test.go
@@ -19,6 +19,7 @@ package core
import (
"github.com/stretchr/testify/mock"
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
+ "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
"testing"
"time"
)
@@ -27,8 +28,9 @@ type AutoscalerMock struct {
mock.Mock
}

- func (m *AutoscalerMock) RunOnce(currentTime time.Time) {
+ func (m *AutoscalerMock) RunOnce(currentTime time.Time) *errors.AutoscalerError {
m.Called(currentTime)
+ return nil
}

func (m *AutoscalerMock) CleanUp() {
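The AutoscalerMock above always returns nil, so the new return value of RunOnce is not yet exercised through the mock. A possible follow-up test is sketched below, assuming the testify assert package is also imported; all names here are illustrative and not part of this commit.

// Sketch only: a mock variant that propagates a configured typed error.
type erroringAutoscalerMock struct {
    mock.Mock
}

func (m *erroringAutoscalerMock) RunOnce(currentTime time.Time) *errors.AutoscalerError {
    args := m.Called(currentTime)
    if err := args.Get(0); err != nil {
        return err.(*errors.AutoscalerError)
    }
    return nil
}

func TestRunOnceReturnsTypedError(t *testing.T) {
    m := &erroringAutoscalerMock{}
    want := errors.NewAutoscalerError(errors.TransientError, "simulated failure")
    m.On("RunOnce", mock.AnythingOfType("time.Time")).Return(want)

    got := m.RunOnce(time.Now())

    assert.Equal(t, want, got)
    m.AssertExpectations(t)
}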
5 changes: 3 additions & 2 deletions cluster-autoscaler/core/polling_autoscaler.go
@@ -21,6 +21,7 @@ import (

"github.com/golang/glog"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
+ "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

// PollingAutoscaler is a variant of autoscaler which polls the source-of-truth every time RunOnce is invoked
@@ -48,14 +49,14 @@ func (a *PollingAutoscaler) ExitCleanUp() {
}

// RunOnce represents a single iteration of a polling autoscaler inside the CA's control-loop
- func (a *PollingAutoscaler) RunOnce(currentTime time.Time) {
+ func (a *PollingAutoscaler) RunOnce(currentTime time.Time) *errors.AutoscalerError {
reconfigureStart := time.Now()
metrics.UpdateLastTime("poll", reconfigureStart)
if err := a.Poll(); err != nil {
glog.Errorf("Failed to poll : %v", err)
}
metrics.UpdateDuration("poll", reconfigureStart)
- a.autoscaler.RunOnce(currentTime)
+ return a.autoscaler.RunOnce(currentTime)
}

// Poll latest data from cloud provider to recreate this autoscaler
55 changes: 30 additions & 25 deletions cluster-autoscaler/core/scale_down.go
@@ -27,9 +27,10 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/metrics"
"k8s.io/autoscaler/cluster-autoscaler/simulator"
"k8s.io/autoscaler/cluster-autoscaler/utils/deletetaint"
+ "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
kube_util "k8s.io/autoscaler/cluster-autoscaler/utils/kubernetes"

- "k8s.io/apimachinery/pkg/api/errors"
+ kube_errors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kube_record "k8s.io/client-go/tools/record"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
@@ -113,7 +114,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(
nodes []*apiv1.Node,
pods []*apiv1.Pod,
timestamp time.Time,
- pdbs []*policyv1.PodDisruptionBudget) error {
+ pdbs []*policyv1.PodDisruptionBudget) *errors.AutoscalerError {

currentlyUnneededNodes := make([]*apiv1.Node, 0)
nodeNameToNodeInfo := schedulercache.CreateNodeNameToInfoMap(pods, nodes)
@@ -152,18 +153,18 @@ func (sd *ScaleDown) UpdateUnneededNodes(
}

// Phase2 - check which nodes can be probably removed using fast drain.
- nodesToRemove, newHints, err := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
+ nodesToRemove, newHints, simulatorErr := simulator.FindNodesToRemove(currentlyUnneededNodes, nodes, pods,
nil, sd.context.PredicateChecker,
len(currentlyUnneededNodes), true, sd.podLocationHints, sd.usageTracker, timestamp, pdbs)
- if err != nil {
- glog.Errorf("Error while simulating node drains: %v", err)
+ if simulatorErr != nil {
+ glog.Errorf("Error while simulating node drains: %v", simulatorErr)

sd.unneededNodesList = make([]*apiv1.Node, 0)
sd.unneededNodes = make(map[string]time.Time)
sd.nodeUtilizationMap = make(map[string]float64)
sd.context.ClusterStateRegistry.UpdateScaleDownCandidates(sd.unneededNodesList, timestamp)

- return fmt.Errorf("error while simulating node drains: %v", err)
+ return simulatorErr.AddPrefix("error while simulating node drains: ")
}

// Update the timestamp map.
@@ -190,7 +191,7 @@ func (sd *ScaleDown) UpdateUnneededNodes(

// TryToScaleDown tries to scale down the cluster. It returns ScaleDownResult indicating if any node was
// removed and error if such occured.
- func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (ScaleDownResult, error) {
+ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs []*policyv1.PodDisruptionBudget) (ScaleDownResult, *errors.AutoscalerError) {

now := time.Now()
candidates := make([]*apiv1.Node, 0)
@@ -248,7 +249,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
// to recreate on other nodes.
emptyNodes := getEmptyNodes(candidates, pods, sd.context.MaxEmptyBulkDelete, sd.context.CloudProvider)
if len(emptyNodes) > 0 {
- confirmation := make(chan error, len(emptyNodes))
+ confirmation := make(chan *errors.AutoscalerError, len(emptyNodes))
for _, node := range emptyNodes {
glog.V(0).Infof("Scale-down: removing empty node %s", node.Name)
sd.context.LogRecorder.Eventf(apiv1.EventTypeNormal, "ScaleDownEmpty", "Scale-down: removing empty node %s", node.Name)
@@ -267,14 +268,14 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
confirmation <- deleteErr
}(node)
}
- var finalError error
+ var finalError *errors.AutoscalerError

startTime := time.Now()
for range emptyNodes {
timeElapsed := time.Now().Sub(startTime)
timeLeft := MaxCloudProviderNodeDeletionTime - timeElapsed
if timeLeft < 0 {
- finalError = fmt.Errorf("Failed to delete nodes in time")
+ finalError = errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time")
break
}
select {
@@ -284,13 +285,13 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
finalError = err
}
case <-time.After(timeLeft):
- finalError = fmt.Errorf("Failed to delete nodes in time")
+ finalError = errors.NewAutoscalerError(errors.TransientError, "Failed to delete nodes in time")
}
}
if finalError == nil {
return ScaleDownNodeDeleted, nil
}
- return ScaleDownError, fmt.Errorf("failed to delete at least one empty node: %v", finalError)
+ return ScaleDownError, finalError.AddPrefix("failed to delete at least one empty node: ")
}

// We look for only 1 node so new hints may be incomplete.
@@ -299,7 +300,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
sd.podLocationHints, sd.usageTracker, time.Now(), pdbs)

if err != nil {
- return ScaleDownError, fmt.Errorf("Find node to remove failed: %v", err)
+ return ScaleDownError, err.AddPrefix("Find node to remove failed: ")
}
if len(nodesToRemove) == 0 {
glog.V(1).Infof("No node to remove")
@@ -320,7 +321,7 @@ func (sd *ScaleDown) TryToScaleDown(nodes []*apiv1.Node, pods []*apiv1.Pod, pdbs
simulator.RemoveNodeFromTracker(sd.usageTracker, toRemove.Node.Name, sd.unneededNodes)
err = deleteNode(sd.context, toRemove.Node, toRemove.PodsToReschedule)
if err != nil {
- return ScaleDownError, fmt.Errorf("Failed to delete %s: %v", toRemove.Node.Name, err)
+ return ScaleDownError, err.AddPrefix("Failed to delete %s: ", toRemove.Node.Name)
}
if readinessMap[toRemove.Node.Name] {
metrics.RegisterScaleDown(1, metrics.Underutilized)
@@ -373,7 +374,7 @@ func getEmptyNodes(candidates []*apiv1.Node, pods []*apiv1.Pod, maxEmptyBulkDele
return result[:limit]
}

- func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) error {
+ func deleteNode(context *AutoscalingContext, node *apiv1.Node, pods []*apiv1.Pod) *errors.AutoscalerError {
if err := drainNode(node, pods, context.ClientSet, context.Recorder, context.MaxGratefulTerminationSec,
MaxPodEvictionTime, EvictionRetryTime); err != nil {
return err
@@ -410,13 +411,13 @@ func evictPod(podToEvict *apiv1.Pod, client kube_client.Interface, recorder kube
// Performs drain logic on the node. Marks the node as unschedulable and later removes all pods, giving
// them up to MaxGracefulTerminationTime to finish.
func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface, recorder kube_record.EventRecorder,
- maxGratefulTerminationSec int, maxPodEvictionTime time.Duration, waitBetweenRetries time.Duration) error {
+ maxGratefulTerminationSec int, maxPodEvictionTime time.Duration, waitBetweenRetries time.Duration) *errors.AutoscalerError {

drainSuccessful := false
toEvict := len(pods)
if err := deletetaint.MarkToBeDeleted(node, client); err != nil {
recorder.Eventf(node, apiv1.EventTypeWarning, "ScaleDownFailed", "failed to mark the node as toBeDeleted/unschedulable: %v", err)
- return err
+ return errors.ToAutoscalerError(errors.ApiCallError, err)
}

// If we fail to evict all the pods from the node we want to remove delete taint
@@ -448,11 +449,13 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
metrics.RegisterEvictions(1)
}
case <-time.After(retryUntil.Sub(time.Now()) + 5*time.Second):
- return fmt.Errorf("Failed to drain node %s/%s: timeout when waiting for creating evictions", node.Namespace, node.Name)
+ return errors.NewAutoscalerError(
+ errors.ApiCallError, "Failed to drain node %s/%s: timeout when waiting for creating evictions", node.Namespace, node.Name)
}
}
if len(evictionErrs) != 0 {
- return fmt.Errorf("Failed to drain node %s/%s, due to following errors: %v", node.Namespace, node.Name, evictionErrs)
+ return errors.NewAutoscalerError(
+ errors.ApiCallError, "Failed to drain node %s/%s, due to following errors: %v", node.Namespace, node.Name, evictionErrs)
}

// Evictions created successfully, wait maxGratefulTerminationSec + PodEvictionHeadroom to see if pods really disappeared.
@@ -466,7 +469,7 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
allGone = false
break
}
- if !errors.IsNotFound(err) {
+ if !kube_errors.IsNotFound(err) {
glog.Errorf("Failed to check pod %s/%s: %v", pod.Namespace, pod.Name, err)
allGone = false
}
@@ -478,7 +481,8 @@ func drainNode(node *apiv1.Node, pods []*apiv1.Pod, client kube_client.Interface
return nil
}
}
- return fmt.Errorf("Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name)
+ return errors.NewAutoscalerError(
+ errors.TransientError, "Failed to drain node %s/%s: pods remaining after timeout", node.Namespace, node.Name)
}

// cleanToBeDeleted cleans ToBeDeleted taints.
@@ -499,16 +503,17 @@ func cleanToBeDeleted(nodes []*apiv1.Node, client kube_client.Interface, recorde
// Removes the given node from cloud provider. No extra pre-deletion actions are executed on
// the Kubernetes side.
func deleteNodeFromCloudProvider(node *apiv1.Node, cloudProvider cloudprovider.CloudProvider,
- recorder kube_record.EventRecorder, registry *clusterstate.ClusterStateRegistry) error {
+ recorder kube_record.EventRecorder, registry *clusterstate.ClusterStateRegistry) *errors.AutoscalerError {
nodeGroup, err := cloudProvider.NodeGroupForNode(node)
if err != nil {
- return fmt.Errorf("failed to node group for %s: %v", node.Name, err)
+ return errors.NewAutoscalerError(
+ errors.CloudProviderError, "failed to find node group for %s: %v", node.Name, err)
}
if nodeGroup == nil || reflect.ValueOf(nodeGroup).IsNil() {
- return fmt.Errorf("picked node that doesn't belong to a node group: %s", node.Name)
+ return errors.NewAutoscalerError(errors.InternalError, "picked node that doesn't belong to a node group: %s", node.Name)
}
if err = nodeGroup.DeleteNodes([]*apiv1.Node{node}); err != nil {
- return fmt.Errorf("failed to delete %s: %v", node.Name, err)
+ return errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", node.Name, err)
}
recorder.Eventf(node, apiv1.EventTypeNormal, "ScaleDown", "node removed by cluster autoscaler")
registry.RegisterScaleDown(&clusterstate.ScaleDownRequest{
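Throughout this file the error category is chosen where the failure happens (cloud provider call, API call, simulation), and AddPrefix only decorates the message on the way up. A small illustrative sketch of that chaining, using the accessor names assumed in the errors sketch near the top of this page:

// Sketch only, not part of this commit; mirrors the deleteNodeFromCloudProvider ->
// TryToScaleDown path above.
package main

import (
    "fmt"

    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

func main() {
    // Category chosen at the lowest level, where the failure is understood.
    base := errors.NewAutoscalerError(errors.CloudProviderError, "failed to delete %s: %v", "node-1", "out of quota")
    // Callers only add context; the category survives the wrapping.
    wrapped := base.AddPrefix("failed to delete at least one empty node: ")
    fmt.Println(wrapped.Error()) // failed to delete at least one empty node: failed to delete node-1: out of quota
    fmt.Println(wrapped.Type())  // cloud provider category, usable as an errors_total label
}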
22 changes: 15 additions & 7 deletions cluster-autoscaler/core/scale_up.go
@@ -17,13 +17,13 @@ limitations under the License.
package core

import (
- "fmt"
"time"

"k8s.io/autoscaler/cluster-autoscaler/clusterstate"
"k8s.io/autoscaler/cluster-autoscaler/estimator"
"k8s.io/autoscaler/cluster-autoscaler/expander"
"k8s.io/autoscaler/cluster-autoscaler/metrics"
+ "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
@@ -35,7 +35,7 @@ import (
// false if it didn't and error if an error occured. Assumes that all nodes in the cluster are
// ready and in sync with instance groups.
func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes []*apiv1.Node,
- daemonSets []*extensionsv1.DaemonSet) (bool, error) {
+ daemonSets []*extensionsv1.DaemonSet) (bool, *errors.AutoscalerError) {
// From now on we only care about unschedulable pods that were marked after the newest
// node became available for the scheduler.
if len(unschedulablePods) == 0 {
@@ -49,14 +49,17 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
nodeInfos, err := GetNodeInfosForGroups(nodes, context.CloudProvider, context.ClientSet,
daemonSets, context.PredicateChecker)
if err != nil {
- return false, fmt.Errorf("failed to build node infos for node groups: %v", err)
+ return false, err.AddPrefix("failed to build node infos for node groups: ")
}

upcomingNodes := make([]*schedulercache.NodeInfo, 0)
for nodeGroup, numberOfNodes := range context.ClusterStateRegistry.GetUpcomingNodes() {
nodeTemplate, found := nodeInfos[nodeGroup]
if !found {
- return false, fmt.Errorf("failed to find template node for node group %s", nodeGroup)
+ return false, errors.NewAutoscalerError(
+ errors.InternalError,
+ "failed to find template node for node group %s",
+ nodeGroup)
}
for i := 0; i < numberOfNodes; i++ {
upcomingNodes = append(upcomingNodes, nodeTemplate)
@@ -153,7 +156,9 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes

currentSize, err := bestOption.NodeGroup.TargetSize()
if err != nil {
- return false, fmt.Errorf("failed to get node group size: %v", err)
+ return false, errors.NewAutoscalerError(
+ errors.CloudProviderError,
+ "failed to get node group size: %v", err)
}
newSize := currentSize + bestOption.NodeCount
if newSize >= bestOption.NodeGroup.MaxSize() {
@@ -165,14 +170,17 @@ func ScaleUp(context *AutoscalingContext, unschedulablePods []*apiv1.Pod, nodes
glog.V(1).Infof("Capping size to max cluster total size (%d)", context.MaxNodesTotal)
newSize = context.MaxNodesTotal - len(nodes) + currentSize
if newSize < currentSize {
- return false, fmt.Errorf("max node total count already reached")
+ return false, errors.NewAutoscalerError(
+ errors.TransientError,
+ "max node total count already reached")
}
}

glog.V(0).Infof("Scale-up: setting group %s size to %d", bestOption.NodeGroup.Id(), newSize)
increase := newSize - currentSize
if err := bestOption.NodeGroup.IncreaseSize(increase); err != nil {
- return false, fmt.Errorf("failed to increase node group size: %v", err)
+ return false, errors.NewAutoscalerError(
+ errors.CloudProviderError, "failed to increase node group size: %v", err)
}
context.ClusterStateRegistry.RegisterScaleUp(
&clusterstate.ScaleUpRequest{
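The errors_total metric named in the commit message lives in the metrics package, which is one of the changed files not rendered in this view. A plausible sketch of that side follows, assuming a Prometheus counter keyed by the error category and a RegisterError helper; both names are assumptions rather than the committed code.

// Sketch only, not part of this commit: a plausible errors_total counter in the metrics package.
package metrics

import (
    "github.com/prometheus/client_golang/prometheus"

    "k8s.io/autoscaler/cluster-autoscaler/utils/errors"
)

var errorsCount = prometheus.NewCounterVec(
    prometheus.CounterOpts{
        Namespace: "cluster_autoscaler",
        Name:      "errors_total",
        Help:      "The number of CA loop failures, partitioned by error type.",
    },
    []string{"type"},
)

func init() {
    prometheus.MustRegister(errorsCount)
}

// RegisterError increments errors_total for the given typed error.
func RegisterError(err *errors.AutoscalerError) {
    errorsCount.WithLabelValues(string(err.Type())).Inc()
}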