From 714374e93fa6544e9f0a103b779a2c33ba3f5f7c Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 16 Jun 2023 15:36:40 -0400 Subject: [PATCH 01/14] fix double counting when AWs don't have any pods running --- pkg/controller/queuejob/queuejob_controller_ex.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index e303a2b69..06b18bae3 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -921,7 +921,10 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust } for _, genericItem := range value.Spec.AggrResources.GenericItems { qjv, _ := genericresource.GetResources(&genericItem) - preemptable = preemptable.Add(qjv) + //only add resources when AWs are truly running + if value.Status.State == arbv1.AppWrapperStateActive && value.Status.QueueJobState == arbv1.AppWrapperConditionType(arbv1.AppWrapperStateActive) { + preemptable = preemptable.Add(qjv) + } } continue From c81e05ba91eb08c64e74ee3eaf1bb13bdfa96455 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Mon, 19 Jun 2023 14:13:56 -0400 Subject: [PATCH 02/14] use proposed preemptions to dispatch --- pkg/controller/queuejob/queuejob_controller_ex.go | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 06b18bae3..2bf07197e 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -917,7 +917,10 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust for _, resctrl := range qjm.qjobResControls { qjv := resctrl.GetAggregatedResources(value) - preemptable = preemptable.Add(qjv) + //only add resources when AWs are truly running + if value.Status.State == arbv1.AppWrapperStateActive && value.Status.QueueJobState == arbv1.AppWrapperConditionType(arbv1.AppWrapperStateActive) { + preemptable = preemptable.Add(qjv) + } } for _, genericItem := range value.Spec.AggrResources.GenericItems { qjv, _ := genericresource.GetResources(&genericItem) @@ -1262,7 +1265,8 @@ func (qjm *XController) ScheduleNext() { qjm.cache.GetUnallocatedResources(), priorityindex, qj, "") klog.Infof("[ScheduleNext] XQJ %s with resources %v to be scheduled on aggregated idle resources %v", qj.Name, aggqj, resources) - if aggqj.LessEqual(resources) && qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) { + // either AW fits inside a node with histogram check or has enough low priority AWs to dispatch + if aggqj.LessEqual(resources) && qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) || aggqj.LessEqual(resources) && len(proposedPreemptions) > 0 { // Now evaluate quota fits := true klog.V(4).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) From ff6c59af1b39ba145ac33345e0e2d9316e5706f5 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Tue, 20 Jun 2023 12:39:00 -0400 Subject: [PATCH 03/14] perform histogram checks prior to preemption or pending res eval --- .../queuejob/queuejob_controller_ex.go | 199 +++++++++--------- 1 file changed, 101 insertions(+), 98 deletions(-) diff --git
a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 2bf07197e..205c53649 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1266,113 +1266,116 @@ func (qjm *XController) ScheduleNext() { klog.Infof("[ScheduleNext] XQJ %s with resources %v to be scheduled on aggregated idle resources %v", qj.Name, aggqj, resources) // either AW fits inside a node with histogram check or has enough low priority AWs to dispatch - if aggqj.LessEqual(resources) && qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) || aggqj.LessEqual(resources) && len(proposedPreemptions) > 0 { - // Now evaluate quota - fits := true - klog.V(4).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - if qjm.serverOption.QuotaEnabled { - if qjm.quotaManager != nil { - // Quota tree design: - // - All AppWrappers without quota submission will consume quota from the 'default' node. - // - All quota trees in the system should have a 'default' node so AppWrappers without - // quota specification can be dispatched - // - If the AppWrapper doesn't have a quota label, then one is added for every tree with the 'default' value - // - Depending on how the 'default' node is configured, AppWrappers that don't specify quota could be - // preemptable by default (e.g., 'default' node with 'cpu: 0m' and 'memory: 0Mi' quota and 'hardLimit: false' - // such node borrows quota from other nodes already in the system) - apiCacheAWJob, err := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) - if err != nil { - klog.Errorf("[ScheduleNext] Failed to get AppWrapper from API Cache %v/%v: %v", - qj.Namespace, qj.Name, err) - continue - } - allTrees := qjm.quotaManager.GetValidQuotaLabels() - newLabels := make(map[string]string) - for key, value := range apiCacheAWJob.Labels { - newLabels[key] = value - } - updateLabels := false - for _, treeName := range allTrees { - if _, quotaSetForAW := newLabels[treeName]; !quotaSetForAW { - newLabels[treeName] = "default" - updateLabels = true + if qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) { + + if aggqj.LessEqual(resources) || aggqj.LessEqual(resources) && len(proposedPreemptions) > 0 { + // Now evaluate quota + fits := true + klog.V(4).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + if qjm.serverOption.QuotaEnabled { + if qjm.quotaManager != nil { + // Quota tree design: + // - All AppWrappers without quota submission will consume quota from the 'default' node. 
+ // - All quota trees in the system should have a 'default' node so AppWrappers without + // quota specification can be dispatched + // - If the AppWrapper doesn't have a quota label, then one is added for every tree with the 'default' value + // - Depending on how the 'default' node is configured, AppWrappers that don't specify quota could be + // preemptable by default (e.g., 'default' node with 'cpu: 0m' and 'memory: 0Mi' quota and 'hardLimit: false' + // such node borrows quota from other nodes already in the system) + apiCacheAWJob, err := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) + if err != nil { + klog.Errorf("[ScheduleNext] Failed to get AppWrapper from API Cache %v/%v: %v", + qj.Namespace, qj.Name, err) + continue } - } - if updateLabels { - apiCacheAWJob.SetLabels(newLabels) - if err := qjm.updateEtcd(apiCacheAWJob, "ScheduleNext - setDefaultQuota"); err == nil { - klog.V(3).Infof("[ScheduleNext] Default quota added to AW %v", qj.Name) - } else { - klog.V(3).Infof("[ScheduleNext] Failed to added default quota to AW %v, skipping dispatch of AW", qj.Name) - return + allTrees := qjm.quotaManager.GetValidQuotaLabels() + newLabels := make(map[string]string) + for key, value := range apiCacheAWJob.Labels { + newLabels[key] = value } - } - var msg string - var preemptAWs []*arbv1.AppWrapper - quotaFits, preemptAWs, msg = qjm.quotaManager.Fits(qj, aggqj, proposedPreemptions) - if quotaFits { - klog.Infof("[ScheduleNext] HOL quota evaluation successful %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - // Set any jobs that are marked for preemption - qjm.preemptAWJobs(preemptAWs) - } else { // Not enough free quota to dispatch appwrapper - dispatchFailedMessage = "Insufficient quota to dispatch AppWrapper." - if len(msg) > 0 { - dispatchFailedReason += " " - dispatchFailedReason += msg + updateLabels := false + for _, treeName := range allTrees { + if _, quotaSetForAW := newLabels[treeName]; !quotaSetForAW { + newLabels[treeName] = "default" + updateLabels = true + } } - klog.V(3).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s, due to quota limits", - qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg) + if updateLabels { + apiCacheAWJob.SetLabels(newLabels) + if err := qjm.updateEtcd(apiCacheAWJob, "ScheduleNext - setDefaultQuota"); err == nil { + klog.V(3).Infof("[ScheduleNext] Default quota added to AW %v", qj.Name) + } else { + klog.V(3).Infof("[ScheduleNext] Failed to added default quota to AW %v, skipping dispatch of AW", qj.Name) + return + } + } + var msg string + var preemptAWs []*arbv1.AppWrapper + quotaFits, preemptAWs, msg = qjm.quotaManager.Fits(qj, aggqj, proposedPreemptions) + if quotaFits { + klog.Infof("[ScheduleNext] HOL quota evaluation successful %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + // Set any jobs that are marked for preemption + qjm.preemptAWJobs(preemptAWs) + } else { // Not enough free quota to dispatch appwrapper + dispatchFailedMessage = "Insufficient quota to dispatch AppWrapper." 
+ if len(msg) > 0 { + dispatchFailedReason += " " + dispatchFailedReason += msg + } + klog.V(3).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s, due to quota limits", + qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg) + } + fits = quotaFits + } else { + fits = false + //Quota manager not initialized + dispatchFailedMessage = "Quota evaluation is enabled but not initialized. Insufficient quota to dispatch AppWrapper." + klog.Errorf("[ScheduleNext] Quota evaluation is enabled but not initialized. AppWrapper %s/%s does not have enough quota\n", qj.Name, qj.Namespace) } - fits = quotaFits } else { - fits = false - //Quota manager not initialized - dispatchFailedMessage = "Quota evaluation is enabled but not initialized. Insufficient quota to dispatch AppWrapper." - klog.Errorf("[ScheduleNext] Quota evaluation is enabled but not initialized. AppWrapper %s/%s does not have enough quota\n", qj.Name, qj.Namespace) + klog.V(4).Infof("[ScheduleNext] HOL quota evaluation not enabled for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) } - } else { - klog.V(4).Infof("[ScheduleNext] HOL quota evaluation not enabled for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - } - // If quota evalauation sucedeed or quota evaluation not enabled set the appwrapper to be dispatched - if fits { - // aw is ready to go! - apiQueueJob, e := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) - // apiQueueJob's ControllerFirstTimestamp is only microsecond level instead of nanosecond level - if e != nil { - klog.Errorf("[ScheduleNext] Unable to get AW %s from API cache &aw=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err) - if qjm.quotaManager != nil && quotaFits { - //Quota was allocated for this appwrapper, release it. - qjm.quotaManager.Release(qj) + // If quota evalauation sucedeed or quota evaluation not enabled set the appwrapper to be dispatched + if fits { + // aw is ready to go! + apiQueueJob, e := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) + // apiQueueJob's ControllerFirstTimestamp is only microsecond level instead of nanosecond level + if e != nil { + klog.Errorf("[ScheduleNext] Unable to get AW %s from API cache &aw=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err) + if qjm.quotaManager != nil && quotaFits { + //Quota was allocated for this appwrapper, release it. 
+ qjm.quotaManager.Release(qj) + } + return } - return - } - // make sure qj has the latest information - if larger(apiQueueJob.ResourceVersion, qj.ResourceVersion) { - klog.V(4).Infof("[ScheduleNext] %s found more recent copy from cache &qj=%p qj=%+v", qj.Name, qj, qj) - klog.V(4).Infof("[ScheduleNext] %s found more recent copy from cache &apiQueueJob=%p apiQueueJob=%+v", apiQueueJob.Name, apiQueueJob, apiQueueJob) - apiQueueJob.DeepCopyInto(qj) - } - desired := int32(0) - for i, ar := range qj.Spec.AggrResources.Items { - desired += ar.Replicas - qj.Spec.AggrResources.Items[i].AllocatedReplicas = ar.Replicas - } - qj.Status.CanRun = true - qj.Status.FilterIgnore = true // update CanRun & Spec. no need to trigger event - // Handle k8s watch race condition - if err := qjm.updateEtcd(qj, "ScheduleNext - setCanRun"); err == nil { - // add to eventQueue for dispatching to Etcd - if err = qjm.eventQueue.Add(qj); err != nil { // unsuccessful add to eventQueue, add back to activeQ - klog.Errorf("[ScheduleNext] Fail to add %s to eventQueue, activeQ.Add_toSchedulingQueue &qj=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err) - qjm.qjqueue.MoveToActiveQueueIfExists(qj) - } else { // successful add to eventQueue, remove from qjqueue - qjm.qjqueue.Delete(qj) - forwarded = true - klog.V(4).Infof("[ScheduleNext] %s Delay=%.6f seconds eventQueue.Add_afterHeadOfLine activeQ=%t, Unsched=%t &aw=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + // make sure qj has the latest information + if larger(apiQueueJob.ResourceVersion, qj.ResourceVersion) { + klog.V(4).Infof("[ScheduleNext] %s found more recent copy from cache &qj=%p qj=%+v", qj.Name, qj, qj) + klog.V(4).Infof("[ScheduleNext] %s found more recent copy from cache &apiQueueJob=%p apiQueueJob=%+v", apiQueueJob.Name, apiQueueJob, apiQueueJob) + apiQueueJob.DeepCopyInto(qj) } - } //updateEtcd - } //fits + desired := int32(0) + for i, ar := range qj.Spec.AggrResources.Items { + desired += ar.Replicas + qj.Spec.AggrResources.Items[i].AllocatedReplicas = ar.Replicas + } + qj.Status.CanRun = true + qj.Status.FilterIgnore = true // update CanRun & Spec. no need to trigger event + // Handle k8s watch race condition + if err := qjm.updateEtcd(qj, "ScheduleNext - setCanRun"); err == nil { + // add to eventQueue for dispatching to Etcd + if err = qjm.eventQueue.Add(qj); err != nil { // unsuccessful add to eventQueue, add back to activeQ + klog.Errorf("[ScheduleNext] Fail to add %s to eventQueue, activeQ.Add_toSchedulingQueue &qj=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err) + qjm.qjqueue.MoveToActiveQueueIfExists(qj) + } else { // successful add to eventQueue, remove from qjqueue + qjm.qjqueue.Delete(qj) + forwarded = true + klog.V(4).Infof("[ScheduleNext] %s Delay=%.6f seconds eventQueue.Add_afterHeadOfLine activeQ=%t, Unsched=%t &aw=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + } + } //updateEtcd + } //fits + } } else { // Not enough free resources to dispatch HOL dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper." 
klog.V(4).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) From 6f351aaed56fbef7787b7bfe292e56a41509306c Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Wed, 21 Jun 2023 15:15:35 -0400 Subject: [PATCH 04/14] remove histograms --- pkg/apis/controller/v1beta1/appwrapper.go | 8 + .../queuejob/queuejob_controller_ex.go | 221 +++++++++--------- pkg/controller/queuejobresources/pod/pod.go | 5 + 3 files changed, 119 insertions(+), 115 deletions(-) diff --git a/pkg/apis/controller/v1beta1/appwrapper.go b/pkg/apis/controller/v1beta1/appwrapper.go index f5302b3d7..01bb87b32 100644 --- a/pkg/apis/controller/v1beta1/appwrapper.go +++ b/pkg/apis/controller/v1beta1/appwrapper.go @@ -263,6 +263,14 @@ type AppWrapperStatus struct { // Represents the latest available observations of pods under appwrapper PendingPodConditions []PendingPodSpec `json:"pendingpodconditions"` + + //Resources consumed + + TotalCPU float64 `json:"totalcpu,omitempty"` + + TotalMemory float64 `json:"totalmemory,omitempty"` + + TotalGPU int64 `json:"totalgpu,omitempty"` } type AppWrapperState string diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 205c53649..3c8b760cc 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -915,20 +915,11 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust klog.V(10).Infof("[getAggAvaiResPri] %s: Added %s to candidate preemptable job with priority %f.", time.Now().String(), value.Name, value.Status.SystemPriority) } - for _, resctrl := range qjm.qjobResControls { - qjv := resctrl.GetAggregatedResources(value) - //only add resources when AWs are truly running - if value.Status.State == arbv1.AppWrapperStateActive && value.Status.QueueJobState == arbv1.AppWrapperConditionType(arbv1.AppWrapperStateActive) { - preemptable = preemptable.Add(qjv) - } - } - for _, genericItem := range value.Spec.AggrResources.GenericItems { - qjv, _ := genericresource.GetResources(&genericItem) - //only add resources when AWs are truly running - if value.Status.State == arbv1.AppWrapperStateActive && value.Status.QueueJobState == arbv1.AppWrapperConditionType(arbv1.AppWrapperStateActive) { - preemptable = preemptable.Add(qjv) - } - } + totalResource := clusterstateapi.EmptyResource() + totalResource.GPU = value.Status.TotalGPU + totalResource.MilliCPU = value.Status.TotalCPU + totalResource.Memory = value.Status.TotalMemory + preemptable = preemptable.Add(totalResource) continue } else if qjm.isDispatcher { @@ -1265,117 +1256,117 @@ func (qjm *XController) ScheduleNext() { qjm.cache.GetUnallocatedResources(), priorityindex, qj, "") klog.Infof("[ScheduleNext] XQJ %s with resources %v to be scheduled on aggregated idle resources %v", qj.Name, aggqj, resources) - // either AW fits inside a node with histogram check or has enough low priority AWs to dispatch + // Assume preemption will remove low priroity AWs in the system, optimistically dispatch such AWs if qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) { - - if aggqj.LessEqual(resources) || aggqj.LessEqual(resources) && len(proposedPreemptions) > 0 { - // Now evaluate quota - fits := true - klog.V(4).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t 
&qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - if qjm.serverOption.QuotaEnabled { - if qjm.quotaManager != nil { - // Quota tree design: - // - All AppWrappers without quota submission will consume quota from the 'default' node. - // - All quota trees in the system should have a 'default' node so AppWrappers without - // quota specification can be dispatched - // - If the AppWrapper doesn't have a quota label, then one is added for every tree with the 'default' value - // - Depending on how the 'default' node is configured, AppWrappers that don't specify quota could be - // preemptable by default (e.g., 'default' node with 'cpu: 0m' and 'memory: 0Mi' quota and 'hardLimit: false' - // such node borrows quota from other nodes already in the system) - apiCacheAWJob, err := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) - if err != nil { - klog.Errorf("[ScheduleNext] Failed to get AppWrapper from API Cache %v/%v: %v", - qj.Namespace, qj.Name, err) - continue - } - allTrees := qjm.quotaManager.GetValidQuotaLabels() - newLabels := make(map[string]string) - for key, value := range apiCacheAWJob.Labels { - newLabels[key] = value - } - updateLabels := false - for _, treeName := range allTrees { - if _, quotaSetForAW := newLabels[treeName]; !quotaSetForAW { - newLabels[treeName] = "default" - updateLabels = true - } + klog.V(4).Infof("[ScheduleNext] Optimistic dispatch for AW %v in namespace %v requesting aggregated resources %v histogram for point in-time fragmented resources are available in the cluster %v", qj.Name, qj.Namespace, qjm.GetAggregatedResources(qj), qjm.cache.GetUnallocatedHistograms()) + } + if aggqj.LessEqual(resources) { + // Now evaluate quota + fits := true + klog.V(4).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + if qjm.serverOption.QuotaEnabled { + if qjm.quotaManager != nil { + // Quota tree design: + // - All AppWrappers without quota submission will consume quota from the 'default' node. 
+ // - All quota trees in the system should have a 'default' node so AppWrappers without + // quota specification can be dispatched + // - If the AppWrapper doesn't have a quota label, then one is added for every tree with the 'default' value + // - Depending on how the 'default' node is configured, AppWrappers that don't specify quota could be + // preemptable by default (e.g., 'default' node with 'cpu: 0m' and 'memory: 0Mi' quota and 'hardLimit: false' + // such node borrows quota from other nodes already in the system) + apiCacheAWJob, err := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) + if err != nil { + klog.Errorf("[ScheduleNext] Failed to get AppWrapper from API Cache %v/%v: %v", + qj.Namespace, qj.Name, err) + continue + } + allTrees := qjm.quotaManager.GetValidQuotaLabels() + newLabels := make(map[string]string) + for key, value := range apiCacheAWJob.Labels { + newLabels[key] = value + } + updateLabels := false + for _, treeName := range allTrees { + if _, quotaSetForAW := newLabels[treeName]; !quotaSetForAW { + newLabels[treeName] = "default" + updateLabels = true } - if updateLabels { - apiCacheAWJob.SetLabels(newLabels) - if err := qjm.updateEtcd(apiCacheAWJob, "ScheduleNext - setDefaultQuota"); err == nil { - klog.V(3).Infof("[ScheduleNext] Default quota added to AW %v", qj.Name) - } else { - klog.V(3).Infof("[ScheduleNext] Failed to added default quota to AW %v, skipping dispatch of AW", qj.Name) - return - } + } + if updateLabels { + apiCacheAWJob.SetLabels(newLabels) + if err := qjm.updateEtcd(apiCacheAWJob, "ScheduleNext - setDefaultQuota"); err == nil { + klog.V(3).Infof("[ScheduleNext] Default quota added to AW %v", qj.Name) + } else { + klog.V(3).Infof("[ScheduleNext] Failed to added default quota to AW %v, skipping dispatch of AW", qj.Name) + return } - var msg string - var preemptAWs []*arbv1.AppWrapper - quotaFits, preemptAWs, msg = qjm.quotaManager.Fits(qj, aggqj, proposedPreemptions) - if quotaFits { - klog.Infof("[ScheduleNext] HOL quota evaluation successful %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - // Set any jobs that are marked for preemption - qjm.preemptAWJobs(preemptAWs) - } else { // Not enough free quota to dispatch appwrapper - dispatchFailedMessage = "Insufficient quota to dispatch AppWrapper." - if len(msg) > 0 { - dispatchFailedReason += " " - dispatchFailedReason += msg - } - klog.V(3).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s, due to quota limits", - qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg) + } + var msg string + var preemptAWs []*arbv1.AppWrapper + quotaFits, preemptAWs, msg = qjm.quotaManager.Fits(qj, aggqj, proposedPreemptions) + if quotaFits { + klog.Infof("[ScheduleNext] HOL quota evaluation successful %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + // Set any jobs that are marked for preemption + qjm.preemptAWJobs(preemptAWs) + } else { // Not enough free quota to dispatch appwrapper + dispatchFailedMessage = "Insufficient quota to dispatch AppWrapper." 
+ if len(msg) > 0 { + dispatchFailedReason += " " + dispatchFailedReason += msg } - fits = quotaFits - } else { - fits = false - //Quota manager not initialized - dispatchFailedMessage = "Quota evaluation is enabled but not initialized. Insufficient quota to dispatch AppWrapper." - klog.Errorf("[ScheduleNext] Quota evaluation is enabled but not initialized. AppWrapper %s/%s does not have enough quota\n", qj.Name, qj.Namespace) + klog.V(3).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s, due to quota limits", + qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg) } + fits = quotaFits } else { - klog.V(4).Infof("[ScheduleNext] HOL quota evaluation not enabled for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + fits = false + //Quota manager not initialized + dispatchFailedMessage = "Quota evaluation is enabled but not initialized. Insufficient quota to dispatch AppWrapper." + klog.Errorf("[ScheduleNext] Quota evaluation is enabled but not initialized. AppWrapper %s/%s does not have enough quota\n", qj.Name, qj.Namespace) } + } else { + klog.V(4).Infof("[ScheduleNext] HOL quota evaluation not enabled for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) + } - // If quota evalauation sucedeed or quota evaluation not enabled set the appwrapper to be dispatched - if fits { - // aw is ready to go! - apiQueueJob, e := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) - // apiQueueJob's ControllerFirstTimestamp is only microsecond level instead of nanosecond level - if e != nil { - klog.Errorf("[ScheduleNext] Unable to get AW %s from API cache &aw=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err) - if qjm.quotaManager != nil && quotaFits { - //Quota was allocated for this appwrapper, release it. - qjm.quotaManager.Release(qj) - } - return + // If quota evalauation sucedeed or quota evaluation not enabled set the appwrapper to be dispatched + if fits { + // aw is ready to go! + apiQueueJob, e := qjm.queueJobLister.AppWrappers(qj.Namespace).Get(qj.Name) + // apiQueueJob's ControllerFirstTimestamp is only microsecond level instead of nanosecond level + if e != nil { + klog.Errorf("[ScheduleNext] Unable to get AW %s from API cache &aw=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err) + if qjm.quotaManager != nil && quotaFits { + //Quota was allocated for this appwrapper, release it. 
+ qjm.quotaManager.Release(qj) } - // make sure qj has the latest information - if larger(apiQueueJob.ResourceVersion, qj.ResourceVersion) { - klog.V(4).Infof("[ScheduleNext] %s found more recent copy from cache &qj=%p qj=%+v", qj.Name, qj, qj) - klog.V(4).Infof("[ScheduleNext] %s found more recent copy from cache &apiQueueJob=%p apiQueueJob=%+v", apiQueueJob.Name, apiQueueJob, apiQueueJob) - apiQueueJob.DeepCopyInto(qj) - } - desired := int32(0) - for i, ar := range qj.Spec.AggrResources.Items { - desired += ar.Replicas - qj.Spec.AggrResources.Items[i].AllocatedReplicas = ar.Replicas + return + } + // make sure qj has the latest information + if larger(apiQueueJob.ResourceVersion, qj.ResourceVersion) { + klog.V(4).Infof("[ScheduleNext] %s found more recent copy from cache &qj=%p qj=%+v", qj.Name, qj, qj) + klog.V(4).Infof("[ScheduleNext] %s found more recent copy from cache &apiQueueJob=%p apiQueueJob=%+v", apiQueueJob.Name, apiQueueJob, apiQueueJob) + apiQueueJob.DeepCopyInto(qj) + } + desired := int32(0) + for i, ar := range qj.Spec.AggrResources.Items { + desired += ar.Replicas + qj.Spec.AggrResources.Items[i].AllocatedReplicas = ar.Replicas + } + qj.Status.CanRun = true + qj.Status.FilterIgnore = true // update CanRun & Spec. no need to trigger event + // Handle k8s watch race condition + if err := qjm.updateEtcd(qj, "ScheduleNext - setCanRun"); err == nil { + // add to eventQueue for dispatching to Etcd + if err = qjm.eventQueue.Add(qj); err != nil { // unsuccessful add to eventQueue, add back to activeQ + klog.Errorf("[ScheduleNext] Fail to add %s to eventQueue, activeQ.Add_toSchedulingQueue &qj=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err) + qjm.qjqueue.MoveToActiveQueueIfExists(qj) + } else { // successful add to eventQueue, remove from qjqueue + qjm.qjqueue.Delete(qj) + forwarded = true + klog.V(4).Infof("[ScheduleNext] %s Delay=%.6f seconds eventQueue.Add_afterHeadOfLine activeQ=%t, Unsched=%t &aw=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) } - qj.Status.CanRun = true - qj.Status.FilterIgnore = true // update CanRun & Spec. no need to trigger event - // Handle k8s watch race condition - if err := qjm.updateEtcd(qj, "ScheduleNext - setCanRun"); err == nil { - // add to eventQueue for dispatching to Etcd - if err = qjm.eventQueue.Add(qj); err != nil { // unsuccessful add to eventQueue, add back to activeQ - klog.Errorf("[ScheduleNext] Fail to add %s to eventQueue, activeQ.Add_toSchedulingQueue &qj=%p Version=%s Status=%+v err=%#v", qj.Name, qj, qj.ResourceVersion, qj.Status, err) - qjm.qjqueue.MoveToActiveQueueIfExists(qj) - } else { // successful add to eventQueue, remove from qjqueue - qjm.qjqueue.Delete(qj) - forwarded = true - klog.V(4).Infof("[ScheduleNext] %s Delay=%.6f seconds eventQueue.Add_afterHeadOfLine activeQ=%t, Unsched=%t &aw=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) - } - } //updateEtcd - } //fits - } + } //updateEtcd + } //fits } else { // Not enough free resources to dispatch HOL dispatchFailedMessage = "Insufficient resources to dispatch AppWrapper." 
klog.V(4).Infof("[ScheduleNext] HOL Blocking by %s for %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) diff --git a/pkg/controller/queuejobresources/pod/pod.go b/pkg/controller/queuejobresources/pod/pod.go index 97caaf42f..a17d1b4a2 100644 --- a/pkg/controller/queuejobresources/pod/pod.go +++ b/pkg/controller/queuejobresources/pod/pod.go @@ -243,6 +243,7 @@ func (qjrPod *QueueJobResPod) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) e } running := int32(queuejobresources.FilterPods(pods, v1.PodRunning)) + totalResourcesConsumed := queuejobresources.GetPodResourcesByPhase(v1.PodRunning, pods) pending := int32(queuejobresources.FilterPods(pods, v1.PodPending)) succeeded := int32(queuejobresources.FilterPods(pods, v1.PodSucceeded)) failed := int32(queuejobresources.FilterPods(pods, v1.PodFailed)) @@ -254,6 +255,10 @@ func (qjrPod *QueueJobResPod) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) e queuejob.Status.Running = running queuejob.Status.Succeeded = succeeded queuejob.Status.Failed = failed + //Total resources by all running pods + queuejob.Status.TotalGPU = totalResourcesConsumed.GPU + queuejob.Status.TotalCPU = totalResourcesConsumed.MilliCPU + queuejob.Status.TotalMemory = totalResourcesConsumed.Memory queuejob.Status.PendingPodConditions = nil for podName, cond := range podsConditionMap { From afe6397b9c3b2210612d5c213acd52ee901b83c7 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Wed, 21 Jun 2023 15:45:18 -0400 Subject: [PATCH 05/14] negate nodechecks histogram --- pkg/controller/queuejob/queuejob_controller_ex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 3c8b760cc..9f6b005f4 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1257,7 +1257,7 @@ func (qjm *XController) ScheduleNext() { klog.Infof("[ScheduleNext] XQJ %s with resources %v to be scheduled on aggregated idle resources %v", qj.Name, aggqj, resources) // Assume preemption will remove low priroity AWs in the system, optimistically dispatch such AWs - if qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) { + if !qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) { klog.V(4).Infof("[ScheduleNext] Optimistic dispatch for AW %v in namespace %v requesting aggregated resources %v histogram for point in-time fragmented resources are available in the cluster %v", qj.Name, qj.Namespace, qjm.GetAggregatedResources(qj), qjm.cache.GetUnallocatedHistograms()) } if aggqj.LessEqual(resources) { From 2017cfc9a7594045de970708ec9632160fbfb4fa Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 23 Jun 2023 16:34:56 -0400 Subject: [PATCH 06/14] fix double counting for pending AWs --- .../queuejob/queuejob_controller_ex.go | 43 ++++++++----------- 1 file changed, 19 insertions(+), 24 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 9f6b005f4..990bf30f0 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -869,6 +869,17 @@ func (qjm *XController) getDispatchedAppWrappers() (map[string]*clusterstateapi. 
return awrRetVal, awsRetVal } +func (qjm *XController) addTotalSnapshotResourcesConsumedByAw(totalgpu int64, totalcpu float64, totalmemory float64) *clusterstateapi.Resource { + + totalResource := clusterstateapi.EmptyResource() + totalResource.GPU = totalgpu + totalResource.MilliCPU = totalcpu + totalResource.Memory = totalmemory + + return totalResource + +} + func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClusterResources *clusterstateapi. Resource, targetpr float64, requestingJob *arbv1.AppWrapper, agentId string) (*clusterstateapi.Resource, []*arbv1.AppWrapper) { r := unallocatedClusterResources.Clone() @@ -915,10 +926,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust klog.V(10).Infof("[getAggAvaiResPri] %s: Added %s to candidate preemptable job with priority %f.", time.Now().String(), value.Name, value.Status.SystemPriority) } - totalResource := clusterstateapi.EmptyResource() - totalResource.GPU = value.Status.TotalGPU - totalResource.MilliCPU = value.Status.TotalCPU - totalResource.Memory = value.Status.TotalMemory + totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) preemptable = preemptable.Add(totalResource) continue @@ -931,30 +939,17 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust continue } else if value.Status.State == arbv1.AppWrapperStateEnqueued { // Don't count the resources that can run but not yet realized (job orchestration pending or partially running). - for _, resctrl := range qjm.qjobResControls { - qjv := resctrl.GetAggregatedResources(value) - pending = pending.Add(qjv) - klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, resctrl, value.Name, value.Status.CanRun) - } - for _, genericItem := range value.Spec.AggrResources.GenericItems { - qjv, _ := genericresource.GetResources(&genericItem) - pending = pending.Add(qjv) - klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun) - } + + totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) + pending = pending.Add(totalResource) + continue } else if value.Status.State == arbv1.AppWrapperStateActive { if value.Status.Pending > 0 { //Don't count partially running jobs with pods still pending. 
- for _, resctrl := range qjm.qjobResControls { - qjv := resctrl.GetAggregatedResources(value) - pending = pending.Add(qjv) - klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but %v pod(s) are pending.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State, value.Status.Pending) - } - for _, genericItem := range value.Spec.AggrResources.GenericItems { - qjv, _ := genericresource.GetResources(&genericItem) - pending = pending.Add(qjv) - klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but %v pod(s) are pending.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State, value.Status.Pending) - } + totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) + pending = pending.Add(totalResource) + } else { // TODO: Hack to handle race condition when Running jobs have not yet updated the pod counts (In-Flight AW Jobs) // This hack uses the golang struct implied behavior of defining the object without a value. In this case From 66a21c2df95e6b2bc2a759449e866c07ef40da63 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Sat, 24 Jun 2023 22:15:39 -0400 Subject: [PATCH 07/14] remove noisy logging --- pkg/controller/clusterstate/cache/cache.go | 24 +++++++++++----------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/controller/clusterstate/cache/cache.go b/pkg/controller/clusterstate/cache/cache.go index 4ef2f77ec..9ae976e0e 100644 --- a/pkg/controller/clusterstate/cache/cache.go +++ b/pkg/controller/clusterstate/cache/cache.go @@ -33,11 +33,12 @@ package cache import ( "context" "fmt" - "github.com/golang/protobuf/proto" - dto "github.com/prometheus/client_model/go" "sync" "time" + "github.com/golang/protobuf/proto" + dto "github.com/prometheus/client_model/go" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/informers" @@ -210,7 +211,6 @@ func (sc *ClusterStateCache) GetUnallocatedResources() *api.Resource { return r.Add(sc.availableResources) } - func (sc *ClusterStateCache) GetUnallocatedHistograms() map[string]*dto.Metric { sc.Mutex.Lock() defer sc.Mutex.Unlock() @@ -238,7 +238,7 @@ func (sc *ClusterStateCache) GetResourceCapacities() *api.Resource { // Save the cluster state. 
func (sc *ClusterStateCache) saveState(available *api.Resource, capacity *api.Resource, - availableHistogram *api.ResourceHistogram) error { + availableHistogram *api.ResourceHistogram) error { klog.V(12).Infof("Saving Cluster State") sc.Mutex.Lock() @@ -261,7 +261,7 @@ func (sc *ClusterStateCache) updateState() error { idleMin := api.EmptyResource() idleMax := api.EmptyResource() - firstNode := true + firstNode := true for _, value := range cluster.Nodes { // Do not use any Unschedulable nodes in calculations if value.Unschedulable == true { @@ -277,12 +277,12 @@ func (sc *ClusterStateCache) updateState() error { // Collect Min and Max for histogram if firstNode { idleMin.MilliCPU = idle.MilliCPU - idleMin.Memory = idle.Memory - idleMin.GPU = idle.GPU + idleMin.Memory = idle.Memory + idleMin.GPU = idle.GPU idleMax.MilliCPU = idle.MilliCPU - idleMax.Memory = idle.Memory - idleMax.GPU = idle.GPU + idleMax.Memory = idle.Memory + idleMax.GPU = idle.GPU firstNode = false } else { if value.Idle.MilliCPU < idleMin.MilliCPU { @@ -293,7 +293,7 @@ func (sc *ClusterStateCache) updateState() error { if value.Idle.Memory < idleMin.Memory { idleMin.Memory = value.Idle.Memory - } else if value.Idle.Memory > idleMax.Memory{ + } else if value.Idle.Memory > idleMax.Memory { idleMax.Memory = value.Idle.Memory } @@ -354,7 +354,7 @@ func (sc *ClusterStateCache) processCleanupJob() error { if api.JobTerminated(job) { delete(sc.Jobs, job.UID) - klog.V(3).Infof("[processCleanupJob] Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name) + klog.V(10).Infof("[processCleanupJob] Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name) } else { // Retry sc.deleteJob(job) @@ -432,7 +432,7 @@ func (sc *ClusterStateCache) Snapshot() *api.ClusterInfo { // If no scheduling spec, does not handle it. 
if value.SchedSpec == nil { // Jobs.Tasks are more recognizable than Jobs.UID - klog.V(5).Infof("The scheduling spec of Job <%v> with tasks <%+v> is nil, ignore it.", value.UID, value.Tasks) + klog.V(10).Infof("The scheduling spec of Job <%v> with tasks <%+v> is nil, ignore it.", value.UID, value.Tasks) continue } From 09c38527fdfd932415cb6a905f298a2cc77edebe Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Sat, 24 Jun 2023 23:01:41 -0400 Subject: [PATCH 08/14] print optimistic dispatch msg when preempt res acquired --- pkg/controller/queuejob/queuejob_controller_ex.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 990bf30f0..d7af5f30a 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -41,6 +41,7 @@ import ( "strings" "time" + "github.com/gogo/protobuf/proto" qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager" @@ -1252,10 +1253,12 @@ func (qjm *XController) ScheduleNext() { klog.Infof("[ScheduleNext] XQJ %s with resources %v to be scheduled on aggregated idle resources %v", qj.Name, aggqj, resources) // Assume preemption will remove low priroity AWs in the system, optimistically dispatch such AWs - if !qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) { - klog.V(4).Infof("[ScheduleNext] Optimistic dispatch for AW %v in namespace %v requesting aggregated resources %v histogram for point in-time fragmented resources are available in the cluster %v", qj.Name, qj.Namespace, qjm.GetAggregatedResources(qj), qjm.cache.GetUnallocatedHistograms()) - } + if aggqj.LessEqual(resources) { + unallocatedHistogramMap := qjm.cache.GetUnallocatedHistograms() + if !qjm.nodeChecks(unallocatedHistogramMap, qj) { + klog.V(4).Infof("[ScheduleNext] Optimistic dispatch for AW %v in namespace %v requesting aggregated resources %v histogram for point in-time fragmented resources are available in the cluster %s", qj.Name, qj.Namespace, qjm.GetAggregatedResources(qj), proto.MarshalTextString(unallocatedHistogramMap["gpu"])) + } // Now evaluate quota fits := true klog.V(4).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status) From 0040e4c88f5d54e3baae9c3434d9efadfc12dfee Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Sun, 25 Jun 2023 16:16:21 -0400 Subject: [PATCH 09/14] release quota on completed AWs only when QM is enabled --- pkg/controller/queuejob/queuejob_controller_ex.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index d7af5f30a..07b5b1acb 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -2171,7 +2171,9 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool updateQj = qj.DeepCopy() } cc.updateEtcd(updateQj, "[syncQueueJob] setCompleted") - cc.quotaManager.Release(updateQj) + if cc.serverOption.QuotaEnabled { + cc.quotaManager.Release(updateQj) + } } // Bugfix to eliminate performance 
problem of overloading the event queue. From adbe37447f19b61dc49151b75b54d678891c1f6b Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Mon, 26 Jun 2023 11:29:19 -0400 Subject: [PATCH 10/14] update CRDs --- config/crd/bases/mcad.ibm.com_appwrappers.yaml | 12 ++++++++++++ .../crds/mcad.ibm.com_appwrappers.yaml | 12 ++++++++++++ .../mcad-controller/templates/deployment.yaml | 12 ++++++++++++ 3 files changed, 36 insertions(+) diff --git a/config/crd/bases/mcad.ibm.com_appwrappers.yaml b/config/crd/bases/mcad.ibm.com_appwrappers.yaml index b43d6d3cf..0f7598754 100644 --- a/config/crd/bases/mcad.ibm.com_appwrappers.yaml +++ b/config/crd/bases/mcad.ibm.com_appwrappers.yaml @@ -795,6 +795,18 @@ spec: type: boolean message: type: string + totalcpu: + description: The number of cpu consumed by all pods belonging to an appwrapper. + format: int32 + type: integer + totalmemory: + description: The amount of memory consumed by all pods belonging to an appwrapper. + format: int32 + type: integer + totalgpu: + description: The total number of GPUs consumed by all pods belonging to an appwrapper. + format: int32 + type: integer pending: description: The number of pending pods. format: int32 diff --git a/deployment/mcad-controller/crds/mcad.ibm.com_appwrappers.yaml b/deployment/mcad-controller/crds/mcad.ibm.com_appwrappers.yaml index b43d6d3cf..0f7598754 100644 --- a/deployment/mcad-controller/crds/mcad.ibm.com_appwrappers.yaml +++ b/deployment/mcad-controller/crds/mcad.ibm.com_appwrappers.yaml @@ -795,6 +795,18 @@ spec: type: boolean message: type: string + totalcpu: + description: The number of cpu consumed by all pods belonging to an appwrapper. + format: int32 + type: integer + totalmemory: + description: The amount of memory consumed by all pods belonging to an appwrapper. + format: int32 + type: integer + totalgpu: + description: The total number of GPUs consumed by all pods belonging to an appwrapper. + format: int32 + type: integer pending: description: The number of pending pods. format: int32 diff --git a/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml b/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml index c37fda3b2..c3e687193 100644 --- a/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml @@ -7762,6 +7762,18 @@ spec: type: boolean message: type: string + totalcpu: + description: The number of cpu consumed by all pods belonging to an appwrapper. + format: int32 + type: integer + totalmemory: + description: The amount of memory consumed by all pods belonging to an appwrapper. + format: int32 + type: integer + totalgpu: + description: The total number of GPUs consumed by all pods belonging to an appwrapper. + format: int32 + type: integer pending: description: The number of pending pods. 
format: int32 From abbfe88ad271259f7718bae6a964e04c5d2cbb21 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Tue, 27 Jun 2023 06:58:25 -0400 Subject: [PATCH 11/14] add all AW resources for status dispatched --- pkg/controller/queuejob/queuejob_controller_ex.go | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 07b5b1acb..4e0b10d25 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -940,9 +940,16 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust continue } else if value.Status.State == arbv1.AppWrapperStateEnqueued { // Don't count the resources that can run but not yet realized (job orchestration pending or partially running). - - totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) - pending = pending.Add(totalResource) + for _, resctrl := range qjm.qjobResControls { + qjv := resctrl.GetAggregatedResources(value) + pending = pending.Add(qjv) + klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, resctrl, value.Name, value.Status.CanRun) + } + for _, genericItem := range value.Spec.AggrResources.GenericItems { + qjv, _ := genericresource.GetResources(&genericItem) + pending = pending.Add(qjv) + klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun) + } continue } else if value.Status.State == arbv1.AppWrapperStateActive { From 93bb22ac413c18636a8eee48dcc2c36d5c1fb928 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Tue, 27 Jun 2023 07:56:02 -0400 Subject: [PATCH 12/14] add label to test dep --- test/e2e/util.go | 1 + 1 file changed, 1 insertion(+) diff --git a/test/e2e/util.go b/test/e2e/util.go index 9dd2248af..435fd29ac 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1102,6 +1102,7 @@ func createDeploymentAWwith550CPU(context *context, name string) *arbv1.AppWrapp "metadata": { "labels": { "app": "aw-deployment-2-550cpu" + "appwrapper.mcad.ibm.com": "aw-deployment-2-550cpu" }, "annotations": { "appwrapper.mcad.ibm.com/appwrapper-name": "aw-deployment-2-550cpu" From 557f3788fa6a5489c5e00045bfab41c1690bc677 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Tue, 27 Jun 2023 14:31:15 -0400 Subject: [PATCH 13/14] rollback test case change --- test/e2e/util.go | 1 - 1 file changed, 1 deletion(-) diff --git a/test/e2e/util.go b/test/e2e/util.go index 435fd29ac..9dd2248af 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -1102,7 +1102,6 @@ func createDeploymentAWwith550CPU(context *context, name string) *arbv1.AppWrapp "metadata": { "labels": { "app": "aw-deployment-2-550cpu" - "appwrapper.mcad.ibm.com": "aw-deployment-2-550cpu" }, "annotations": { "appwrapper.mcad.ibm.com/appwrapper-name": "aw-deployment-2-550cpu" From d1c2352aec6b4625745180d9ce05edcca02a25cb Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Tue, 27 Jun 2023 16:53:34 -0400 Subject: [PATCH 14/14] reserve all res during dispatch --- pkg/controller/queuejob/queuejob_controller_ex.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go 
b/pkg/controller/queuejob/queuejob_controller_ex.go index 4e0b10d25..dd7f66e8d 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -955,8 +955,16 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust } else if value.Status.State == arbv1.AppWrapperStateActive { if value.Status.Pending > 0 { //Don't count partially running jobs with pods still pending. - totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) - pending = pending.Add(totalResource) + for _, resctrl := range qjm.qjobResControls { + qjv := resctrl.GetAggregatedResources(value) + pending = pending.Add(qjv) + klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State) + } + for _, genericItem := range value.Spec.AggrResources.GenericItems { + qjv, _ := genericresource.GetResources(&genericItem) + pending = pending.Add(qjv) + klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State) + } } else { // TODO: Hack to handle race condition when Running jobs have not yet updated the pod counts (In-Flight AW Jobs)