Skip to content

Commit

Permalink
cleanup: refactor Azure cache and remove redundant API calls
Browse files Browse the repository at this point in the history
  • Loading branch information
Cecile Robert-Michon committed Dec 3, 2020
1 parent b201c8d commit a0ba266
Show file tree
Hide file tree
Showing 12 changed files with 509 additions and 775 deletions.
2 changes: 1 addition & 1 deletion cluster-autoscaler/cloudprovider/azure/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ In addition, cluster-autoscaler exposes a `AZURE_VMSS_CACHE_TTL` environment var

| Config Name | Default | Environment Variable | Cloud Config File |
| ----------- | ------- | -------------------- | ----------------- |
| VmssCacheTTL | 15 | AZURE_VMSS_CACHE_TTL | vmssCacheTTL |
| VmssCacheTTL | 60 | AZURE_VMSS_CACHE_TTL | vmssCacheTTL |

The `AZURE_VMSS_VMS_CACHE_TTL` environment variable affects the `GetScaleSetVms` (VMSS VM List) calls rate. The default value is 300 seconds.
A configurable jitter (`AZURE_VMSS_VMS_CACHE_JITTER` environment variable, default 0) expresses the maximum number of second that will be subtracted from that initial VMSS cache TTL after a new VMSS is discovered by the cluster-autoscaler: this can prevent a dogpile effect on clusters having many VMSS.
Expand Down
123 changes: 26 additions & 97 deletions cluster-autoscaler/cloudprovider/azure/azure_agent_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,21 +35,13 @@ import (
"k8s.io/autoscaler/cluster-autoscaler/config/dynamic"
klog "k8s.io/klog/v2"
schedulerframework "k8s.io/kubernetes/pkg/scheduler/framework"
"k8s.io/legacy-cloud-providers/azure/retry"
)

const (
vmInstancesRefreshPeriod = 5 * time.Minute
clusterAutoscalerDeploymentPrefix = `cluster-autoscaler-`
defaultMaxDeploymentsCount = 10
)

var virtualMachinesStatusCache struct {
lastRefresh map[string]time.Time
mutex sync.Mutex
virtualMachines map[string][]compute.VirtualMachine
}

