diff --git a/hack/run-e2e-kind.sh b/hack/run-e2e-kind.sh index 1c8a4c5d7..4c18b473c 100755 --- a/hack/run-e2e-kind.sh +++ b/hack/run-e2e-kind.sh @@ -45,7 +45,7 @@ export IMAGE_MCAD="${IMAGE_REPOSITORY_MCAD}:${IMAGE_TAG_MCAD}" CLUSTER_STARTED="false" export KUTTL_VERSION=0.15.0 export KUTTL_OPTIONS=${TEST_KUTTL_OPTIONS} -export KUTTL_TEST_SUITES=("${ROOT_DIR}/test/kuttl-test.yaml" "${ROOT_DIR}/test/kuttl-test-deployment-03.yaml" "${ROOT_DIR}/test/kuttl-test-deployment-02.yaml" "${ROOT_DIR}/test/kuttl-test-deployment-01.yaml") +export KUTTL_TEST_SUITES=("${ROOT_DIR}/test/kuttl-test.yaml" "${ROOT_DIR}/test/kuttl-test-borrowing.yaml" "${ROOT_DIR}/test/kuttl-test-deployment-03.yaml" "${ROOT_DIR}/test/kuttl-test-deployment-02.yaml" "${ROOT_DIR}/test/kuttl-test-deployment-01.yaml") DUMP_LOGS="true" function update_test_host { diff --git a/pkg/controller/clusterstate/api/node_info.go b/pkg/controller/clusterstate/api/node_info.go index 8696749db..a82dca606 100644 --- a/pkg/controller/clusterstate/api/node_info.go +++ b/pkg/controller/clusterstate/api/node_info.go @@ -43,8 +43,10 @@ type NodeInfo struct { // The releasing resource on that node Releasing *Resource + // The idle resource on that node Idle *Resource + // The used resource on that node, including running and terminating // pods Used *Resource @@ -104,7 +106,6 @@ func (ni *NodeInfo) Clone() *NodeInfo { func (ni *NodeInfo) SetNode(node *v1.Node) { if ni.Node == nil { ni.Idle = NewResource(node.Status.Allocatable) - } ni.Name = node.Name @@ -121,5 +122,4 @@ func (ni NodeInfo) String() string { return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>%s", ni.Name, ni.Idle, ni.Used, ni.Releasing, res) - } diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 011d45047..04e0aa278 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -151,8 +151,8 @@ func GetQueueJobKey(obj interface{}) (string, error) { return fmt.Sprintf("%s/%s", qj.Namespace, qj.Name), nil } -//UpdateQueueJobStatus was part of pod informer, this is now a method of queuejob_controller file. -//This change is done in an effort to simplify the controller and enable to move to controller runtime. +// UpdateQueueJobStatus was part of pod informer, this is now a method of queuejob_controller file. +// This change is done in an effort to simplify the controller and enable to move to controller runtime. func (qjm *XController) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) error { labelSelector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", queuejob.Name) @@ -192,14 +192,14 @@ func (qjm *XController) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) error { return nil } -//allocatableCapacity calculates the capacity available on each node by substracting resources -//consumed by existing pods. -//For a large cluster with thousands of nodes and hundreds of thousands of pods this -//method could be a performance bottleneck -//We can then move this method to a seperate thread that basically runs every X interval and -//provides resources available to the next AW that needs to be dispatched. -//Obviously the thread would need locking and timer to expire cache. -//May be move to controller runtime can help. +// allocatableCapacity calculates the capacity available on each node by substracting resources +// consumed by existing pods. 
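A rough sketch of the calculation this comment describes, for orientation only. The real method continues in the hunk below; the first two lines mirror it, while the field-selector pod listing and the Sub helper are assumptions, not the exact MCAD code:

	capacity := clusterstateapi.EmptyResource()
	nodes, _ := qjm.clients.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{})
	for _, node := range nodes.Items {
		// Start from what the kubelet reports as allocatable on this node.
		nodeCap := clusterstateapi.NewResource(node.Status.Allocatable)
		// Subtract the requests of every non-terminal pod bound to the node.
		selector := fields.AndSelectors(
			fields.OneTermEqualSelector("spec.nodeName", node.Name),
			fields.OneTermNotEqualSelector("status.phase", string(v1.PodSucceeded)),
			fields.OneTermNotEqualSelector("status.phase", string(v1.PodFailed)),
		).String()
		pods, _ := qjm.clients.CoreV1().Pods("").List(context.Background(), metav1.ListOptions{FieldSelector: selector})
		for _, pod := range pods.Items {
			for _, c := range pod.Spec.Containers {
				nodeCap.Sub(clusterstateapi.NewResource(c.Resources.Requests)) // assumed Sub helper on Resource
			}
		}
		capacity.Add(nodeCap)
	}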
+// For a large cluster with thousands of nodes and hundreds of thousands of pods this +// method could be a performance bottleneck +// We can then move this method to a seperate thread that basically runs every X interval and +// provides resources available to the next AW that needs to be dispatched. +// Obviously the thread would need locking and timer to expire cache. +// May be moved to controller runtime can help. func (qjm *XController) allocatableCapacity() *clusterstateapi.Resource { capacity := clusterstateapi.EmptyResource() nodes, _ := qjm.clients.CoreV1().Nodes().List(context.Background(), metav1.ListOptions{}) @@ -346,8 +346,8 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * return cc } -//TODO: We can use informer to filter AWs that do not meet the minScheduling spec. -//we still need a thread for dispatch duration but minScheduling spec can definetly be moved to an informer +// TODO: We can use informer to filter AWs that do not meet the minScheduling spec. +// we still need a thread for dispatch duration but minScheduling spec can definetly be moved to an informer func (qjm *XController) PreemptQueueJobs() { ctx := context.Background() @@ -935,7 +935,7 @@ func (qjm *XController) chooseAgent(ctx context.Context, qj *arbv1.AppWrapper) s // Now evaluate quota if qjm.serverOption.QuotaEnabled { if qjm.quotaManager != nil { - if fits, preemptAWs, _ := qjm.quotaManager.Fits(qj, qjAggrResources, proposedPreemptions); fits { + if fits, preemptAWs, _ := qjm.quotaManager.Fits(qj, qjAggrResources, nil, proposedPreemptions); fits { klog.V(2).Infof("[chooseAgent] AppWrapper %s has enough quota.\n", qj.Name) qjm.preemptAWJobs(ctx, preemptAWs) return agentId @@ -1191,138 +1191,135 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { unallocatedResources, priorityindex, qj, "") klog.Infof("[ScheduleNext] [Agent Mode] Appwrapper '%s/%s' with resources %v to be scheduled on aggregated idle resources %v", qj.Namespace, qj.Name, aggqj, resources) - // Assume preemption will remove low priroity AWs in the system, optimistically dispatch such AWs - - if aggqj.LessEqual(resources) { - //cache is turned-off, refer issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/588 - // unallocatedHistogramMap := qjm.cache.GetUnallocatedHistograms() - // if !qjm.nodeChecks(unallocatedHistogramMap, qj) { - // klog.Infof("[ScheduleNext] [Agent Mode] Optimistic dispatch for AW '%s/%s' requesting aggregated resources %v histogram for point in-time fragmented resources are available in the cluster %s", - // qj.Name, qj.Namespace, qjm.GetAggregatedResources(qj), proto.MarshalTextString(unallocatedHistogramMap["gpu"])) - // } - // Now evaluate quota - fits := true - klog.Infof("[ScheduleNext] [Agent Mode] available resourse successful check for '%s/%s' at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v.", - qj.Name, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - if qjm.serverOption.QuotaEnabled { - if qjm.quotaManager != nil { - // Quota tree design: - // - All AppWrappers without quota submission will consume quota from the 'default' node. 
- // - All quota trees in the system should have a 'default' node so AppWrappers without - // quota specification can be dispatched - // - If the AppWrapper doesn't have a quota label, then one is added for every tree with the 'default' value - // - Depending on how the 'default' node is configured, AppWrappers that don't specify quota could be - // preemptable by default (e.g., 'default' node with 'cpu: 0m' and 'memory: 0Mi' quota and 'hardLimit: false' - // such node borrows quota from other nodes already in the system) - allTrees := qjm.quotaManager.GetValidQuotaLabels() - newLabels := make(map[string]string) - for key, value := range qj.Labels { - newLabels[key] = value + // Jobs dispatched with quota management may be borrowing quota from other tree nodes making those jobs preemptable, regardless of their priority. + // Cluster resources need to be considered to determine if both quota and resources (after deleting borrowing AppWrappers) are availabe for the new AppWrapper + // We perform a "quota check" first followed by a "resource check" + fits := true + if qjm.serverOption.QuotaEnabled { + if qjm.quotaManager != nil { + // Quota tree design: + // - All AppWrappers without quota submission will consume quota from the 'default' node. + // - All quota trees in the system should have a 'default' node so AppWrappers without + // quota specification can be dispatched + // - If the AppWrapper doesn't have a quota label, then one is added for every tree with the 'default' value + // - Depending on how the 'default' node is configured, AppWrappers that don't specify quota could be + // preemptable by default (e.g., 'default' node with 'cpu: 0m' and 'memory: 0Mi' quota and 'hardLimit: false' + // such node borrows quota from other nodes already in the system) + allTrees := qjm.quotaManager.GetValidQuotaLabels() + newLabels := make(map[string]string) + for key, value := range qj.Labels { + newLabels[key] = value + } + updateLabels := false + for _, treeName := range allTrees { + if _, quotaSetForAW := newLabels[treeName]; !quotaSetForAW { + newLabels[treeName] = "default" + updateLabels = true } - updateLabels := false - for _, treeName := range allTrees { - if _, quotaSetForAW := newLabels[treeName]; !quotaSetForAW { - newLabels[treeName] = "default" - updateLabels = true + } + if updateLabels { + tempAW, retryErr := qjm.getAppWrapper(qj.Namespace, qj.Name, "[ScheduleNext] [Agent Mode] update labels") + if retryErr != nil { + if apierrors.IsNotFound(retryErr) { + klog.Warningf("[ScheduleNext] [Agent Mode] app wrapper '%s/%s' not found while trying to update labels, skiping dispatch.", qj.Namespace, qj.Name) + return nil } + return retryErr } - if updateLabels { - tempAW, retryErr := qjm.getAppWrapper(qj.Namespace, qj.Name, "[ScheduleNext] [Agent Mode] update labels") - if retryErr != nil { - if apierrors.IsNotFound(retryErr) { - klog.Warningf("[ScheduleNext] [Agent Mode] app wrapper '%s/%s' not found while trying to update labels, skiping dispatch.", qj.Namespace, qj.Name) - return nil - } - return retryErr - } - tempAW.SetLabels(newLabels) - updatedAW, retryErr := qjm.updateEtcd(ctx, tempAW, "ScheduleNext [Agent Mode] - setDefaultQuota") - if retryErr != nil { - if apierrors.IsConflict(err) { - klog.Warningf("[ScheduleNext] [Agent mode] Conflict error detected when updating labels in etcd for app wrapper '%s/%s, status = %+v. 
Retrying update.", qj.Namespace, qj.Name, qj.Status) - } else { - klog.Errorf("[ScheduleNext] [Agent mode] Failed to update labels in etcd for app wrapper '%s/%s', status = %+v, err=%v", qj.Namespace, qj.Name, qj.Status, err) - } - return retryErr + tempAW.SetLabels(newLabels) + updatedAW, retryErr := qjm.updateEtcd(ctx, tempAW, "ScheduleNext [Agent Mode] - setDefaultQuota") + if retryErr != nil { + if apierrors.IsConflict(err) { + klog.Warningf("[ScheduleNext] [Agent mode] Conflict error detected when updating labels in etcd for app wrapper '%s/%s, status = %+v. Retrying update.", qj.Namespace, qj.Name, qj.Status) + } else { + klog.Errorf("[ScheduleNext] [Agent mode] Failed to update labels in etcd for app wrapper '%s/%s', status = %+v, err=%v", qj.Namespace, qj.Name, qj.Status, err) } - klog.Infof("[ScheduleNext] [Agent Mode] Default quota added to AW '%s/%s'", qj.Namespace, qj.Name) - updatedAW.DeepCopyInto(qj) + return retryErr } - var msg string - var preemptAWs []*arbv1.AppWrapper - quotaFits, preemptAWs, msg = qjm.quotaManager.Fits(qj, aggqj, proposedPreemptions) - if quotaFits { - klog.Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", - qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - // Set any jobs that are marked for preemption - qjm.preemptAWJobs(ctx, preemptAWs) - } else { // Not enough free quota to dispatch appwrapper - dispatchFailedMessage = "Insufficient quota to dispatch AppWrapper." - dispatchFailedReason = "quota limit exceeded" - klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s", - qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg) - //call update etcd here to retrigger AW execution for failed quota - //TODO: quota management tests fail if this is converted into go-routine, need to inspect why? - qjm.backoff(context.Background(), qj, dispatchFailedReason, dispatchFailedMessage) + klog.Infof("[ScheduleNext] [Agent Mode] Default quota added to AW '%s/%s'", qj.Namespace, qj.Name) + updatedAW.DeepCopyInto(qj) + } - } - fits = quotaFits - } else { - fits = false - // Quota manager not initialized - dispatchFailedMessage = "Quota evaluation is enabled but not initialized. Insufficient quota to dispatch AppWrapper." - klog.Errorf("[ScheduleNext] [Agent Mode] Quota evaluation is enabled but not initialized. 
AppWrapper '%s/%s' does not have enough quota", qj.Namespace, qj.Name) + // Allocate consumer into quota tree and check if there are enough resources to dispatch it + var msg string + var preemptAWs []*arbv1.AppWrapper + quotaFits, preemptAWs, msg = qjm.quotaManager.Fits(qj, aggqj, resources, proposedPreemptions) + klog.Info("%s %s %s", quotaFits, preemptAWs, msg) + + if quotaFits { + klog.Infof("[ScheduleNext] [Agent mode] quota evaluation successful for app wrapper '%s/%s' activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", + qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + // Set any jobs that are marked for preemption + qjm.preemptAWJobs(ctx, preemptAWs) + } else { // Not enough free quota to dispatch appwrapper + dispatchFailedMessage = "Insufficient quota and/or resources to dispatch AppWrapper." + dispatchFailedReason = "quota limit exceeded" + klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s", + qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg) + // Call update etcd here to retrigger AW execution for failed quota + // TODO: quota management tests fail if this is converted into go-routine, need to inspect why? + qjm.backoff(context.Background(), qj, dispatchFailedReason, dispatchFailedMessage) } + fits = quotaFits } else { - klog.V(4).Infof("[ScheduleNext] [Agent Mode] quota evaluation not enabled for '%s/%s' at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace, - qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + // Quota manager not initialized + dispatchFailedMessage = "Quota evaluation is enabled but not initialized. Insufficient quota to dispatch AppWrapper." + klog.Errorf("[ScheduleNext] [Agent Mode] Quota evaluation is enabled but not initialized. AppWrapper '%s/%s' does not have enough quota", qj.Namespace, qj.Name) + fits = false } - //TODO: Remove forwarded loop - forwarded = true - // If quota evalauation sucedeed or quota evaluation not enabled set the appwrapper to be dispatched - if fits { - - // aw is ready to go! 
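A condensed, illustrative view of the dispatch gate added above; it reuses the surrounding ScheduleNext variables and is not the literal controller code:

	fits := false
	if qjm.serverOption.QuotaEnabled && qjm.quotaManager != nil {
		// Quota check: Fits now also receives the cluster snapshot so it can verify the
		// AppWrapper still fits once the proposed preemptions free their borrowed quota.
		quotaFits, preemptAWs, msg := qjm.quotaManager.Fits(qj, aggqj, resources, proposedPreemptions)
		if quotaFits {
			qjm.preemptAWJobs(ctx, preemptAWs) // preempt the borrowers picked by the quota manager
		} else {
			dispatchFailedReason = "quota limit exceeded"
			dispatchFailedMessage = fmt.Sprintf("Insufficient quota and/or resources to dispatch AppWrapper: %s", msg)
			qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage) // requeue and retry later
		}
		fits = quotaFits
	} else {
		// Quota management disabled: plain resource check only.
		// (The real code treats an enabled-but-uninitialized quota manager as a hard failure.)
		fits = aggqj.LessEqual(resources)
	}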
- tempAW, retryErr := qjm.getAppWrapper(qj.Namespace, qj.Name, "[ScheduleNext] [Agent Mode] -- ready to dispatch") - if retryErr != nil { - if apierrors.IsNotFound(retryErr) { - return nil - } - klog.Errorf("[ScheduleNext] [Agent Mode] Failed to get fresh copy of the app wrapper '%s/%s' to update status, err = %v", qj.Namespace, qj.Name, err) - return retryErr + } else { + klog.V(4).Infof("[ScheduleNext] [Agent Mode] Quota evaluation not enabled for '%s/%s' at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Namespace, + qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + + if aggqj.LessEqual(resources) { // Check if enough resources to dispatch + fits = true + klog.Infof("[ScheduleNext] [Agent Mode] available resourse successful check for '%s/%s' at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v.", + qj.Name, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + } else { // Not enough free resources to dispatch HOL + fits = false + dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper." + klog.Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insuficient resources, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", + qj.Namespace, qj.Name, qjm.qjqueue.IfExistActiveQ(qj), + qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + // TODO: Remove forwarded logic as a big AW will never be forwarded + forwarded = true + // should we call backoff or update etcd? + go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage) + } + } + forwarded = true + if fits { + // aw is ready to go! + tempAW, retryErr := qjm.getAppWrapper(qj.Namespace, qj.Name, "[ScheduleNext] [Agent Mode] -- ready to dispatch") + if retryErr != nil { + if apierrors.IsNotFound(retryErr) { + return nil } - tempAW.Status.CanRun = true - tempAW.Status.FilterIgnore = true // update CanRun & Spec. no need to trigger event - retryErr = qjm.updateStatusInEtcd(ctx, tempAW, "ScheduleNext - setCanRun") - if retryErr != nil { - if qjm.quotaManager != nil && quotaFits { - // Quota was allocated for this appwrapper, release it. - qjm.quotaManager.Release(qj) - } - if apierrors.IsNotFound(retryErr) { - klog.Warningf("[ScheduleNext] [Agent Mode] app wrapper '%s/%s' not found after status update, skiping dispatch.", qj.Namespace, qj.Name) - return nil - } else if apierrors.IsConflict(retryErr) { - klog.Warningf("[ScheduleNext] [Agent mode] Conflict error detected when updating status in etcd for app wrapper '%s/%s, status = %+v. Retrying update.", qj.Namespace, qj.Name, qj.Status) - } else if retryErr != nil { - klog.Errorf("[ScheduleNext] [Agent mode] Failed to update status in etcd for app wrapper '%s/%s', status = %+v, err=%v", qj.Namespace, qj.Name, qj.Status, err) - } - return retryErr + klog.Errorf("[ScheduleNext] [Agent Mode] Failed to get fresh copy of the app wrapper '%s/%s' to update status, err = %v", qj.Namespace, qj.Name, err) + return retryErr + } + tempAW.Status.CanRun = true + tempAW.Status.FilterIgnore = true // update CanRun & Spec. no need to trigger event + retryErr = qjm.updateStatusInEtcd(ctx, tempAW, "ScheduleNext - setCanRun") + if retryErr != nil { + if qjm.quotaManager != nil && quotaFits { + // Quota was allocated for this appwrapper, release it. 
+ qjm.quotaManager.Release(qj) } - tempAW.DeepCopyInto(qj) - forwarded = true - } // fits - } else { // Not enough free resources to dispatch HOL - dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper." - klog.Infof("[ScheduleNext] [Agent Mode] Failed to dispatch app wrapper '%s/%s' due to insuficient resources, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", - qj.Namespace, qj.Name, qjm.qjqueue.IfExistActiveQ(qj), - qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - //TODO: Remove forwarded logic as a big AW will never be forwarded + if apierrors.IsNotFound(retryErr) { + klog.Warningf("[ScheduleNext] [Agent Mode] app wrapper '%s/%s' not found after status update, skiping dispatch.", qj.Namespace, qj.Name) + return nil + } else if apierrors.IsConflict(retryErr) { + klog.Warningf("[ScheduleNext] [Agent mode] Conflict error detected when updating status in etcd for app wrapper '%s/%s, status = %+v. Retrying update.", qj.Namespace, qj.Name, qj.Status) + } else if retryErr != nil { + klog.Errorf("[ScheduleNext] [Agent mode] Failed to update status in etcd for app wrapper '%s/%s', status = %+v, err=%v", qj.Namespace, qj.Name, qj.Status, err) + } + return retryErr + } + tempAW.DeepCopyInto(qj) forwarded = true - // should we call backoff or update etcd? - go qjm.backoff(ctx, qj, dispatchFailedReason, dispatchFailedMessage) } - // if the HeadOfLineHoldingTime option is not set it will break the loop + //TODO: Remove schedulingTimeExpired flag: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/586 schedulingTimeExpired := false if forwarded { @@ -1690,12 +1687,12 @@ func larger(a, b string) bool { return a > b // Equal length, lexicographic order } -//When an AW is deleted, do not add such AWs to the event queue. -//AW can never be brought back when it is deleted by an external client, so do not bother adding it to event queue. -//There will be a scenario, where an AW is in middle of dispatch cycle and it may be deleted. At that point when such an -//AW is added to etcd a conflict error will be raised. This will cause the current AW to be skipped. -//If there are large number of delete's may be informer misses few delete events for this simplification. -//For 1K AW all of them are deleted from the system, and the next batch of re-submitted AW begins processing in less than 2 mins +// When an AW is deleted, do not add such AWs to the event queue. +// AW can never be brought back when it is deleted by an external client, so do not bother adding it to event queue. +// There will be a scenario, where an AW is in middle of dispatch cycle and it may be deleted. At that point when such an +// AW is added to etcd a conflict error will be raised. This will cause the current AW to be skipped. +// If there are large number of delete's may be informer misses few delete events for this simplification. 
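The hunks below widen the quota interface so callers can hand Fits a snapshot of cluster capacity; both the quota-forest and simple-REST managers are updated. Condensed, with the nil semantics as I read the forest backend change:

	type QuotaManagerInterface interface {
		// clusterResources may be nil; when it is, only the quota-tree evaluation runs and
		// the post-preemption cluster-capacity re-check in the forest backend is skipped
		// (loadDispatchedAWs passes nil because it replays AppWrappers that were already dispatched).
		Fits(aw *arbv1.AppWrapper, requestedResources *clusterstateapi.Resource,
			clusterResources *clusterstateapi.Resource, proposedPreemptions []*arbv1.AppWrapper) (bool, []*arbv1.AppWrapper, string)
		Release(aw *arbv1.AppWrapper) bool
		GetValidQuotaLabels() []string
	}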
+// For 1K AW all of them are deleted from the system, and the next batch of re-submitted AW begins processing in less than 2 mins func (cc *XController) deleteQueueJob(obj interface{}) { qj, ok := obj.(*arbv1.AppWrapper) if !ok { diff --git a/pkg/controller/quota/quota_manager_interface.go b/pkg/controller/quota/quota_manager_interface.go index 4ca782f12..afed8e33d 100644 --- a/pkg/controller/quota/quota_manager_interface.go +++ b/pkg/controller/quota/quota_manager_interface.go @@ -21,7 +21,7 @@ import ( ) type QuotaManagerInterface interface { - Fits(aw *arbv1.AppWrapper, resources *clusterstateapi.Resource, proposedPremptions []*arbv1.AppWrapper) (bool, []*arbv1.AppWrapper, string) + Fits(aw *arbv1.AppWrapper, requestedResources *clusterstateapi.Resource, clusterResources *clusterstateapi.Resource, proposedPremptions []*arbv1.AppWrapper) (bool, []*arbv1.AppWrapper, string) Release(aw *arbv1.AppWrapper) bool GetValidQuotaLabels() []string } diff --git a/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr.go b/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr.go index c691fd577..e662911f8 100644 --- a/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr.go +++ b/pkg/controller/quota/quotaforestmanager/qm_lib_backend_with_quotasubt_mgr.go @@ -32,6 +32,7 @@ import ( qmbackend "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota" qmbackendutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/quota-forest/quota-manager/quota/utils" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util" + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/genericresource" "k8s.io/client-go/rest" "math" @@ -205,7 +206,7 @@ func (qm *QuotaManager) loadDispatchedAWs(dispatchedAWDemands map[string]*cluste } aw.SetLabels(newLabels) - doesFit, preemptionIds, errorMessage := qm.Fits(aw, v, nil) + doesFit, preemptionIds, errorMessage := qm.Fits(aw, v, nil, nil) if !doesFit { klog.Errorf("[loadDispatchedAWs] Loading of AppWrapper %s/%s failed.", aw.Namespace, aw.Name) @@ -509,7 +510,7 @@ func (qm *QuotaManager) buildRequest(aw *arbv1.AppWrapper, return consumerInfo, err } -func (qm *QuotaManager) Fits(aw *arbv1.AppWrapper, awResDemands *clusterstateapi.Resource, +func (qm *QuotaManager) Fits(aw *arbv1.AppWrapper, awResDemands *clusterstateapi.Resource, clusterResources *clusterstateapi.Resource, proposedPreemptions []*arbv1.AppWrapper) (bool, []*arbv1.AppWrapper, string) { // If a Quota Manager Backend instance does not exists then assume quota failed @@ -555,7 +556,7 @@ func (qm *QuotaManager) Fits(aw *arbv1.AppWrapper, awResDemands *clusterstateapi consumerID := consumerInfo.GetID() klog.V(4).Infof("[Fits] Sending quota allocation request: %#v ", consumerInfo) - allocResponse, err := qm.quotaManagerBackend.AllocateForest(QuotaManagerForestName, consumerID) + allocResponse, err := qm.quotaManagerBackend.TryAllocateForest(QuotaManagerForestName, consumerID) if err != nil { qm.removeConsumer(consumerID) klog.Errorf("[Fits] Error allocating consumer: %s/%s, err=%#v.", aw.Namespace, aw.Name, err) @@ -569,9 +570,47 @@ func (qm *QuotaManager) Fits(aw *arbv1.AppWrapper, awResDemands *clusterstateapi return doesFit, preemptIds, strings.TrimSpace(allocResponse.GetMessage()) } preemptIds = qm.getAppWrappers(allocResponse.GetPreemptedIds()) + + // Update cluster resources in the even that preemption happens + // 
TODO: Potentially move this resource updated out to the calling function (would need to comeback again to undo the allocation + // if the resources are not enough after preemption) + if clusterResources != nil { + updatedResources := clusterResources + + for _, appWrapper := range preemptIds { + updatedResources.Add(qm.getAggregatedResources(appWrapper)) + } + + // Check if job fits with the update resources after preempted AppWrappers are removed + if clusterResources != nil && !awResDemands.LessEqual(updatedResources) { + qm.quotaManagerBackend.UndoAllocateForest(QuotaManagerForestName, consumerID) + qm.removeConsumer(consumerID) + return false, preemptIds, fmt.Sprintf("[Fits] AppWrapper '%s/%s' does not fit in the cluster, even after borrowed quota is freed", aw.Namespace, aw.Name) + } + } + return doesFit, preemptIds, strings.TrimSpace(allocResponse.GetMessage()) } +func (qm *QuotaManager) getAggregatedResources(appWrapper *arbv1.AppWrapper) *clusterstateapi.Resource { + // After quota evaluation, a set of AppWrappers is returned for preemption. Before deciding to delete them, + // we need to make sure enough resources are free for the new AppWrapper after the preemptable list is deleted. + // For this we need to add back the requests consumed by the preemptable AppWrappers to the available resources + // in order to perform a correct resource check with updated values. + allocated := clusterstateapi.EmptyResource() + + for _, genericItem := range appWrapper.Spec.AggrResources.GenericItems { + resources, err := genericresource.GetResources(&genericItem) + if err != nil { + klog.V(8).Infof("[GetAggregatedResources] Failure aggregating resources for %s/%s, err=%#v, genericItem=%#v", + appWrapper.Namespace, appWrapper.Name, err, genericItem) + } + allocated = allocated.Add(resources) + } + + return allocated +} + func (qm *QuotaManager) getAppWrappers(preemptIds []string) []*arbv1.AppWrapper { var aws []*arbv1.AppWrapper if len(preemptIds) <= 0 { diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go index fb2b16328..c01e787e5 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/core/quotanode.go @@ -138,7 +138,7 @@ func (qn *QuotaNode) SlideDown() { func (qn *QuotaNode) SlideUp(c *Consumer, applyPriority bool, allocationRecovery *AllocationRecovery, preemptedConsumers *[]string) bool { - if qn.isHard { + if qn.isHard && !qn.IsRoot() { return false } diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go index 24cc69baa..78d78c709 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/quotamanager.go @@ -494,12 +494,14 @@ func (m *Manager) AllocateForest(forestName string, consumerID string) (response // TryAllocateForest : allocate a consumer on a forest func (m *Manager) TryAllocateForest(forestName string, consumerID string) (response *core.AllocationResponse, err error) { + if m.mode != Normal { + response, err = m.AllocateForest(forestName, consumerID) + return response, err + } + m.mutex.Lock() defer m.mutex.Unlock() - if m.mode != Normal { - return nil, fmt.Errorf("manager is not in normal mode") - } forestController, forestConsumer, err := m.preAllocateForest(forestName, consumerID) if err == nil && 
forestController.IsConsumerAllocated(consumerID) { err = fmt.Errorf("consumer %s already allocated on forest %s", consumerID, forestName) diff --git a/pkg/quotaplugins/quota-forest/quota-manager/quota/utils/defaults.go b/pkg/quotaplugins/quota-forest/quota-manager/quota/utils/defaults.go index 97bf0b8b7..f5d2395f1 100644 --- a/pkg/quotaplugins/quota-forest/quota-manager/quota/utils/defaults.go +++ b/pkg/quotaplugins/quota-forest/quota-manager/quota/utils/defaults.go @@ -21,7 +21,7 @@ var ( DefaultTreeName string = "default" // DefaultResourceNames : the default resource names - DefaultResourceNames []string = []string{"cpu", "memory"} + DefaultResourceNames []string = []string{"cpu", "memory", "nvidia.com/gpu"} // DefaultTreeKind : the default kind attribute of the tree DefaultTreeKind string = "QuotaTree" diff --git a/pkg/quotaplugins/quota-simple-rest/quota_rest_manager.go b/pkg/quotaplugins/quota-simple-rest/quota_rest_manager.go index 13ccab71e..0667e9262 100644 --- a/pkg/quotaplugins/quota-simple-rest/quota_rest_manager.go +++ b/pkg/quotaplugins/quota-simple-rest/quota_rest_manager.go @@ -257,7 +257,7 @@ func (qm *QuotaManager) getQuotaDesignation(aw *arbv1.AppWrapper) []QuotaGroup { return groups } -func (qm *QuotaManager) Fits(aw *arbv1.AppWrapper, awResDemands *clusterstateapi.Resource, +func (qm *QuotaManager) Fits(aw *arbv1.AppWrapper, awResDemands *clusterstateapi.Resource, clusterResources *clusterstateapi.Resource, proposedPreemptions []*arbv1.AppWrapper) (bool, []*arbv1.AppWrapper, string) { // Handle uninitialized quota manager diff --git a/test/e2e-kuttl-borrowing/install-quota-subtree.yaml b/test/e2e-kuttl-borrowing/install-quota-subtree.yaml new file mode 100644 index 000000000..236f6d009 --- /dev/null +++ b/test/e2e-kuttl-borrowing/install-quota-subtree.yaml @@ -0,0 +1,95 @@ +--- +apiVersion: ibm.com/v1 +kind: QuotaSubtree +metadata: + name: context-root + namespace: kube-system + labels: + tree: quota_context +spec: + children: + - name: context-root + quotas: + requests: + cpu: 1075m + memory: 1045Mi + nvidia.com/gpu: 16 +--- +apiVersion: ibm.com/v1 +kind: QuotaSubtree +metadata: + name: service-root + namespace: kube-system + labels: + tree: quota_service +spec: + children: + - name: service-root + quotas: + requests: + cpu: 1075m + memory: 1045Mi + nvidia.com/gpu: 16 +--- +apiVersion: ibm.com/v1 +kind: QuotaSubtree +metadata: + name: context-root-children + namespace: kube-system + labels: + tree: quota_context +spec: + parent: context-root + children: + - name: gold + quotas: + hardLimit: false + requests: + cpu: 1075m + memory: 450Mi + nvidia.com/gpu: 8 + - name: silver + quotas: + hardLimit: false + requests: + cpu: 1075m + memory: 400Mi + nvidia.com/gpu: 8 + - name: bronze + quotas: + hardLimit: true + requests: + cpu: 900m + memory: 300Mi + nvidia.com/gpu: 8 + - name: default + quotas: + hardLimit: false + requests: + cpu: 0m + memory: 0Mi + nvidia.com/gpu: 0 +--- +apiVersion: ibm.com/v1 +kind: QuotaSubtree +metadata: + name: service-root-children + namespace: kube-system + labels: + tree: quota_service +spec: + parent: service-root + children: + - name: gold + quotas: + requests: + cpu: 1075m + memory: 1045Mi + nvidia.com/gpu: 16 + - name: default + quotas: + hardLimit: false + requests: + cpu: 0m + memory: 0Mi + nvidia.com/gpu: 0 diff --git a/test/e2e-kuttl-borrowing/steps/00-assert.yaml b/test/e2e-kuttl-borrowing/steps/00-assert.yaml new file mode 100644 index 000000000..853505926 --- /dev/null +++ b/test/e2e-kuttl-borrowing/steps/00-assert.yaml @@ -0,0 +1,25 
@@ +# Verify CRDs existence +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: appwrappers.mcad.ibm.com +status: + acceptedNames: + kind: AppWrapper + listKind: AppWrapperList + plural: appwrappers + singular: appwrapper + storedVersions: + - v1beta1 +--- +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: quotasubtrees.ibm.com +status: + acceptedNames: + kind: QuotaSubtree + singular: quotasubtree + plural: quotasubtrees + storedVersions: + - v1 diff --git a/test/e2e-kuttl-borrowing/steps/01-assert.yaml b/test/e2e-kuttl-borrowing/steps/01-assert.yaml new file mode 100644 index 000000000..175462a90 --- /dev/null +++ b/test/e2e-kuttl-borrowing/steps/01-assert.yaml @@ -0,0 +1,32 @@ +# Verify subtree creations +apiVersion: ibm.com/v1 +kind: QuotaSubtree +metadata: + name: context-root + namespace: kube-system + labels: + tree: quota_context +--- +apiVersion: ibm.com/v1 +kind: QuotaSubtree +metadata: + name: service-root + namespace: kube-system + labels: + tree: quota_service +--- +apiVersion: ibm.com/v1 +kind: QuotaSubtree +metadata: + name: context-root-children + namespace: kube-system + labels: + tree: quota_context +--- +apiVersion: ibm.com/v1 +kind: QuotaSubtree +metadata: + name: service-root-children + namespace: kube-system + labels: + tree: quota_service diff --git a/test/e2e-kuttl-borrowing/steps/02-assert.yaml b/test/e2e-kuttl-borrowing/steps/02-assert.yaml new file mode 100644 index 000000000..672b198f5 --- /dev/null +++ b/test/e2e-kuttl-borrowing/steps/02-assert.yaml @@ -0,0 +1,5 @@ +# Verify test namespace existence +apiVersion: v1 +kind: Namespace +metadata: + name: test \ No newline at end of file diff --git a/test/e2e-kuttl-borrowing/steps/02-install.yaml b/test/e2e-kuttl-borrowing/steps/02-install.yaml new file mode 100644 index 000000000..5e0105d10 --- /dev/null +++ b/test/e2e-kuttl-borrowing/steps/02-install.yaml @@ -0,0 +1,4 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: test \ No newline at end of file diff --git a/test/e2e-kuttl-borrowing/steps/03-assert.yaml b/test/e2e-kuttl-borrowing/steps/03-assert.yaml new file mode 100644 index 000000000..9245c48e0 --- /dev/null +++ b/test/e2e-kuttl-borrowing/steps/03-assert.yaml @@ -0,0 +1,9 @@ +# Verify job is running +--- +apiVersion: mcad.ibm.com/v1beta1 +kind: AppWrapper +metadata: + name: my-job-1 + namespace: test +status: + state: Running diff --git a/test/e2e-kuttl-borrowing/steps/03-install.yaml b/test/e2e-kuttl-borrowing/steps/03-install.yaml new file mode 100644 index 000000000..c44823c80 --- /dev/null +++ b/test/e2e-kuttl-borrowing/steps/03-install.yaml @@ -0,0 +1,64 @@ +--- +apiVersion: mcad.ibm.com/v1beta1 +kind: AppWrapper +metadata: + name: my-job-1 + namespace: test + labels: + quota_context: "silver" + quota_service: "gold" +spec: + schedulingSpec: + minAvailable: 2 + resources: + GenericItems: + - replicas: 1 + completionstatus: Complete + custompodresources: + - replicas: 2 + requests: + cpu: 500m + nvidia.com/gpu: 8 + memory: 300Mi + limits: + cpu: 500m + nvidia.com/gpu: 8 + memory: 300Mi + generictemplate: + apiVersion: batch/v1 + kind: Job + metadata: + name: my-job-1 + namespace: test + labels: + appwrapper.mcad.ibm.com: my-job-1 + spec: + parallelism: 1 + completions: 1 + template: + metadata: + name: my-job-1 + namespace: test + labels: + appwrapper.mcad.ibm.com: my-job-1 + spec: + terminationGracePeriodSeconds: 1 + restartPolicy: Never + containers: + - name: ubuntu + image: ubuntu:latest + imagePullPolicy: IfNotPresent + 
command: + - sh + - -c + - | + sleep infinity + resources: + requests: + cpu: 500m + nvidia.com/gpu: 8 + memory: 300Mi + limits: + cpu: 500m + nvidia.com/gpu: 8 + memory: 300Mi diff --git a/test/e2e-kuttl-borrowing/steps/04-assert.yaml b/test/e2e-kuttl-borrowing/steps/04-assert.yaml new file mode 100644 index 000000000..10649417b --- /dev/null +++ b/test/e2e-kuttl-borrowing/steps/04-assert.yaml @@ -0,0 +1,9 @@ +# Verify job is running +--- +apiVersion: mcad.ibm.com/v1beta1 +kind: AppWrapper +metadata: + name: my-job-2 + namespace: test +status: + state: Running diff --git a/test/e2e-kuttl-borrowing/steps/04-install.yaml b/test/e2e-kuttl-borrowing/steps/04-install.yaml new file mode 100644 index 000000000..804b06505 --- /dev/null +++ b/test/e2e-kuttl-borrowing/steps/04-install.yaml @@ -0,0 +1,64 @@ +--- +apiVersion: mcad.ibm.com/v1beta1 +kind: AppWrapper +metadata: + name: my-job-2 + namespace: test + labels: + quota_context: "gold" + quota_service: "gold" +spec: + schedulingSpec: + minAvailable: 1 + resources: + GenericItems: + - replicas: 1 + completionstatus: Complete + custompodresources: + - replicas: 1 + requests: + cpu: 500m + nvidia.com/gpu: 8 + memory: 300Mi + limits: + cpu: 500m + nvidia.com/gpu: 8 + memory: 300Mi + generictemplate: + apiVersion: batch/v1 + kind: Job + metadata: + name: my-job-2 + namespace: test + labels: + appwrapper.mcad.ibm.com: my-job-2 + spec: + parallelism: 1 + completions: 1 + template: + metadata: + name: my-job-2 + namespace: test + labels: + appwrapper.mcad.ibm.com: my-job-2 + spec: + terminationGracePeriodSeconds: 1 + restartPolicy: Never + containers: + - name: ubuntu + image: ubuntu:latest + imagePullPolicy: IfNotPresent + command: + - sh + - -c + - | + sleep infinity + resources: + requests: + cpu: 500m + nvidia.com/gpu: 8 + memory: 300Mi + limits: + cpu: 500m + nvidia.com/gpu: 8 + memory: 300Mi diff --git a/test/e2e-kuttl/install-quota-subtree.yaml b/test/e2e-kuttl/install-quota-subtree.yaml index baa6dd665..236f6d009 100644 --- a/test/e2e-kuttl/install-quota-subtree.yaml +++ b/test/e2e-kuttl/install-quota-subtree.yaml @@ -13,6 +13,7 @@ spec: requests: cpu: 1075m memory: 1045Mi + nvidia.com/gpu: 16 --- apiVersion: ibm.com/v1 kind: QuotaSubtree @@ -28,6 +29,7 @@ spec: requests: cpu: 1075m memory: 1045Mi + nvidia.com/gpu: 16 --- apiVersion: ibm.com/v1 kind: QuotaSubtree @@ -41,27 +43,32 @@ spec: children: - name: gold quotas: + hardLimit: false requests: cpu: 1075m memory: 450Mi + nvidia.com/gpu: 8 - name: silver quotas: hardLimit: false requests: cpu: 1075m memory: 400Mi + nvidia.com/gpu: 8 - name: bronze quotas: hardLimit: true requests: cpu: 900m memory: 300Mi + nvidia.com/gpu: 8 - name: default quotas: hardLimit: false requests: cpu: 0m memory: 0Mi + nvidia.com/gpu: 0 --- apiVersion: ibm.com/v1 kind: QuotaSubtree @@ -78,9 +85,11 @@ spec: requests: cpu: 1075m memory: 1045Mi + nvidia.com/gpu: 16 - name: default quotas: hardLimit: false requests: cpu: 0m memory: 0Mi + nvidia.com/gpu: 0 diff --git a/test/e2e-kuttl/quota-forest/09-assert.yaml b/test/e2e-kuttl/quota-forest/09-assert.yaml index fc13e7ed6..d7b5983db 100644 --- a/test/e2e-kuttl/quota-forest/09-assert.yaml +++ b/test/e2e-kuttl/quota-forest/09-assert.yaml @@ -1,4 +1,4 @@ -#Verify AppWrappers finished successfully +# Verify AppWrappers finished successfully --- apiVersion: mcad.ibm.com/v1beta1 kind: AppWrapper diff --git a/test/kuttl-test-borrowing.yaml b/test/kuttl-test-borrowing.yaml new file mode 100644 index 000000000..beb9995e2 --- /dev/null +++ b/test/kuttl-test-borrowing.yaml @@ -0,0 
+1,9 @@ +apiVersion: kuttl.dev/v1beta1 +kind: TestSuite +testDirs: + - test/e2e-kuttl-borrowing/ +timeout: 60 +artifactsDir: _output/logs +commands: + - script: helm upgrade --install mcad-controller deployment/mcad-controller --namespace kube-system --wait --set loglevel=${LOG_LEVEL} --set resources.requests.cpu=1000m --set resources.requests.memory=1024Mi --set resources.limits.cpu=4000m --set resources.limits.memory=4096Mi --set image.repository=$IMAGE_REPOSITORY_MCAD --set image.tag=$IMAGE_TAG_MCAD --set image.pullPolicy=$MCAD_IMAGE_PULL_POLICY --set configMap.quotaEnabled='"true"' --set quotaManagement.rbac.apiGroup=ibm.com --set quotaManagement.rbac.resource=quotasubtrees --set configMap.name=mcad-controller-configmap --set configMap.preemptionEnabled='"true"' + - script: kubectl apply -f ${ROOT_DIR}/test/e2e-kuttl-borrowing/install-quota-subtree.yaml
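The core of the forest-backend change, condensed: quota is allocated tentatively with TryAllocateForest, the resources held by any AppWrappers proposed for preemption are credited back to the cluster snapshot, and the allocation is rolled back if the requested demand still does not fit. A sketch that follows the hunk above (logging elided; note that updatedResources aliases the caller's snapshot rather than copying it):

	allocResponse, err := qm.quotaManagerBackend.TryAllocateForest(QuotaManagerForestName, consumerID)
	if err != nil {
		qm.removeConsumer(consumerID)
		return false, nil, err.Error()
	}
	preemptIds := qm.getAppWrappers(allocResponse.GetPreemptedIds())
	if clusterResources != nil {
		updatedResources := clusterResources // pointer copy: Add mutates the caller's snapshot
		for _, appWrapper := range preemptIds {
			// Credit back what the to-be-preempted AppWrappers currently consume.
			updatedResources.Add(qm.getAggregatedResources(appWrapper))
		}
		if !awResDemands.LessEqual(updatedResources) {
			// Even after freeing borrowed quota the cluster cannot host the request:
			// undo the tentative allocation and report failure.
			qm.quotaManagerBackend.UndoAllocateForest(QuotaManagerForestName, consumerID)
			qm.removeConsumer(consumerID)
			return false, preemptIds, "does not fit even after borrowed quota is freed"
		}
	}
	return doesFit, preemptIds, strings.TrimSpace(allocResponse.GetMessage())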
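As I read the new borrowing fixtures, the scenario they exercise is: my-job-1 carries quota_context: silver and requests 2 pods x 8 nvidia.com/gpu (16 in total), while silver only guarantees 8 GPUs and 400Mi with hardLimit: false, so the job can only dispatch by borrowing the remainder from the quota_context tree, whose context-root caps it at 16 GPUs; that borrowing is what makes it preemptable regardless of priority. my-job-2 then arrives under quota_context: gold asking for the 8 GPUs that gold guarantees, so the borrower is proposed for preemption, and the clusterResources argument added to Fits is what lets the dispatcher confirm my-job-2 actually fits once my-job-1's pods are gone. To run just this suite outside run-e2e-kind.sh, something like kubectl kuttl test --config test/kuttl-test-borrowing.yaml should work, assuming kuttl at the version pinned above is installed and ROOT_DIR plus the LOG_LEVEL and IMAGE_* variables consumed by the helm command are exported.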