From 25166ce1f8e216c9a6235fa5389d88ca78847130 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 29 Jun 2023 21:48:22 -0400 Subject: [PATCH 1/5] simply acc logic --- .../queuejob/queuejob_controller_ex.go | 69 ++++--------------- 1 file changed, 12 insertions(+), 57 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index dd7f66e8d..44f481701 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -905,8 +905,10 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust continue } else if !value.Status.CanRun { klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it can not run.", time.Now().String(), value.Name) + totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) + preemptable = preemptable.Add(totalResource) continue - } else if value.Status.SystemPriority < targetpr { + } else if value.Status.SystemPriority < targetpr || !value.Status.CanRun { // Dispatcher Mode: Ensure this job is part of the target cluster if qjm.isDispatcher { // Get the job key @@ -938,69 +940,22 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust klog.V(10).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since priority %f is >= %f of requesting job: %s.", time.Now().String(), value.Name, value.Status.SystemPriority, targetpr, requestingJob.Name) continue - } else if value.Status.State == arbv1.AppWrapperStateEnqueued { - // Don't count the resources that can run but not yet realized (job orchestration pending or partially running). + } else if value.Status.CanRun && (targetpr < value.Status.SystemPriority) { + qjv := clusterstateapi.EmptyResource() for _, resctrl := range qjm.qjobResControls { - qjv := resctrl.GetAggregatedResources(value) - pending = pending.Add(qjv) + qjv = resctrl.GetAggregatedResources(value) klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, resctrl, value.Name, value.Status.CanRun) } for _, genericItem := range value.Spec.AggrResources.GenericItems { - qjv, _ := genericresource.GetResources(&genericItem) - pending = pending.Add(qjv) + qjv, _ = genericresource.GetResources(&genericItem) klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun) } - continue - } else if value.Status.State == arbv1.AppWrapperStateActive { - if value.Status.Pending > 0 { - //Don't count partially running jobs with pods still pending. - for _, resctrl := range qjm.qjobResControls { - qjv := resctrl.GetAggregatedResources(value) - pending = pending.Add(qjv) - klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State) - } - for _, genericItem := range value.Spec.AggrResources.GenericItems { - qjv, _ := genericresource.GetResources(&genericItem) - pending = pending.Add(qjv) - klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State) - } - - } else { - // TODO: Hack to handle race condition when Running jobs have not yet updated the pod counts (In-Flight AW Jobs) - // This hack uses the golang struct implied behavior of defining the object without a value. In this case - // of using 'int32' novalue and value of 0 are the same. - if value.Status.Pending == 0 && value.Status.Running == 0 && value.Status.Succeeded == 0 && value.Status.Failed == 0 { - - // In some cases the object wrapped in the appwrapper never creates pod. This likely happens - // in a custom resource that does some processing and errors occur before creating the pod or - // even there is not a problem within the CR controler but when the K8s quota is hit not - // allowing pods to get create due the admission controller. This check will now put a timeout - // on reserving these resources that are "in-flight") - dispatchedCond := qjm.getLatestStatusConditionType(value, arbv1.AppWrapperCondDispatched) - - // If pod counts for AW have not updated within the timeout window, account for - // this object's resources to give the object controller more time to start creating - // pods. This matters when resources are scare. Once the timeout expires, - // resources for this object will not be held and other AW may be dispatched which - // could consume resources initially allocated for this object. This is to handle - // object controllers (essentially custom resource controllers) that do not work as - // expected by creating pods. - if qjm.waitForPodCountUpdates(dispatchedCond) { - for _, resctrl := range qjm.qjobResControls { - qjv := resctrl.GetAggregatedResources(value) - pending = pending.Add(qjv) - klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State) - } - for _, genericItem := range value.Spec.AggrResources.GenericItems { - qjv, _ := genericresource.GetResources(&genericItem) - pending = pending.Add(qjv) - klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State) - } - } else { - klog.V(4).Infof("[getAggAvaiResPri] Resources will no longer be reserved for %s/%s due to timeout of %d ms for pod creating.", value.Name, value.Namespace, qjm.serverOption.DispatchResourceReservationTimeout) - } - } + totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) + pending, err = qjv.NonNegSub(totalResource) + if err != nil { + klog.Errorf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resoources %v", qjv) + pending = qjv } continue } else { From 86ae9e3ca43a0acde1a8257466cda7db77b1d02a Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 29 Jun 2023 23:57:55 -0400 Subject: [PATCH 2/5] fix test --- pkg/controller/queuejob/queuejob_controller_ex.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 44f481701..005a3f356 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -908,7 +908,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) preemptable = preemptable.Add(totalResource) continue - } else if value.Status.SystemPriority < targetpr || !value.Status.CanRun { + } else if value.Status.SystemPriority < targetpr { // Dispatcher Mode: Ensure this job is part of the target cluster if qjm.isDispatcher { // Get the job key @@ -940,23 +940,26 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust klog.V(10).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since priority %f is >= %f of requesting job: %s.", time.Now().String(), value.Name, value.Status.SystemPriority, targetpr, requestingJob.Name) continue - } else if value.Status.CanRun && (targetpr < value.Status.SystemPriority) { + } else if value.Status.CanRun { qjv := clusterstateapi.EmptyResource() for _, resctrl := range qjm.qjobResControls { - qjv = resctrl.GetAggregatedResources(value) + res := resctrl.GetAggregatedResources(value) + qjv.Add(res) klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, resctrl, value.Name, value.Status.CanRun) } for _, genericItem := range value.Spec.AggrResources.GenericItems { - qjv, _ = genericresource.GetResources(&genericItem) + res, _ := genericresource.GetResources(&genericItem) + qjv.Add(res) klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun) } totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) - pending, err = qjv.NonNegSub(totalResource) + delta, err := qjv.NonNegSub(totalResource) if err != nil { - klog.Errorf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resoources %v", qjv) + klog.Errorf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resoources %v, %v", qjv, err) pending = qjv } + pending = pending.Add(delta) continue } else { //Do nothing From 31277eb991794faf2ddf1451d711d18fccddaad7 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 30 Jun 2023 08:21:40 -0400 Subject: [PATCH 3/5] change log level --- pkg/controller/queuejob/queuejob_controller_ex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 005a3f356..d25413161 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -956,7 +956,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) delta, err := qjv.NonNegSub(totalResource) if err != nil { - klog.Errorf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resoources %v, %v", qjv, err) + klog.Warningf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resoources %v, %v", qjv, err) pending = qjv } pending = pending.Add(delta) From 874a076a985f6ac2205e198b2935c47573a399a6 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 30 Jun 2023 13:23:37 -0400 Subject: [PATCH 4/5] address review --- pkg/controller/queuejob/queuejob_controller_ex.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index d25413161..209949591 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -904,8 +904,8 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it is the job being processed.", time.Now().String(), value.Name) continue } else if !value.Status.CanRun { - klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it can not run.", time.Now().String(), value.Name) totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) + klog.V(6).Infof("[getAggAvaiResPri] %s: AW %s cannot run, adding any dangling pod resources %v while it being preempted.", time.Now().String(), value.Name, totalResource) preemptable = preemptable.Add(totalResource) continue } else if value.Status.SystemPriority < targetpr { @@ -950,7 +950,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust for _, genericItem := range value.Spec.AggrResources.GenericItems { res, _ := genericresource.GetResources(&genericItem) qjv.Add(res) - klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun) + klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in genericItem=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun) } totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) From edd192b0b5587c1818671f67e732936668d19452 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 30 Jun 2023 16:18:07 -0400 Subject: [PATCH 5/5] resolve review-2 --- pkg/controller/queuejob/queuejob_controller_ex.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 209949591..d6489205f 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -931,7 +931,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) preemptable = preemptable.Add(totalResource) - + klog.V(6).Infof("[getAggAvaiResPri] %s proirity %v is lower target priority %v reclaiming total preemptable resources %v", value.Name, value.Status.SystemPriority, targetpr, totalResource) continue } else if qjm.isDispatcher { // Dispatcher job does not currently track pod states. This is