Redesign node monitoring to account for Node deletion (#255)
1. Split node monitoring into two reconcilers: one to monitor Nodes
   and one to monitor and update the designated slack ClusterQueue.
2. Remove entries from the in-memory caches when a Node is deleted.
3. Watch the slack ClusterQueue to react to changes in its
   nominalQuotas and adjust lendingLimits accordingly.

Fixes #252.
dgrove-oss authored Oct 16, 2024
1 parent 4b282d0 commit cacf2c7
Showing 4 changed files with 294 additions and 112 deletions.
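As background for items 1 and 3 of the commit message, the sketch below shows one plausible way the new event channel could be wired so that Node-driven updates and direct edits to the slack ClusterQueue both reach the same SlackClusterQueueMonitor reconciler. The actual wiring lives in the collapsed SetupWithManager code and in the other changed files that are not expanded on this page, so the helper setupMonitors, the use of a SlackClusterQueueMonitor value here, and the source.Channel/WatchesRawSource builder calls are assumptions rather than the commit's code.

// Hypothetical wiring sketch, not taken from the commit. Assumes a recent
// controller-runtime that provides source.Channel and WatchesRawSource, and a
// SlackClusterQueueMonitor reconciler defined in one of the other changed files.
package appwrapper

import (
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/source"
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

func setupMonitors(mgr ctrl.Manager, nodeMonitor *NodeHealthMonitor, cqMonitor *SlackClusterQueueMonitor) error {
	// A buffered channel of capacity 1 lets triggerSlackCQMonitor's non-blocking
	// send coalesce a burst of Node events into a single pending notification.
	events := make(chan event.GenericEvent, 1)
	nodeMonitor.Events = events

	// The slack ClusterQueue reconciler runs both when the ClusterQueue itself
	// changes (e.g. an edited nominalQuota) and when a Node-driven event arrives
	// on the channel; the sender names the slack queue in PartialObjectMetadata,
	// so EnqueueRequestForObject maps each event to that queue.
	if err := ctrl.NewControllerManagedBy(mgr).
		For(&kueue.ClusterQueue{}).
		WatchesRawSource(source.Channel(events, &handler.EnqueueRequestForObject{})).
		Complete(cqMonitor); err != nil {
		return err
	}

	return nodeMonitor.SetupWithManager(mgr)
}
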
174 changes: 81 additions & 93 deletions internal/controller/appwrapper/node_health_monitor.go
@@ -18,77 +18,104 @@ package appwrapper

import (
"context"
"maps"
"sync"

v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/utils/ptr"

ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"

"github.com/project-codeflare/appwrapper/pkg/config"
)

// NodeHealthMonitor watches Nodes and maintains mappings of Nodes that have either
// been marked as Unschedulable or that have been labeled to indicate that
// they have resources that Autopilot has tainted as NoSchedule or NoExeucte.
// they have resources that Autopilot has tainted as NoSchedule or NoExecute.
// This information is used to automate the maintenance of the lendingLimit of
// a designated slack ClusterQueue and to migrate running workloads away from NoExecute resources.
type NodeHealthMonitor struct {
client.Client
Config *config.AppWrapperConfig
Events chan event.GenericEvent // event channel for NodeHealthMonitor to trigger SlackClusterQueueMonitor
}

var (
// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExeucte taint
noExecuteNodes = make(map[string]sets.Set[string])
// noExecuteNodes is a mapping from Node names to resources with an Autopilot NoExecute taint
noExecuteNodes = make(map[string]sets.Set[string])
// noExecuteNodesMutex synchronizes access to noExecuteNodes
noExecuteNodesMutex sync.RWMutex

// noScheduleNodes is a mapping from Node names to resource quantities that are unschedulable.
// A resource may be unscheduable either because:
// noScheduleNodes is a mapping from Node names to ResourceLists of unschedulable resources.
// A resource may be unschedulable either because:
// (a) the Node is cordoned (node.Spec.Unschedulable is true) or
// (b) Autopilot has labeled the with either a NoExecute or NoSchedule taint.
noScheduleNodes = make(map[string]map[string]*resource.Quantity)
// (b) Autopilot has labeled the Node with a NoExecute or NoSchedule taint for the resource.
noScheduleNodes = make(map[string]v1.ResourceList)
// noScheduleNodesMutex synchronizes access to noScheduleNodes
noScheduleNodesMutex sync.RWMutex
)

// permission to watch nodes
//+kubebuilder:rbac:groups="",resources=nodes,verbs=get;list;watch
//+kubebuilder:rbac:groups=kueue.x-k8s.io,resources=clusterqueues,verbs=get;list;watch;update;patch

func (r *NodeHealthMonitor) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
node := &v1.Node{}
if err := r.Get(ctx, req.NamespacedName, node); err != nil {
return ctrl.Result{}, nil
if errors.IsNotFound(err) {
r.updateForNodeDeletion(ctx, req.Name)
return ctrl.Result{}, nil
}
return ctrl.Result{}, err
}

r.updateNoExecuteNodes(ctx, node)

// If there is a slack ClusterQueue, update its lending limits

if r.Config.SlackQueueName == "" {
return ctrl.Result{}, nil
if node.DeletionTimestamp.IsZero() {
r.updateNoExecuteNodes(ctx, node)
r.updateNoScheduleNodes(ctx, node)
} else {
r.updateForNodeDeletion(ctx, req.Name)
}

cq := &kueue.ClusterQueue{}
if err := r.Get(ctx, types.NamespacedName{Name: r.Config.SlackQueueName}, cq); err != nil {
if errors.IsNotFound(err) {
return ctrl.Result{}, nil // give up if slack quota is not defined
return ctrl.Result{}, nil
}

func (r *NodeHealthMonitor) triggerSlackCQMonitor() {
if r.Config.SlackQueueName != "" {
select {
case r.Events <- event.GenericEvent{Object: &metav1.PartialObjectMetadata{ObjectMeta: metav1.ObjectMeta{Name: r.Config.SlackQueueName}}}:
default:
// do not block if event is already in channel
}
return ctrl.Result{}, err
}
}

r.updateNoScheduleNodes(ctx, cq, node)

return r.updateLendingLimits(ctx, cq)
// update noExecuteNodes and noScheduleNodes for the deletion of nodeName
func (r *NodeHealthMonitor) updateForNodeDeletion(ctx context.Context, nodeName string) {
if _, ok := noExecuteNodes[nodeName]; ok {
noExecuteNodesMutex.Lock() // BEGIN CRITICAL SECTION
delete(noExecuteNodes, nodeName)
noExecuteNodesMutex.Unlock() // END CRITICAL SECTION
log.FromContext(ctx).Info("Updated NoExecute information due to Node deletion",
"Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
r.triggerSlackCQMonitor()
}
if _, ok := noScheduleNodes[nodeName]; ok {
noScheduleNodesMutex.Lock() // BEGIN CRITICAL SECTION
delete(noScheduleNodes, nodeName)
noScheduleNodesMutex.Unlock() // END CRITICAL SECTION
log.FromContext(ctx).Info("Updated NoSchedule information due to Node deletion",
"Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
r.triggerSlackCQMonitor()
}
}

// update noExecuteNodes entry for node
func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.Node) {
noExecuteResources := make(sets.Set[string])
for key, value := range node.GetLabels() {
@@ -117,93 +144,54 @@ func (r *NodeHealthMonitor) updateNoExecuteNodes(ctx context.Context, node *v1.N
}
noExecuteNodesMutex.Unlock() // END CRITICAL SECTION

// Safe to log outside the mutex because this method is the only writer of noExecuteNodes
// and the controller runtime is configured to not allow concurrent execution of this controller.
if noExecuteNodesChanged {
log.FromContext(ctx).Info("Updated node NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
log.FromContext(ctx).Info("Updated NoExecute information", "Number NoExecute Nodes", len(noExecuteNodes), "NoExecute Resource Details", noExecuteNodes)
r.triggerSlackCQMonitor()
}
}
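The taint-matching loops in updateNoExecuteNodes above and updateNoScheduleNodes below are driven by the Autopilot section of the AppWrapper configuration. The fragment below is purely illustrative: the element type is inferred as map[string][]v1.Taint from how Config.Autopilot.ResourceTaints is iterated, the resource name and label keys/values are made up rather than defaults shipped with this commit, and it relies on the v1 = k8s.io/api/core/v1 import already present in this file.

// Illustrative value for Config.Autopilot.ResourceTaints; the type is inferred
// from the loops in this file and the label keys/values are invented.
var exampleResourceTaints = map[string][]v1.Taint{
	"nvidia.com/gpu": {
		// A Node labeled example.com/gpu-health=WARN stops receiving new work:
		// its nvidia.com/gpu capacity is recorded in noScheduleNodes.
		{Key: "example.com/gpu-health", Value: "WARN", Effect: v1.TaintEffectNoSchedule},
		// example.com/gpu-health=EVICT additionally records the resource in
		// noExecuteNodes so that running AppWrappers are migrated away from it.
		{Key: "example.com/gpu-health", Value: "EVICT", Effect: v1.TaintEffectNoExecute},
	},
}
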

func (r *NodeHealthMonitor) updateNoScheduleNodes(_ context.Context, cq *kueue.ClusterQueue, node *v1.Node) {
// update unschedulable resource quantities for this node
noScheduleQuantities := make(map[string]*resource.Quantity)
// update noScheduleNodes entry for node
func (r *NodeHealthMonitor) updateNoScheduleNodes(ctx context.Context, node *v1.Node) {
var noScheduleResources v1.ResourceList
if node.Spec.Unschedulable {
// add all non-pod resources covered by cq if the node is cordoned
for _, resourceName := range cq.Spec.ResourceGroups[0].Flavors[0].Resources {
if string(resourceName.Name) != "pods" {
noScheduleQuantities[string(resourceName.Name)] = node.Status.Capacity.Name(resourceName.Name, resource.DecimalSI)
}
}
noScheduleResources = node.Status.Capacity.DeepCopy()
delete(noScheduleResources, v1.ResourcePods)
} else {
noScheduleResources = make(v1.ResourceList)
for key, value := range node.GetLabels() {
for resourceName, taints := range r.Config.Autopilot.ResourceTaints {
for _, taint := range taints {
if key == taint.Key && value == taint.Value {
noScheduleQuantities[resourceName] = node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
quantity := node.Status.Capacity.Name(v1.ResourceName(resourceName), resource.DecimalSI)
if !quantity.IsZero() {
noScheduleResources[v1.ResourceName(resourceName)] = *quantity
}
}
}
}
}
}

if len(noScheduleQuantities) > 0 {
noScheduleNodes[node.GetName()] = noScheduleQuantities
} else {
delete(noScheduleNodes, node.GetName())
}
}

func (r *NodeHealthMonitor) updateLendingLimits(ctx context.Context, cq *kueue.ClusterQueue) (ctrl.Result, error) {

// compute unschedulable resource totals
unschedulableQuantities := map[string]*resource.Quantity{}
for _, quantities := range noScheduleNodes {
for resourceName, quantity := range quantities {
if !quantity.IsZero() {
if unschedulableQuantities[resourceName] == nil {
unschedulableQuantities[resourceName] = ptr.To(*quantity)
} else {
unschedulableQuantities[resourceName].Add(*quantity)
}
}
}
}

// enforce lending limits on 1st flavor of 1st resource group
resources := cq.Spec.ResourceGroups[0].Flavors[0].Resources
limitsChanged := false
for i, quota := range resources {
var lendingLimit *resource.Quantity
if unschedulableQuantity := unschedulableQuantities[quota.Name.String()]; unschedulableQuantity != nil {
if quota.NominalQuota.Cmp(*unschedulableQuantity) > 0 {
lendingLimit = ptr.To(quota.NominalQuota)
lendingLimit.Sub(*unschedulableQuantity)
} else {
lendingLimit = resource.NewQuantity(0, resource.DecimalSI)
}
}
if quota.LendingLimit == nil && lendingLimit != nil ||
quota.LendingLimit != nil && lendingLimit == nil ||
quota.LendingLimit != nil && lendingLimit != nil && quota.LendingLimit.Cmp(*lendingLimit) != 0 {
limitsChanged = true
resources[i].LendingLimit = lendingLimit
noScheduleNodesChanged := false
noScheduleNodesMutex.Lock() // BEGIN CRITICAL SECTION
if priorEntry, ok := noScheduleNodes[node.GetName()]; ok {
if len(noScheduleResources) == 0 {
delete(noScheduleNodes, node.GetName())
noScheduleNodesChanged = true
} else if !maps.Equal(priorEntry, noScheduleResources) {
noScheduleNodes[node.GetName()] = noScheduleResources
noScheduleNodesChanged = true
}
} else if len(noScheduleResources) > 0 {
noScheduleNodes[node.GetName()] = noScheduleResources
noScheduleNodesChanged = true
}
noScheduleNodesMutex.Unlock() // END CRITICAL SECTION

// update lending limits
if limitsChanged {
err := r.Update(ctx, cq)
if err == nil {
log.FromContext(ctx).Info("Updated lending limits", "Resources", resources)
return ctrl.Result{}, nil
} else if errors.IsConflict(err) {
return ctrl.Result{Requeue: true}, nil
} else {
return ctrl.Result{}, err
}
if noScheduleNodesChanged {
log.FromContext(ctx).Info("Updated NoSchedule information", "Number NoSchedule Nodes", len(noScheduleNodes), "NoSchedule Resource Details", noScheduleNodes)
r.triggerSlackCQMonitor()
}

return ctrl.Result{}, nil
}

// SetupWithManager sets up the controller with the Manager.
[Remainder of node_health_monitor.go and the other 3 changed files not shown]
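
The lendingLimit arithmetic itself no longer lives in this file: the removed updateLendingLimits logic moves behind the new reconciler that watches the slack ClusterQueue. The sketch below restates that arithmetic, adapted from the removed code shown above; the helper names sumUnschedulable and computeLendingLimit are illustrative, not identifiers from the commit.

// Sketch of the lendingLimit computation now owned by the slack ClusterQueue
// reconciler; adapted from the removed updateLendingLimits code above.
package appwrapper

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	"k8s.io/utils/ptr"
	kueue "sigs.k8s.io/kueue/apis/kueue/v1beta1"
)

// sumUnschedulable totals the capacity recorded in noScheduleNodes for one
// resource name, reading the shared map under its mutex.
func sumUnschedulable(resourceName v1.ResourceName) *resource.Quantity {
	total := resource.NewQuantity(0, resource.DecimalSI)
	noScheduleNodesMutex.RLock()
	defer noScheduleNodesMutex.RUnlock()
	for _, resources := range noScheduleNodes {
		if quantity, ok := resources[resourceName]; ok {
			total.Add(quantity)
		}
	}
	return total
}

// computeLendingLimit mirrors the removed per-quota logic:
// lendingLimit = nominalQuota - unschedulable, floored at zero, and
// nil (i.e. no limit) when nothing is unschedulable.
func computeLendingLimit(quota kueue.ResourceQuota, unschedulable *resource.Quantity) *resource.Quantity {
	if unschedulable == nil || unschedulable.IsZero() {
		return nil
	}
	if quota.NominalQuota.Cmp(*unschedulable) > 0 {
		limit := ptr.To(quota.NominalQuota) // copy the nominal quota ...
		limit.Sub(*unschedulable)           // ... then subtract the unschedulable total
		return limit
	}
	return resource.NewQuantity(0, resource.DecimalSI)
}

As in the removed code, the new reconciler presumably applies this per-resource computation to the first flavor of the ClusterQueue's first resource group and only updates the ClusterQueue when some lendingLimit actually changed.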
