Fail fast if OCI instance pool is out of capacity/quota.
jlamillan committed Nov 28, 2022
1 parent 81181ec commit f0d25c6
Showing 6 changed files with 244 additions and 33 deletions.
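At a high level, the commit makes the instance-pool cache inspect OCI work requests whenever a pool sits below its target size and surface capacity/quota failures instead of waiting out the full scaling timeout. The Go sketch below condenses that detection flow from the diff that follows; detectUnrecoverableScaleFailure is an illustrative name (the committed code splits the logic across lastStartedWorkRequest and firstUnrecoverableErrorForWorkRequest), and it assumes the same imports as oci_instance_pool_cache.go (context, strings, and the vendored common and workrequests packages).

// Condensed, illustrative sketch of the fail-fast check added by this commit.
func detectUnrecoverableScaleFailure(ctx context.Context, wrc WorkRequestClient, compartmentID, poolID string) (string, error) {
	resp, err := wrc.ListWorkRequests(ctx, workrequests.ListWorkRequestsRequest{
		CompartmentId: common.String(compartmentID),
		ResourceId:    common.String(poolID),
		Limit:         common.Int(100),
	})
	if err != nil {
		return "", err
	}
	// Find the most recently started work request for this pool.
	var last workrequests.WorkRequestSummary
	for _, wr := range resp.Items {
		if wr.TimeStarted != nil && (last.TimeStarted == nil || wr.TimeStarted.After(last.TimeStarted.Time)) {
			last = wr
		}
	}
	// Only a failed LaunchInstancesInPool operation is interesting here.
	if last.TimeStarted == nil || last.OperationType == nil ||
		*last.OperationType != ociInstancePoolLaunchOp || last.Status != workrequests.WorkRequestSummaryStatusFailed {
		return "", nil
	}
	errsResp, err := wrc.ListWorkRequestErrors(ctx, workrequests.ListWorkRequestErrorsRequest{WorkRequestId: last.Id})
	if err != nil {
		return "", err
	}
	// Quota and capacity problems won't resolve by waiting, so report the first one to the caller.
	for _, e := range errsResp.Items {
		msg := strings.ToLower(*e.Message)
		if strings.Contains(msg, "quotaexceeded") || strings.Contains(msg, "limitexceeded") || strings.Contains(msg, "outofcapacity") {
			return *e.Message, nil
		}
	}
	return "", nil
}
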
3 changes: 3 additions & 0 deletions cluster-autoscaler/cloudprovider/oci/oci_instance_pool.go
@@ -35,6 +35,9 @@ const (
instanceIDLabelSuffix = "instance-id_suffix"
ociInstancePoolIDAnnotation = "oci.oraclecloud.com/instancepool-id"
ociInstancePoolResourceIdent = "instancepool"
ociInstancePoolLaunchOp = "LaunchInstancesInPool"
instanceStateUnfulfilled = "Unfulfilled"
instanceIDUnfulfilled = "instance_placeholder"

// Overload ociInstancePoolIDAnnotation to indicate a kubernetes node doesn't belong to any OCI Instance Pool.
ociInstancePoolIDNonPoolMember = "non_pool_member"
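The three new constants drive the placeholder mechanism: when launches fail for capacity or quota reasons, the cache fabricates Unfulfilled instance summaries whose IDs embed the pool OCID, so findInstanceByDetails can later map a placeholder back to its pool. A small sketch of that round trip, using a made-up pool OCID and assuming fmt and strings are imported:

// Compose a placeholder instance ID the way rebuild() does in oci_instance_pool_cache.go,
// then recover the pool OCID from it the way findInstanceByDetails() does.
func examplePlaceholderRoundTrip() {
	poolID := "ocid1.instancepool.oc1..exampleuniqueid" // hypothetical pool OCID
	placeholderID := fmt.Sprintf("%s%s-%d", instanceIDUnfulfilled, poolID, 0)
	// placeholderID == "instance_placeholderocid1.instancepool.oc1..exampleuniqueid-0"

	idx := strings.LastIndex(placeholderID, "-")
	recovered := strings.Replace(placeholderID[:idx], instanceIDUnfulfilled, "", 1)
	fmt.Println(recovered == poolID) // true
}
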
186 changes: 165 additions & 21 deletions cluster-autoscaler/cloudprovider/oci/oci_instance_pool_cache.go
@@ -18,10 +18,12 @@ package oci

import (
"context"
"fmt"
"github.com/pkg/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci/oci-go-sdk/v43/common"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci/oci-go-sdk/v43/core"
"k8s.io/autoscaler/cluster-autoscaler/cloudprovider/oci/oci-go-sdk/v43/workrequests"
"k8s.io/klog/v2"
"math"
"strings"
@@ -48,6 +50,13 @@ type VirtualNetworkClient interface {
GetVnic(context.Context, core.GetVnicRequest) (core.GetVnicResponse, error)
}

// WorkRequestClient wraps workrequests.WorkRequestClient exposing the functions we actually require.
type WorkRequestClient interface {
GetWorkRequest(context.Context, workrequests.GetWorkRequestRequest) (workrequests.GetWorkRequestResponse, error)
ListWorkRequests(context.Context, workrequests.ListWorkRequestsRequest) (workrequests.ListWorkRequestsResponse, error)
ListWorkRequestErrors(context.Context, workrequests.ListWorkRequestErrorsRequest) (workrequests.ListWorkRequestErrorsResponse, error)
}

type instancePoolCache struct {
mu sync.Mutex
poolCache map[string]*core.InstancePool
@@ -57,16 +66,18 @@ type instancePoolCache struct {
computeManagementClient ComputeMgmtClient
computeClient ComputeClient
virtualNetworkClient VirtualNetworkClient
workRequestsClient WorkRequestClient
}

func newInstancePoolCache(computeManagementClient ComputeMgmtClient, computeClient ComputeClient, virtualNetworkClient VirtualNetworkClient) *instancePoolCache {
func newInstancePoolCache(computeManagementClient ComputeMgmtClient, computeClient ComputeClient, virtualNetworkClient VirtualNetworkClient, workRequestsClient WorkRequestClient) *instancePoolCache {
return &instancePoolCache{
poolCache: map[string]*core.InstancePool{},
instanceSummaryCache: map[string]*[]core.InstanceSummary{},
unownedInstances: map[OciRef]bool{},
computeManagementClient: computeManagementClient,
computeClient: computeClient,
virtualNetworkClient: virtualNetworkClient,
workRequestsClient: workRequestsClient,
}
}

@@ -82,16 +93,16 @@ func (c *instancePoolCache) rebuild(staticInstancePools map[string]*InstancePool
// Since we only support static instance-pools we don't need to worry about pruning.

for id := range staticInstancePools {
resp, err := c.computeManagementClient.GetInstancePool(context.Background(), core.GetInstancePoolRequest{
getInstancePoolResp, err := c.computeManagementClient.GetInstancePool(context.Background(), core.GetInstancePoolRequest{
InstancePoolId: common.String(id),
})
if err != nil {
klog.Errorf("get instance pool %s failed: %v", id, err)
return err
}
klog.V(6).Infof("GetInstancePool() response %v", resp.InstancePool)
klog.V(6).Infof("GetInstancePool() response %v", getInstancePoolResp.InstancePool)

c.setInstancePool(&resp.InstancePool)
c.setInstancePool(&getInstancePoolResp.InstancePool)

var instanceSummaries []core.InstanceSummary
var page *string
@@ -112,7 +123,32 @@ func (c *instancePoolCache) rebuild(staticInstancePools map[string]*InstancePool
break
}
}
c.setInstanceSummaries(*resp.InstancePool.Id, &instanceSummaries)
c.setInstanceSummaries(id, &instanceSummaries)
// Compare the instance pool's target size with the latest number of InstanceSummaries. If the pool is short,
// look for unrecoverable errors, such as quota or capacity issues, that prevent the pool from scaling up.
if len(*c.instanceSummaryCache[id]) < *c.poolCache[id].Size {
klog.V(4).Infof("Instance pool %s has only %d instances created while requested count is %d. ",
*getInstancePoolResp.InstancePool.DisplayName, len(*c.instanceSummaryCache[id]), *c.poolCache[id].Size)

if getInstancePoolResp.LifecycleState != core.InstancePoolLifecycleStateRunning {
lastWorkRequest, err := c.lastStartedWorkRequest(*getInstancePoolResp.CompartmentId, id)

// The last started work request may be many minutes old depending on the sync interval
// and the exponential backoff of retried OCI operations.
if err == nil && *lastWorkRequest.OperationType == ociInstancePoolLaunchOp &&
lastWorkRequest.Status == workrequests.WorkRequestSummaryStatusFailed {
unrecoverableErrorMsg := c.firstUnrecoverableErrorForWorkRequest(*lastWorkRequest.Id)
if unrecoverableErrorMsg != "" {
klog.V(4).Infof("Creating placeholder instances for %s.", *getInstancePoolResp.InstancePool.DisplayName)
for i := len(*c.instanceSummaryCache[id]); i < *c.poolCache[id].Size; i++ {
c.addUnfulfilledInstanceToCache(id, fmt.Sprintf("%s%s-%d", instanceIDUnfulfilled,
*getInstancePoolResp.InstancePool.Id, i), *getInstancePoolResp.InstancePool.CompartmentId,
fmt.Sprintf("%s-%d", *getInstancePoolResp.InstancePool.DisplayName, i))
}
}
}
}
}
}

// Reset unowned instances cache.
@@ -121,6 +157,15 @@ func (c *instancePoolCache) rebuild(staticInstancePools map[string]*InstancePool
return nil
}

func (c *instancePoolCache) addUnfulfilledInstanceToCache(instancePoolID, instanceID, compartmentID, name string) {
*c.instanceSummaryCache[instancePoolID] = append(*c.instanceSummaryCache[instancePoolID], core.InstanceSummary{
Id: common.String(instanceID),
CompartmentId: common.String(compartmentID),
State: common.String(instanceStateUnfulfilled),
DisplayName: common.String(name),
})
}

// removeInstance tries to remove the instance from the specified instance pool. If the instance isn't in the array,
// then it won't do anything. removeInstance returns true if it actually removed the instance and reduced the size of
// the instance pool.
Expand All @@ -131,19 +176,27 @@ func (c *instancePoolCache) removeInstance(instancePool InstancePoolNodeGroup, i
return false
}

_, err := c.computeManagementClient.DetachInstancePoolInstance(context.Background(), core.DetachInstancePoolInstanceRequest{
InstancePoolId: common.String(instancePool.Id()),
DetachInstancePoolInstanceDetails: core.DetachInstancePoolInstanceDetails{
InstanceId: common.String(instanceID),
IsDecrementSize: common.Bool(true),
IsAutoTerminate: common.Bool(true),
},
})
var err error
if strings.Contains(instanceID, instanceIDUnfulfilled) {
// For an unfulfilled instance, reduce the target size of the instance pool and remove the placeholder instance from cache.
err = c.setSize(instancePool.Id(), *c.poolCache[instancePool.Id()].Size-1)
} else {
_, err = c.computeManagementClient.DetachInstancePoolInstance(context.Background(), core.DetachInstancePoolInstanceRequest{
InstancePoolId: common.String(instancePool.Id()),
DetachInstancePoolInstanceDetails: core.DetachInstancePoolInstanceDetails{
InstanceId: common.String(instanceID),
IsDecrementSize: common.Bool(true),
IsAutoTerminate: common.Bool(true),
},
})
}

if err == nil {
c.mu.Lock()
// Decrease pool size in cache since IsDecrementSize was true
// Decrease pool size in cache
c.poolCache[instancePool.Id()].Size = common.Int(*c.poolCache[instancePool.Id()].Size - 1)
// Since we're removing the instance from cache, we don't need to expire the pool cache
c.removeInstanceSummaryFromCache(instancePool.Id(), instanceID)
c.mu.Unlock()
return true
}
@@ -156,6 +209,12 @@ func (c *instancePoolCache) removeInstance(instancePool InstancePoolNodeGroup, i
// through the configured instance-pools (ListInstancePoolInstances) for a match.
func (c *instancePoolCache) findInstanceByDetails(ociInstance OciRef) (*OciRef, error) {

// Unfulfilled instance placeholder
if strings.Contains(ociInstance.Name, instanceIDUnfulfilled) {
instIndex := strings.LastIndex(ociInstance.Name, "-")
ociInstance.PoolID = strings.Replace(ociInstance.Name[:instIndex], instanceIDUnfulfilled, "", 1)
return &ociInstance, nil
}
// Minimum amount of information we need to make a positive match
if ociInstance.InstanceID == "" && ociInstance.PrivateIPAddress == "" && ociInstance.PublicIPAddress == "" {
return nil, errors.New("instance id or an IP address is required to resolve details")
@@ -321,6 +380,7 @@ func (c *instancePoolCache) setSize(instancePoolID string, size int) error {
return err
}

isScaleUp := size > *getInstancePoolResp.Size
scaleDelta := int(math.Abs(float64(*getInstancePoolResp.Size - size)))

updateDetails := core.UpdateInstancePoolDetails{
@@ -336,28 +396,32 @@ func (c *instancePoolCache) setSize(instancePoolID string, size int) error {
return err
}

c.mu.Lock()
c.poolCache[instancePoolID].Size = common.Int(size)
c.mu.Unlock()

// Just return immediately if this was a scale-down, to be consistent with DetachInstancePoolInstance
if !isScaleUp {
return nil
}

// Only wait for scale up (not scale down)
ctx := context.Background()
ctx, cancelFunc := context.WithTimeout(ctx, maxScalingWaitTime(scaleDelta, 20, 10*time.Minute))
// Ensure this context is always canceled so channels, go routines, etc. always complete.
defer cancelFunc()

// Wait for the number of Running instances in this pool to reach size
err = c.waitForRunningInstanceCount(ctx, size, instancePoolID, *getInstancePoolResp.CompartmentId)
if err != nil {
return err
}

// Allow additional time for the pool's lifecycle state to reach Running
ctx, _ = context.WithTimeout(ctx, 10*time.Minute)
err = c.waitForState(ctx, instancePoolID, core.InstancePoolLifecycleStateRunning)
if err != nil {
return err
}

c.mu.Lock()
c.poolCache[instancePoolID].Size = common.Int(size)
c.mu.Unlock()

return nil
}

@@ -446,10 +510,21 @@ func (c *instancePoolCache) monitorScalingProgress(ctx context.Context, target i
return
}

// Fail the scale-up operation fast by watching for unrecoverable errors such as quota or capacity issues
lastWorkRequest, err := c.lastStartedWorkRequest(compartmentID, instancePoolID)
if err == nil && *lastWorkRequest.OperationType == ociInstancePoolLaunchOp &&
lastWorkRequest.Status == workrequests.WorkRequestSummaryStatusInProgress {
unrecoverableErrorMsg := c.firstUnrecoverableErrorForWorkRequest(*lastWorkRequest.Id)
if unrecoverableErrorMsg != "" {
errCh <- errors.New(unrecoverableErrorMsg)
return
}
}

var page *string
numRunningInstances := 0
for {
// List instances in the pool
// Next, wait until the number of instances in the pool reaches the target
listInstancePoolInstances, err := c.computeManagementClient.ListInstancePoolInstances(context.Background(), core.ListInstancePoolInstancesRequest{
InstancePoolId: common.String(instancePoolID),
CompartmentId: common.String(compartmentID),
@@ -509,6 +584,21 @@ func (c *instancePoolCache) getSize(id string) (int, error) {
return *pool.Size, nil
}

// removeInstanceSummaryFromCache looks through the instance summary cache for an InstanceSummary with the specified
// ID and removes it if found.
func (c *instancePoolCache) removeInstanceSummaryFromCache(instancePoolID, instanceID string) {
var instanceSummaries []core.InstanceSummary

if instanceSummaryCache, found := c.instanceSummaryCache[instancePoolID]; found {
for _, instanceSummary := range *instanceSummaryCache {
if instanceSummary.Id != nil && *instanceSummary.Id != instanceID {
instanceSummaries = append(instanceSummaries, instanceSummary)
}
}
c.instanceSummaryCache[instancePoolID] = &instanceSummaries
}
}

// maxScalingWaitTime estimates the maximum amount of time, as a duration, needed to scale size instances.
// Note that larger scale operations are broken up internally into smaller batches. This is an internal detail
// that can be overridden on a tenancy basis; 20 is a good default batch size.
@@ -528,3 +618,57 @@ func maxScalingWaitTime(size, batchSize int, timePerBatch time.Duration) time.Du

return maxScalingWaitTime + buffer
}
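
For reference, setSize above budgets its scale-up wait with maxScalingWaitTime(scaleDelta, 20, 10*time.Minute). The hunk elides the body, so the arithmetic in the comment below is an assumption (roughly one timePerBatch per batch of batchSize instances, plus a buffer); the snippet only illustrates how the bound is consumed:

// Hypothetical usage mirroring setSize: budget the wait for a scale-up of 45 instances.
// Assuming about ceil(45/20) = 3 batches at 10 minutes each, the deadline would be on the
// order of 30 minutes plus the helper's internal buffer.
deadline := maxScalingWaitTime(45, 20, 10*time.Minute)
ctx, cancel := context.WithTimeout(context.Background(), deadline)
defer cancel()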

// lastStartedWorkRequest returns the *last started* work request for the specified resource or an error if none are found
func (c *instancePoolCache) lastStartedWorkRequest(compartmentID, resourceID string) (workrequests.WorkRequestSummary, error) {

klog.V(6).Infof("Looking for the last started work request for resource %s.", resourceID)
listWorkRequests, err := c.workRequestsClient.ListWorkRequests(context.Background(), workrequests.ListWorkRequestsRequest{
CompartmentId: common.String(compartmentID),
Limit: common.Int(100),
ResourceId: common.String(resourceID),
})
if err != nil {
klog.Errorf("list work requests for %s failed: %v", resourceID, err)
return workrequests.WorkRequestSummary{}, err
}

var lastStartedWorkRequest = workrequests.WorkRequestSummary{}
for i, nextWorkRequest := range listWorkRequests.Items {
if i == 0 && nextWorkRequest.TimeStarted != nil {
lastStartedWorkRequest = nextWorkRequest
} else {
if nextWorkRequest.TimeStarted != nil && nextWorkRequest.TimeStarted.After(lastStartedWorkRequest.TimeStarted.Time) {
lastStartedWorkRequest = nextWorkRequest
}
}
}

if lastStartedWorkRequest.TimeStarted != nil {
return lastStartedWorkRequest, nil
}

return workrequests.WorkRequestSummary{}, errors.New("no work requests found")
}

// firstUnrecoverableErrorForWorkRequest returns the first non-recoverable error message associated with the specified
// work-request ID, or the empty string if none are found.
func (c *instancePoolCache) firstUnrecoverableErrorForWorkRequest(workRequestID string) string {

klog.V(6).Infof("Looking for non-recoverable errors for work request %s.", workRequestID)
// Look through the work request's error logs for known unrecoverable error message(s)
workRequestErrors, _ := c.workRequestsClient.ListWorkRequestErrors(context.Background(),
workrequests.ListWorkRequestErrorsRequest{WorkRequestId: common.String(workRequestID),
SortOrder: workrequests.ListWorkRequestErrorsSortOrderDesc})
for _, nextErr := range workRequestErrors.Items {
// Abort wait for certain unrecoverable errors such as capacity and quota issues
if strings.Contains(strings.ToLower(*nextErr.Message), strings.ToLower("QuotaExceeded")) ||
strings.Contains(strings.ToLower(*nextErr.Message), strings.ToLower("LimitExceeded")) ||
strings.Contains(strings.ToLower(*nextErr.Message), strings.ToLower("OutOfCapacity")) {
klog.V(4).Infof("Found unrecoverable error(s) in work request %s.", workRequestID)
return *nextErr.Message
}
}
klog.V(6).Infof("No non-recoverable errors for work request %s found.", workRequestID)
return ""
}
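
Because WorkRequestClient is a narrow, three-method interface, the fail-fast path can be exercised in unit tests without a live tenancy. Below is a minimal sketch of a test double; fakeWorkRequestClient and its canned-response fields are hypothetical and not part of this commit. In a test it could be passed to newInstancePoolCache as the fourth argument.

// fakeWorkRequestClient satisfies the WorkRequestClient interface with canned responses,
// letting tests drive lastStartedWorkRequest and firstUnrecoverableErrorForWorkRequest offline.
type fakeWorkRequestClient struct {
	getResp        workrequests.GetWorkRequestResponse
	listResp       workrequests.ListWorkRequestsResponse
	listErrorsResp workrequests.ListWorkRequestErrorsResponse
}

func (f *fakeWorkRequestClient) GetWorkRequest(ctx context.Context, req workrequests.GetWorkRequestRequest) (workrequests.GetWorkRequestResponse, error) {
	return f.getResp, nil
}

func (f *fakeWorkRequestClient) ListWorkRequests(ctx context.Context, req workrequests.ListWorkRequestsRequest) (workrequests.ListWorkRequestsResponse, error) {
	return f.listResp, nil
}

func (f *fakeWorkRequestClient) ListWorkRequestErrors(ctx context.Context, req workrequests.ListWorkRequestErrorsRequest) (workrequests.ListWorkRequestErrorsResponse, error) {
	return f.listErrorsResp, nil
}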