diff --git a/config/crd/bases/mcad.ibm.com_appwrappers.yaml b/config/crd/bases/mcad.ibm.com_appwrappers.yaml
index b43d6d3cf..0f7598754 100644
--- a/config/crd/bases/mcad.ibm.com_appwrappers.yaml
+++ b/config/crd/bases/mcad.ibm.com_appwrappers.yaml
@@ -795,6 +795,18 @@ spec:
                 type: boolean
               message:
                 type: string
+              totalcpu:
+                description: The total CPU, in millicores, consumed by all pods belonging to the AppWrapper.
+                format: double
+                type: number
+              totalmemory:
+                description: The total amount of memory consumed by all pods belonging to the AppWrapper.
+                format: double
+                type: number
+              totalgpu:
+                description: The total number of GPUs consumed by all pods belonging to the AppWrapper.
+                format: int64
+                type: integer
               pending:
                 description: The number of pending pods.
                 format: int32
diff --git a/deployment/mcad-controller/crds/mcad.ibm.com_appwrappers.yaml b/deployment/mcad-controller/crds/mcad.ibm.com_appwrappers.yaml
index b43d6d3cf..0f7598754 100644
--- a/deployment/mcad-controller/crds/mcad.ibm.com_appwrappers.yaml
+++ b/deployment/mcad-controller/crds/mcad.ibm.com_appwrappers.yaml
@@ -795,6 +795,18 @@ spec:
                 type: boolean
               message:
                 type: string
+              totalcpu:
+                description: The total CPU, in millicores, consumed by all pods belonging to the AppWrapper.
+                format: double
+                type: number
+              totalmemory:
+                description: The total amount of memory consumed by all pods belonging to the AppWrapper.
+                format: double
+                type: number
+              totalgpu:
+                description: The total number of GPUs consumed by all pods belonging to the AppWrapper.
+                format: int64
+                type: integer
               pending:
                 description: The number of pending pods.
                 format: int32
diff --git a/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml b/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml
index c37fda3b2..c3e687193 100644
--- a/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml
+++ b/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml
@@ -7762,6 +7762,18 @@ spec:
                 type: boolean
               message:
                 type: string
+              totalcpu:
+                description: The total CPU, in millicores, consumed by all pods belonging to the AppWrapper.
+                format: double
+                type: number
+              totalmemory:
+                description: The total amount of memory consumed by all pods belonging to the AppWrapper.
+                format: double
+                type: number
+              totalgpu:
+                description: The total number of GPUs consumed by all pods belonging to the AppWrapper.
+                format: int64
+                type: integer
               pending:
                 description: The number of pending pods.
                 format: int32
diff --git a/pkg/apis/controller/v1beta1/appwrapper.go b/pkg/apis/controller/v1beta1/appwrapper.go
index f5302b3d7..01bb87b32 100644
--- a/pkg/apis/controller/v1beta1/appwrapper.go
+++ b/pkg/apis/controller/v1beta1/appwrapper.go
@@ -263,6 +263,11 @@ type AppWrapperStatus struct {
 
     // Represents the latest available observations of pods under appwrapper
     PendingPodConditions []PendingPodSpec `json:"pendingpodconditions"`
+
+    // Total resources consumed by all running pods belonging to the appwrapper
+    TotalCPU    float64 `json:"totalcpu,omitempty"`
+    TotalMemory float64 `json:"totalmemory,omitempty"`
+    TotalGPU    int64   `json:"totalgpu,omitempty"`
 }
 
 type AppWrapperState string
diff --git a/pkg/controller/clusterstate/cache/cache.go b/pkg/controller/clusterstate/cache/cache.go
index 4ef2f77ec..9ae976e0e 100644
--- a/pkg/controller/clusterstate/cache/cache.go
+++ b/pkg/controller/clusterstate/cache/cache.go
@@ -33,11 +33,12 @@ package cache
 import (
     "context"
     "fmt"
-    "github.com/golang/protobuf/proto"
-    dto "github.com/prometheus/client_model/go"
     "sync"
     "time"
 
+    "github.com/golang/protobuf/proto"
+    dto "github.com/prometheus/client_model/go"
+
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/client-go/informers"
@@ -210,7 +211,6 @@ func (sc *ClusterStateCache) GetUnallocatedResources() *api.Resource {
     return r.Add(sc.availableResources)
 }
 
-
 func (sc *ClusterStateCache) GetUnallocatedHistograms() map[string]*dto.Metric {
     sc.Mutex.Lock()
     defer sc.Mutex.Unlock()
@@ -238,7 +238,7 @@ func (sc *ClusterStateCache) GetResourceCapacities() *api.Resource {
 
 // Save the cluster state.
 func (sc *ClusterStateCache) saveState(available *api.Resource, capacity *api.Resource,
-     availableHistogram *api.ResourceHistogram) error {
+    availableHistogram *api.ResourceHistogram) error {
     klog.V(12).Infof("Saving Cluster State")
 
     sc.Mutex.Lock()
@@ -261,7 +261,7 @@ func (sc *ClusterStateCache) updateState() error {
     idleMin := api.EmptyResource()
     idleMax := api.EmptyResource()
 
-     firstNode := true
+    firstNode := true
     for _, value := range cluster.Nodes {
         // Do not use any Unschedulable nodes in calculations
         if value.Unschedulable == true {
@@ -277,12 +277,12 @@ func (sc *ClusterStateCache) updateState() error {
         // Collect Min and Max for histogram
         if firstNode {
             idleMin.MilliCPU = idle.MilliCPU
-             idleMin.Memory = idle.Memory
-             idleMin.GPU = idle.GPU
+            idleMin.Memory = idle.Memory
+            idleMin.GPU = idle.GPU
 
             idleMax.MilliCPU = idle.MilliCPU
-             idleMax.Memory = idle.Memory
-             idleMax.GPU = idle.GPU
+            idleMax.Memory = idle.Memory
+            idleMax.GPU = idle.GPU
             firstNode = false
         } else {
             if value.Idle.MilliCPU < idleMin.MilliCPU {
@@ -293,7 +293,7 @@ func (sc *ClusterStateCache) updateState() error {
 
             if value.Idle.Memory < idleMin.Memory {
                 idleMin.Memory = value.Idle.Memory
-            } else if value.Idle.Memory > idleMax.Memory{
+            } else if value.Idle.Memory > idleMax.Memory {
                 idleMax.Memory = value.Idle.Memory
             }
 
@@ -354,7 +354,7 @@ func (sc *ClusterStateCache) processCleanupJob() error {
 
     if api.JobTerminated(job) {
         delete(sc.Jobs, job.UID)
-        klog.V(3).Infof("[processCleanupJob] Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
+        klog.V(10).Infof("[processCleanupJob] Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
     } else {
         // Retry
         sc.deleteJob(job)
@@ -432,7 +432,7 @@ func (sc *ClusterStateCache) Snapshot() *api.ClusterInfo {
         // If no scheduling spec, does not handle it.
         if value.SchedSpec == nil {
             // Jobs.Tasks are more recognizable than Jobs.UID
-            klog.V(5).Infof("The scheduling spec of Job <%v> with tasks <%+v> is nil, ignore it.", value.UID, value.Tasks)
+            klog.V(10).Infof("The scheduling spec of Job <%v> with tasks <%+v> is nil, ignore it.", value.UID, value.Tasks)
             continue
         }
 
diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go
index e303a2b69..dd7f66e8d 100644
--- a/pkg/controller/queuejob/queuejob_controller_ex.go
+++ b/pkg/controller/queuejob/queuejob_controller_ex.go
@@ -41,6 +41,7 @@ import (
     "strings"
     "time"
 
+    "github.com/gogo/protobuf/proto"
     qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util"
 
     "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager"
@@ -869,6 +870,17 @@ func (qjm *XController) getDispatchedAppWrappers() (map[string]*clusterstateapi.
     return awrRetVal, awsRetVal
 }
 
+// addTotalSnapshotResourcesConsumedByAw rebuilds the resource totals recorded
+// on an AppWrapper status as a single clusterstateapi.Resource.
+func (qjm *XController) addTotalSnapshotResourcesConsumedByAw(totalgpu int64, totalcpu float64, totalmemory float64) *clusterstateapi.Resource {
+    totalResource := clusterstateapi.EmptyResource()
+    totalResource.GPU = totalgpu
+    totalResource.MilliCPU = totalcpu
+    totalResource.Memory = totalmemory
+
+    return totalResource
+}
+
 func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClusterResources *clusterstateapi.
     Resource, targetpr float64, requestingJob *arbv1.AppWrapper, agentId string) (*clusterstateapi.Resource, []*arbv1.AppWrapper) {
     r := unallocatedClusterResources.Clone()
@@ -915,14 +927,8 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
                 klog.V(10).Infof("[getAggAvaiResPri] %s: Added %s to candidate preemptable job with priority %f.", time.Now().String(), value.Name, value.Status.SystemPriority)
             }
 
-            for _, resctrl := range qjm.qjobResControls {
-                qjv := resctrl.GetAggregatedResources(value)
-                preemptable = preemptable.Add(qjv)
-            }
-            for _, genericItem := range value.Spec.AggrResources.GenericItems {
-                qjv, _ := genericresource.GetResources(&genericItem)
-                preemptable = preemptable.Add(qjv)
-            }
+            totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
+            preemptable = preemptable.Add(totalResource)
 
             continue
         } else if qjm.isDispatcher {
@@ -944,6 +950,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
                 pending = pending.Add(qjv)
                 klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
             }
+
             continue
         } else if value.Status.State == arbv1.AppWrapperStateActive {
             if value.Status.Pending > 0 {
@@ -951,13 +958,14 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
             for _, resctrl := range qjm.qjobResControls {
                 qjv := resctrl.GetAggregatedResources(value)
                 pending = pending.Add(qjv)
-                klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but %v pod(s) are pending.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State, value.Status.Pending)
+                klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State)
             }
             for _, genericItem := range value.Spec.AggrResources.GenericItems {
                 qjv, _ := genericresource.GetResources(&genericItem)
                 pending = pending.Add(qjv)
-                klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but %v pod(s) are pending.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State, value.Status.Pending)
+                klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State)
             }
+
         } else {
             // TODO: Hack to handle race condition when Running jobs have not yet updated the pod counts (In-Flight AW Jobs)
             // This hack uses the golang struct implied behavior of defining the object without a value. In this case
@@ -1259,7 +1267,13 @@ func (qjm *XController) ScheduleNext() {
             qjm.cache.GetUnallocatedResources(), priorityindex, qj, "")
         klog.Infof("[ScheduleNext] XQJ %s with resources %v to be scheduled on aggregated idle resources %v", qj.Name, aggqj, resources)
 
-        if aggqj.LessEqual(resources) && qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) {
+        // Assume preemption will remove low-priority AWs from the system; optimistically dispatch such AWs.
+
+        if aggqj.LessEqual(resources) {
+            unallocatedHistogramMap := qjm.cache.GetUnallocatedHistograms()
+            if !qjm.nodeChecks(unallocatedHistogramMap, qj) {
+                klog.V(4).Infof("[ScheduleNext] Optimistic dispatch of AW %v in namespace %v requesting aggregated resources %v; point-in-time fragmented GPU histogram of the cluster: %s", qj.Name, qj.Namespace, qjm.GetAggregatedResources(qj), proto.MarshalTextString(unallocatedHistogramMap["gpu"]))
+            }
             // Now evaluate quota
             fits := true
             klog.V(4).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
@@ -2172,7 +2186,9 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool
             updateQj = qj.DeepCopy()
         }
         cc.updateEtcd(updateQj, "[syncQueueJob] setCompleted")
-        cc.quotaManager.Release(updateQj)
+        if cc.serverOption.QuotaEnabled {
+            cc.quotaManager.Release(updateQj)
+        }
     }
 
     // Bugfix to eliminate performance problem of overloading the event queue.
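The controller change above replaces per-item re-aggregation of every wrapped resource with a constant-time read of the totals snapshotted on the AppWrapper status. A minimal, self-contained sketch of that pattern follows; the `Resource` type here only mirrors the three fields of `clusterstateapi.Resource` that this patch touches (`MilliCPU`, `Memory`, `GPU`), and the values in `main` are illustrative, not taken from the patch.

```go
package main

import "fmt"

// Resource mirrors the three fields of clusterstateapi.Resource used by the
// patch; the real type carries additional state and methods.
type Resource struct {
	MilliCPU float64
	Memory   float64
	GPU      int64
}

func EmptyResource() *Resource { return &Resource{} }

// Add accumulates r2 into r and returns r, matching the chained
// preemptable.Add(...) calls in getAggregatedAvailableResourcesPriority.
func (r *Resource) Add(r2 *Resource) *Resource {
	r.MilliCPU += r2.MilliCPU
	r.Memory += r2.Memory
	r.GPU += r2.GPU
	return r
}

// addTotalSnapshotResourcesConsumedByAw rebuilds a Resource from the totals
// recorded on an AppWrapper status, as the new helper in the patch does.
func addTotalSnapshotResourcesConsumedByAw(totalgpu int64, totalcpu, totalmemory float64) *Resource {
	totalResource := EmptyResource()
	totalResource.GPU = totalgpu
	totalResource.MilliCPU = totalcpu
	totalResource.Memory = totalmemory
	return totalResource
}

func main() {
	// Status totals as UpdateQueueJobStatus would have recorded them for
	// two preemptable AppWrappers (illustrative values).
	preemptable := EmptyResource()
	preemptable = preemptable.Add(addTotalSnapshotResourcesConsumedByAw(2, 4000, 8e9))
	preemptable = preemptable.Add(addTotalSnapshotResourcesConsumedByAw(1, 500, 1e9))
	fmt.Printf("preemptable: %+v\n", preemptable)
}
```

The design choice this sketch illustrates: the scheduler now trusts the status snapshot written by the pod controller instead of re-walking `qjobResControls` and `GenericItems` for every candidate job, trading a small staleness window for a much cheaper dispatch loop.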
diff --git a/pkg/controller/queuejobresources/pod/pod.go b/pkg/controller/queuejobresources/pod/pod.go
index 97caaf42f..a17d1b4a2 100644
--- a/pkg/controller/queuejobresources/pod/pod.go
+++ b/pkg/controller/queuejobresources/pod/pod.go
@@ -243,6 +243,7 @@ func (qjrPod *QueueJobResPod) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) e
     }
 
     running := int32(queuejobresources.FilterPods(pods, v1.PodRunning))
+    totalResourcesConsumed := queuejobresources.GetPodResourcesByPhase(v1.PodRunning, pods)
     pending := int32(queuejobresources.FilterPods(pods, v1.PodPending))
     succeeded := int32(queuejobresources.FilterPods(pods, v1.PodSucceeded))
     failed := int32(queuejobresources.FilterPods(pods, v1.PodFailed))
@@ -254,6 +255,10 @@ func (qjrPod *QueueJobResPod) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) e
     queuejob.Status.Running = running
     queuejob.Status.Succeeded = succeeded
     queuejob.Status.Failed = failed
+    // Total resources consumed by all running pods
+    queuejob.Status.TotalGPU = totalResourcesConsumed.GPU
+    queuejob.Status.TotalCPU = totalResourcesConsumed.MilliCPU
+    queuejob.Status.TotalMemory = totalResourcesConsumed.Memory
     queuejob.Status.PendingPodConditions = nil
 
     for podName, cond := range podsConditionMap {
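`GetPodResourcesByPhase` is called here with `(v1.PodRunning, pods)`, but its body is not part of this patch. The sketch below shows what such a helper plausibly computes, assuming it sums the container resource requests of every pod in the given phase; the `nvidia.com/gpu` resource name and the request-based (rather than limit-based) accounting are assumptions for illustration, not confirmed by the patch.

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// Resource mirrors the fields of clusterstateapi.Resource that the status
// totals are copied from.
type Resource struct {
	MilliCPU float64
	Memory   float64
	GPU      int64
}

// getPodResourcesByPhase approximates queuejobresources.GetPodResourcesByPhase:
// sum the container requests of every pod currently in the given phase.
func getPodResourcesByPhase(phase v1.PodPhase, pods []*v1.Pod) *Resource {
	total := &Resource{}
	for _, pod := range pods {
		if pod.Status.Phase != phase {
			continue
		}
		for _, c := range pod.Spec.Containers {
			req := c.Resources.Requests
			total.MilliCPU += float64(req.Cpu().MilliValue())
			total.Memory += float64(req.Memory().Value())
			if gpu, ok := req["nvidia.com/gpu"]; ok {
				total.GPU += gpu.Value()
			}
		}
	}
	return total
}

func main() {
	// One running pod requesting 500m CPU, 1Gi memory, and 1 GPU.
	pod := &v1.Pod{
		Status: v1.PodStatus{Phase: v1.PodRunning},
		Spec: v1.PodSpec{Containers: []v1.Container{{
			Resources: v1.ResourceRequirements{Requests: v1.ResourceList{
				v1.ResourceCPU:    resource.MustParse("500m"),
				v1.ResourceMemory: resource.MustParse("1Gi"),
				"nvidia.com/gpu":  resource.MustParse("1"),
			}},
		}}},
	}
	fmt.Printf("running totals: %+v\n", getPodResourcesByPhase(v1.PodRunning, []*v1.Pod{pod}))
}
```

Note that because the totals are taken only from `v1.PodRunning` pods at status-update time, the values surfaced in `totalcpu`, `totalmemory`, and `totalgpu` are a point-in-time snapshot, which is exactly why the scheduler-side TODO about in-flight jobs with not-yet-updated pod counts remains in place.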