// AgentPool implements NodeGroup interface for agent pools deployed by aks-engine.
type AgentPool struct {
azureRef
Expand Down Expand Up @@ -132,54 +124,24 @@ func (as *AgentPool) MaxSize() int {
return as.maxSize
}

func (as *AgentPool) getVirtualMachinesFromCache() ([]compute.VirtualMachine, error) {
virtualMachinesStatusCache.mutex.Lock()
defer virtualMachinesStatusCache.mutex.Unlock()
klog.V(4).Infof("getVirtualMachinesFromCache: starts for %+v", as)

if virtualMachinesStatusCache.virtualMachines == nil {
klog.V(4).Infof("getVirtualMachinesFromCache: initialize vm cache")
virtualMachinesStatusCache.virtualMachines = make(map[string][]compute.VirtualMachine)
}
if virtualMachinesStatusCache.lastRefresh == nil {
klog.V(4).Infof("getVirtualMachinesFromCache: initialize last refresh time cache")
virtualMachinesStatusCache.lastRefresh = make(map[string]time.Time)
}

if virtualMachinesStatusCache.lastRefresh[as.Id()].Add(vmInstancesRefreshPeriod).After(time.Now()) {
klog.V(4).Infof("getVirtualMachinesFromCache: get vms from cache")
return virtualMachinesStatusCache.virtualMachines[as.Id()], nil
}
klog.V(4).Infof("getVirtualMachinesFromCache: get vms from API")
vms, rerr := as.GetVirtualMachines()
klog.V(4).Infof("getVirtualMachinesFromCache: got vms from API, len = %d", len(vms))

if rerr != nil {
if isAzureRequestsThrottled(rerr) {
klog.Warningf("getAllVirtualMachines: throttling with message %v, would return the cached vms", rerr)
return virtualMachinesStatusCache.virtualMachines[as.Id()], nil
}

return []compute.VirtualMachine{}, rerr.Error()
}

virtualMachinesStatusCache.virtualMachines[as.Id()] = vms
virtualMachinesStatusCache.lastRefresh[as.Id()] = time.Now()

return vms, nil
// Id returns AgentPool id.
func (as *AgentPool) Id() string {
return as.Name
}

func invalidateVMCache(agentpoolName string) {
virtualMachinesStatusCache.mutex.Lock()
virtualMachinesStatusCache.lastRefresh[agentpoolName] = time.Now().Add(-1 * vmInstancesRefreshPeriod)
virtualMachinesStatusCache.mutex.Unlock()
func (as *AgentPool) getVMsFromCache() ([]compute.VirtualMachine, error) {
allVMs := as.manager.azureCache.getVirtualMachines()
if _, exists := allVMs[as.Name]; !exists {
return []compute.VirtualMachine{}, fmt.Errorf("could not find VMs with poolName: %s", as.Name)
}
return allVMs[as.Name], nil
}

// GetVMIndexes gets indexes of all virtual machines belonging to the agent pool.
func (as *AgentPool) GetVMIndexes() ([]int, map[int]string, error) {
klog.V(6).Infof("GetVMIndexes: starts for as %v", as)

instances, err := as.getVirtualMachinesFromCache()
instances, err := as.getVMsFromCache()
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -222,8 +184,8 @@ func (as *AgentPool) getCurSize() (int64, error) {
klog.V(5).Infof("Returning agent pool (%q) size: %d\n", as.Name, len(indexes))

if as.curSize != int64(len(indexes)) {
klog.V(6).Infof("getCurSize:as.curSize(%d) != real size (%d), invalidating vm cache", as.curSize, len(indexes))
invalidateVMCache(as.Id())
klog.V(6).Infof("getCurSize:as.curSize(%d) != real size (%d), invalidating cache", as.curSize, len(indexes))
as.manager.invalidateCache()
}

as.curSize = int64(len(indexes))
Expand Down Expand Up @@ -316,8 +278,8 @@ func (as *AgentPool) IncreaseSize(delta int) error {
klog.Warningf("IncreaseSize: failed to cleanup outdated deployments with err: %v.", err)
}

klog.V(6).Infof("IncreaseSize: invalidating vm cache")
invalidateVMCache(as.Id())
klog.V(6).Infof("IncreaseSize: invalidating cache")
as.manager.invalidateCache()

indexes, _, err := as.GetVMIndexes()
if err != nil {
Expand Down Expand Up @@ -357,43 +319,15 @@ func (as *AgentPool) IncreaseSize(delta int) error {
// Update cache after scale success.
as.curSize = int64(expectedSize)
as.lastRefresh = time.Now()
klog.V(6).Info("IncreaseSize: invalidating vm cache")
invalidateVMCache(as.Id())
klog.V(6).Info("IncreaseSize: invalidating cache")
as.manager.invalidateCache()
return nil
}

klog.Errorf("deploymentsClient.CreateOrUpdate for deployment %q failed: %v", newDeploymentName, realError)
return realError
}

// GetVirtualMachines returns list of nodes for the given agent pool.
func (as *AgentPool) GetVirtualMachines() ([]compute.VirtualMachine, *retry.Error) {
ctx, cancel := getContextWithCancel()
defer cancel()

result, rerr := as.manager.azClient.virtualMachinesClient.List(ctx, as.manager.config.ResourceGroup)
if rerr != nil {
return nil, rerr
}

instances := make([]compute.VirtualMachine, 0)
for _, instance := range result {
if instance.Tags == nil {
continue
}

tags := instance.Tags
vmPoolName := tags["poolName"]
if vmPoolName == nil || !strings.EqualFold(*vmPoolName, as.Id()) {
continue
}

instances = append(instances, instance)
}

return instances, nil
}

// DecreaseTargetSize decreases the target size of the node group. This function
// doesn't permit to delete any existing node and can be used only to reduce the
// request for new nodes that have not been yet fulfilled. Delta should be negative.
Expand All @@ -403,7 +337,7 @@ func (as *AgentPool) DecreaseTargetSize(delta int) error {
as.mutex.Lock()
defer as.mutex.Unlock()

nodes, err := as.getVirtualMachinesFromCache()
nodes, err := as.getVMsFromCache()
if err != nil {
return err
}
Expand All @@ -427,14 +361,14 @@ func (as *AgentPool) Belongs(node *apiv1.Node) (bool, error) {
Name: node.Spec.ProviderID,
}

targetAsg, err := as.manager.GetAsgForInstance(ref)
targetAsg, err := as.manager.GetNodeGroupForInstance(ref)
if err != nil {
return false, err
}
if targetAsg == nil {
return false, fmt.Errorf("%s doesn't belong to a known agent pool", node.Name)
}
if !strings.EqualFold(targetAsg.Id(), as.Id()) {
if !strings.EqualFold(targetAsg.Id(), as.Name) {
return false, nil
}
return true, nil
Expand All @@ -446,13 +380,13 @@ func (as *AgentPool) DeleteInstances(instances []*azureRef) error {
return nil
}

commonAsg, err := as.manager.GetAsgForInstance(instances[0])
commonAsg, err := as.manager.GetNodeGroupForInstance(instances[0])
if err != nil {
return err
}

for _, instance := range instances {
asg, err := as.manager.GetAsgForInstance(instance)
asg, err := as.manager.GetNodeGroupForInstance(instance)
if err != nil {
return err
}
Expand All @@ -476,8 +410,8 @@ func (as *AgentPool) DeleteInstances(instances []*azureRef) error {
}
}

klog.V(6).Infof("DeleteInstances: invalidating vm cache")
invalidateVMCache(as.Id())
klog.V(6).Infof("DeleteInstances: invalidating cache")
as.manager.invalidateCache()
return nil
}

Expand All @@ -501,7 +435,7 @@ func (as *AgentPool) DeleteNodes(nodes []*apiv1.Node) error {
}

if belongs != true {
return fmt.Errorf("%s belongs to a different asg than %s", node.Name, as.Id())
return fmt.Errorf("%s belongs to a different asg than %s", node.Name, as.Name)
}

ref := &azureRef{
Expand All @@ -518,14 +452,9 @@ func (as *AgentPool) DeleteNodes(nodes []*apiv1.Node) error {
return as.DeleteInstances(refs)
}

// Id returns AgentPool id.
func (as *AgentPool) Id() string {
return as.Name
}

// Debug returns a debug string for the agent pool.
func (as *AgentPool) Debug() string {
return fmt.Sprintf("%s (%d:%d)", as.Id(), as.MinSize(), as.MaxSize())
return fmt.Sprintf("%s (%d:%d)", as.Name, as.MinSize(), as.MaxSize())
}

// TemplateNodeInfo returns a node template for this agent pool.
Expand All @@ -535,7 +464,7 @@ func (as *AgentPool) TemplateNodeInfo() (*schedulerframework.NodeInfo, error) {

// Nodes returns a list of all nodes that belong to this node group.
func (as *AgentPool) Nodes() ([]cloudprovider.Instance, error) {
instances, err := as.getVirtualMachinesFromCache()
instances, err := as.getVMsFromCache()
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit a0ba266

Please sign in to comment.