Skip to content

Commit

Permalink
Merge pull request #172 from Vacant2333/fix-isUnavailable
Browse files Browse the repository at this point in the history
fix: fallback to OnDemand when Spot is not available
  • Loading branch information
jwcesign authored Dec 31, 2024
2 parents 9fb2451 + 5d2e077 commit 7898a13
Show file tree
Hide file tree
Showing 6 changed files with 5,975 additions and 671 deletions.
10 changes: 0 additions & 10 deletions pkg/cache/unavailableofferings.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ import (

"github.com/patrickmn/go-cache"
"knative.dev/pkg/logging"
karpv1 "sigs.k8s.io/karpenter/pkg/apis/v1"
)

var (
spotKey = key("", "", karpv1.CapacityTypeSpot)
)

// UnavailableOfferings stores any offerings that return ICE (insufficient capacity errors) when
Expand Down Expand Up @@ -56,11 +51,6 @@ func NewUnavailableOfferings() *UnavailableOfferings {

// IsUnavailable returns true if the offering appears in the cache
func (u *UnavailableOfferings) IsUnavailable(instanceType, zone, capacityType string) bool {
if capacityType == karpv1.CapacityTypeSpot {
if _, found := u.cache.Get(spotKey); found {
return true
}
}
_, found := u.cache.Get(key(instanceType, zone, capacityType))
return found
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/controllers/interruption/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (c *Controller) Reconcile(ctx context.Context, node *corev1.Node) (reconcil
return reconcile.Result{}, nil
}

func (c *Controller) Register(ctx context.Context, m manager.Manager) error {
func (c *Controller) Register(_ context.Context, m manager.Manager) error {
return controllerruntime.NewControllerManagedBy(m).
Named("interruption").
For(&corev1.Node{}).
Expand Down
7 changes: 4 additions & 3 deletions pkg/controllers/nodeclaim/garbagecollection/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,10 @@ import (
)

type Controller struct {
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
successfulCount uint64 // keeps track of successful reconciles for more aggressive requeueing near the start of the controller
kubeClient client.Client
cloudProvider cloudprovider.CloudProvider
// Keeps track of successful reconciles for more aggressive requeueing near the start of the controller.
successfulCount uint64
}

func NewController(kubeClient client.Client, cloudProvider cloudprovider.CloudProvider) *Controller {
Expand Down
106 changes: 73 additions & 33 deletions pkg/providers/instancetype/instancetype.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,9 @@ type DefaultProvider struct {

instanceTypesInfo []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType

muInstanceTypesOfferings sync.RWMutex
instanceTypesOfferings map[string]sets.Set[string]
muInstanceTypesOfferings sync.RWMutex
instanceTypesOfferings map[string]sets.Set[string]
spotInstanceTypesOfferings map[string]sets.Set[string]

instanceTypesCache *cache.Cache

Expand All @@ -83,17 +84,18 @@ func NewDefaultProvider(region string, kubeClient client.Client, ecsClient *ecsc
instanceTypesCache *cache.Cache, unavailableOfferingsCache *kcache.UnavailableOfferings,
pricingProvider pricing.Provider, ackProvider ack.Provider) *DefaultProvider {
return &DefaultProvider{
kubeClient: kubeClient,
ecsClient: ecsClient,
region: region,
pricingProvider: pricingProvider,
ackProvider: ackProvider,
instanceTypesInfo: []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType{},
instanceTypesOfferings: map[string]sets.Set[string]{},
instanceTypesCache: instanceTypesCache,
unavailableOfferings: unavailableOfferingsCache,
cm: pretty.NewChangeMonitor(),
instanceTypesSeqNum: 0,
kubeClient: kubeClient,
ecsClient: ecsClient,
region: region,
pricingProvider: pricingProvider,
ackProvider: ackProvider,
instanceTypesInfo: []*ecsclient.DescribeInstanceTypesResponseBodyInstanceTypesInstanceType{},
instanceTypesOfferings: map[string]sets.Set[string]{},
spotInstanceTypesOfferings: map[string]sets.Set[string]{},
instanceTypesCache: instanceTypesCache,
unavailableOfferings: unavailableOfferingsCache,
cm: pretty.NewChangeMonitor(),
instanceTypesSeqNum: 0,
}
}

Expand All @@ -111,6 +113,9 @@ func (p *DefaultProvider) validateState(nodeClass *v1alpha1.ECSNodeClass) error
if len(p.instanceTypesOfferings) == 0 {
return errors.New("no instance types offerings found")
}
if len(p.spotInstanceTypesOfferings) == 0 {
return errors.New("no spot instance types offerings found")
}
if len(nodeClass.Status.VSwitches) == 0 {
return errors.New("no vswitches found")
}
Expand Down Expand Up @@ -182,13 +187,15 @@ func (p *DefaultProvider) List(ctx context.Context, kc *v1alpha1.KubeletConfigur
zoneData := lo.Map(allZones.UnsortedList(), func(zoneID string, _ int) ZoneData {
if !p.instanceTypesOfferings[lo.FromPtr(i.InstanceTypeId)].Has(zoneID) || !vSwitchsZones.Has(zoneID) {
return ZoneData{
ID: zoneID,
Available: false,
ID: zoneID,
Available: false,
SpotAvailable: false,
}
}
return ZoneData{
ID: zoneID,
Available: true,
ID: zoneID,
Available: true,
SpotAvailable: p.spotInstanceTypesOfferings[lo.FromPtr(i.InstanceTypeId)].Has(zoneID),
}
})

Expand Down Expand Up @@ -271,7 +278,40 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error
log.FromContext(ctx).Error(err, "failed to get instance type offerings")
return err
}
if err := processAvailableResourcesResponse(resp, instanceTypesOfferings); err != nil {
log.FromContext(ctx).Error(err, "failed to process available resource response")
return err
}

if p.cm.HasChanged("instance-type-offering", instanceTypesOfferings) {
// Only update instanceTypesSeqNun with the instance type offerings have been changed
// This is to not create new keys with duplicate instance type offerings option
atomic.AddUint64(&p.instanceTypesOfferingsSeqNum, 1)
log.FromContext(ctx).WithValues("instance-type-count", len(instanceTypesOfferings)).V(1).Info("discovered offerings for instance types")
}
p.instanceTypesOfferings = instanceTypesOfferings

spotInstanceTypesOfferings := map[string]sets.Set[string]{}
describeAvailableResourceRequest = &ecsclient.DescribeAvailableResourceRequest{
RegionId: tea.String(p.region),
DestinationResource: tea.String("InstanceType"),
SpotStrategy: tea.String("SpotAsPriceGo"),
}
resp, err = p.ecsClient.DescribeAvailableResourceWithOptions(
describeAvailableResourceRequest, &util.RuntimeOptions{})
if err != nil {
log.FromContext(ctx).Error(err, "failed to get spot instance type offerings")
return err
}
if err := processAvailableResourcesResponse(resp, spotInstanceTypesOfferings); err != nil {
log.FromContext(ctx).Error(err, "failed to process spot instance type offerings")
return err
}
p.spotInstanceTypesOfferings = spotInstanceTypesOfferings
return nil
}

func processAvailableResourcesResponse(resp *ecsclient.DescribeAvailableResourceResponse, offerings map[string]sets.Set[string]) error {
if resp == nil || resp.Body == nil {
return errors.New("DescribeAvailableResourceWithOptions failed to return any instance types")
} else if resp.Body.AvailableZones == nil || len(resp.Body.AvailableZones.AvailableZone) == 0 {
Expand All @@ -280,18 +320,12 @@ func (p *DefaultProvider) UpdateInstanceTypeOfferings(ctx context.Context) error

for _, az := range resp.Body.AvailableZones.AvailableZone {
// TODO: Later, `ClosedWithStock` will be tested to determine if `ClosedWithStock` should be added.
if *az.StatusCategory == "WithStock" { // WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
processAvailableResources(az, instanceTypesOfferings)
// WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
if *az.StatusCategory != "WithStock" {
continue
}
processAvailableResources(az, offerings)
}

if p.cm.HasChanged("instance-type-offering", instanceTypesOfferings) {
// Only update instanceTypesSeqNun with the instance type offerings have been changed
// This is to not create new keys with duplicate instance type offerings option
atomic.AddUint64(&p.instanceTypesOfferingsSeqNum, 1)
log.FromContext(ctx).WithValues("instance-type-count", len(instanceTypesOfferings)).V(1).Info("discovered offerings for instance types")
}
p.instanceTypesOfferings = instanceTypesOfferings
return nil
}

Expand All @@ -307,12 +341,14 @@ func processAvailableResources(az *ecsclient.DescribeAvailableResourceResponseBo

for _, sr := range ar.SupportedResources.SupportedResource {
// TODO: Later, `ClosedWithStock` will be tested to determine if `ClosedWithStock` should be added.
if *sr.StatusCategory == "WithStock" { // WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
if _, ok := instanceTypesOfferings[*sr.Value]; !ok {
instanceTypesOfferings[*sr.Value] = sets.New[string]()
}
instanceTypesOfferings[*sr.Value].Insert(*az.ZoneId)
// WithStock, ClosedWithStock, WithoutStock, ClosedWithoutStock
if *sr.StatusCategory != "WithStock" {
continue
}
if _, ok := instanceTypesOfferings[*sr.Value]; !ok {
instanceTypesOfferings[*sr.Value] = sets.New[string]()
}
instanceTypesOfferings[*sr.Value].Insert(*az.ZoneId)
}
}
}
Expand Down Expand Up @@ -365,6 +401,10 @@ func getAllInstanceTypes(client *ecsclient.Client) ([]*ecsclient.DescribeInstanc
func (p *DefaultProvider) createOfferings(_ context.Context, instanceType string, zones []ZoneData) []cloudprovider.Offering {
var offerings []cloudprovider.Offering
for _, zone := range zones {
if !zone.Available {
continue
}

odPrice, odOK := p.pricingProvider.OnDemandPrice(instanceType)
spotPrice, spotOK := p.pricingProvider.SpotPrice(instanceType, zone.ID)

Expand All @@ -375,7 +415,7 @@ func (p *DefaultProvider) createOfferings(_ context.Context, instanceType string
offerings = append(offerings, p.createOffering(zone.ID, karpv1.CapacityTypeOnDemand, odPrice, offeringAvailable))
}

if spotOK {
if spotOK && zone.SpotAvailable {
isUnavailable := p.unavailableOfferings.IsUnavailable(instanceType, zone.ID, karpv1.CapacityTypeSpot)
offeringAvailable := !isUnavailable && zone.Available

Expand Down
7 changes: 4 additions & 3 deletions pkg/providers/instancetype/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ const (
)

type ZoneData struct {
ID string
Available bool
ID string
Available bool
SpotAvailable bool
}

func calculateResourceOverhead(pods, cpuM, memoryMi int64) corev1.ResourceList {
Expand All @@ -79,7 +80,7 @@ var thresholds = [...]struct {
overhead float64
}{
{1000, 0.06},
{3000, 0.01},
{2000, 0.01},
{3000, 0.005},
{4000, 0.005},
}
Expand Down
Loading

0 comments on commit 7898a13

Please sign in to comment.