diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index dd7f66e8d..d6489205f 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -904,7 +904,9 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it is the job being processed.", time.Now().String(), value.Name) continue } else if !value.Status.CanRun { - klog.V(11).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since it can not run.", time.Now().String(), value.Name) + totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) + klog.V(6).Infof("[getAggAvaiResPri] %s: AW %s cannot run, adding any dangling pod resources %v while it being preempted.", time.Now().String(), value.Name, totalResource) + preemptable = preemptable.Add(totalResource) continue } else if value.Status.SystemPriority < targetpr { // Dispatcher Mode: Ensure this job is part of the target cluster @@ -929,7 +931,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) preemptable = preemptable.Add(totalResource) - + klog.V(6).Infof("[getAggAvaiResPri] %s proirity %v is lower target priority %v reclaiming total preemptable resources %v", value.Name, value.Status.SystemPriority, targetpr, totalResource) continue } else if qjm.isDispatcher { // Dispatcher job does not currently track pod states. This is @@ -938,70 +940,26 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust klog.V(10).Infof("[getAggAvaiResPri] %s: Skipping adjustments for %s since priority %f is >= %f of requesting job: %s.", time.Now().String(), value.Name, value.Status.SystemPriority, targetpr, requestingJob.Name) continue - } else if value.Status.State == arbv1.AppWrapperStateEnqueued { - // Don't count the resources that can run but not yet realized (job orchestration pending or partially running). + } else if value.Status.CanRun { + qjv := clusterstateapi.EmptyResource() for _, resctrl := range qjm.qjobResControls { - qjv := resctrl.GetAggregatedResources(value) - pending = pending.Add(qjv) + res := resctrl.GetAggregatedResources(value) + qjv.Add(res) klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, resctrl, value.Name, value.Status.CanRun) } for _, genericItem := range value.Spec.AggrResources.GenericItems { - qjv, _ := genericresource.GetResources(&genericItem) - pending = pending.Add(qjv) - klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun) + res, _ := genericresource.GetResources(&genericItem) + qjv.Add(res) + klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in genericItem=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun) } - continue - } else if value.Status.State == arbv1.AppWrapperStateActive { - if value.Status.Pending > 0 { - //Don't count partially running jobs with pods still pending. - for _, resctrl := range qjm.qjobResControls { - qjv := resctrl.GetAggregatedResources(value) - pending = pending.Add(qjv) - klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State) - } - for _, genericItem := range value.Spec.AggrResources.GenericItems { - qjv, _ := genericresource.GetResources(&genericItem) - pending = pending.Add(qjv) - klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State) - } - - } else { - // TODO: Hack to handle race condition when Running jobs have not yet updated the pod counts (In-Flight AW Jobs) - // This hack uses the golang struct implied behavior of defining the object without a value. In this case - // of using 'int32' novalue and value of 0 are the same. - if value.Status.Pending == 0 && value.Status.Running == 0 && value.Status.Succeeded == 0 && value.Status.Failed == 0 { - - // In some cases the object wrapped in the appwrapper never creates pod. This likely happens - // in a custom resource that does some processing and errors occur before creating the pod or - // even there is not a problem within the CR controler but when the K8s quota is hit not - // allowing pods to get create due the admission controller. This check will now put a timeout - // on reserving these resources that are "in-flight") - dispatchedCond := qjm.getLatestStatusConditionType(value, arbv1.AppWrapperCondDispatched) - - // If pod counts for AW have not updated within the timeout window, account for - // this object's resources to give the object controller more time to start creating - // pods. This matters when resources are scare. Once the timeout expires, - // resources for this object will not be held and other AW may be dispatched which - // could consume resources initially allocated for this object. This is to handle - // object controllers (essentially custom resource controllers) that do not work as - // expected by creating pods. - if qjm.waitForPodCountUpdates(dispatchedCond) { - for _, resctrl := range qjm.qjobResControls { - qjv := resctrl.GetAggregatedResources(value) - pending = pending.Add(qjv) - klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State) - } - for _, genericItem := range value.Spec.AggrResources.GenericItems { - qjv, _ := genericresource.GetResources(&genericItem) - pending = pending.Add(qjv) - klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State) - } - } else { - klog.V(4).Infof("[getAggAvaiResPri] Resources will no longer be reserved for %s/%s due to timeout of %d ms for pod creating.", value.Name, value.Namespace, qjm.serverOption.DispatchResourceReservationTimeout) - } - } + totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory) + delta, err := qjv.NonNegSub(totalResource) + if err != nil { + klog.Warningf("[getAggAvaiResPri] Subtraction of resources failed, adding entire appwrapper resoources %v, %v", qjv, err) + pending = qjv } + pending = pending.Add(delta) continue } else { //Do nothing