fix double counting when AWs dont have any pods running #415

Merged: 14 commits, Jun 28, 2023
Changes from 9 commits
8 changes: 8 additions & 0 deletions pkg/apis/controller/v1beta1/appwrapper.go
@@ -263,6 +263,14 @@ type AppWrapperStatus struct {

// Represents the latest available observations of pods under appwrapper
PendingPodConditions []PendingPodSpec `json:"pendingpodconditions"`

// Resources consumed by the AppWrapper's running pods

TotalCPU float64 `json:"totalcpu,omitempty"`

TotalMemory float64 `json:"totalmemory,omitempty"`

TotalGPU int64 `json:"totalgpu,omitempty"`
}

type AppWrapperState string
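A note on the three new status fields: they are filled from the cluster-state Resource type in the pod.go hunk at the end of this diff, so TotalCPU presumably carries millicores, TotalMemory bytes, and TotalGPU a device count; treat those units as an inference, not documented API. A minimal self-contained sketch (stand-in struct, hypothetical values) of reading them back in conventional units:

package main

import "fmt"

// Stand-in for the three new AppWrapperStatus fields; the real struct lives in
// pkg/apis/controller/v1beta1/appwrapper.go above.
type awStatusTotals struct {
	TotalCPU    float64 // assumed millicores, mirroring clusterstateapi.Resource.MilliCPU
	TotalMemory float64 // assumed bytes, mirroring clusterstateapi.Resource.Memory
	TotalGPU    int64   // GPU device count
}

func main() {
	s := awStatusTotals{TotalCPU: 2500, TotalMemory: 4 << 30, TotalGPU: 2}
	fmt.Printf("cores=%.2f memGiB=%.1f gpus=%d\n",
		s.TotalCPU/1000, s.TotalMemory/(1<<30), s.TotalGPU)
}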
24 changes: 12 additions & 12 deletions pkg/controller/clusterstate/cache/cache.go
@@ -33,11 +33,12 @@ package cache
import (
"context"
"fmt"
"github.com/golang/protobuf/proto"
dto "github.com/prometheus/client_model/go"
"sync"
"time"

"github.com/golang/protobuf/proto"
dto "github.com/prometheus/client_model/go"

v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
@@ -210,7 +211,6 @@ func (sc *ClusterStateCache) GetUnallocatedResources() *api.Resource {
return r.Add(sc.availableResources)
}


func (sc *ClusterStateCache) GetUnallocatedHistograms() map[string]*dto.Metric {
sc.Mutex.Lock()
defer sc.Mutex.Unlock()
@@ -238,7 +238,7 @@ func (sc *ClusterStateCache) GetResourceCapacities() *api.Resource {

// Save the cluster state.
func (sc *ClusterStateCache) saveState(available *api.Resource, capacity *api.Resource,
availableHistogram *api.ResourceHistogram) error {
availableHistogram *api.ResourceHistogram) error {
klog.V(12).Infof("Saving Cluster State")

sc.Mutex.Lock()
@@ -261,7 +261,7 @@ func (sc *ClusterStateCache) updateState() error {
idleMin := api.EmptyResource()
idleMax := api.EmptyResource()

firstNode := true
firstNode := true
for _, value := range cluster.Nodes {
// Do not use any Unschedulable nodes in calculations
if value.Unschedulable == true {
@@ -277,12 +277,12 @@ func (sc *ClusterStateCache) updateState() error {
// Collect Min and Max for histogram
if firstNode {
idleMin.MilliCPU = idle.MilliCPU
idleMin.Memory = idle.Memory
idleMin.GPU = idle.GPU
idleMin.Memory = idle.Memory
idleMin.GPU = idle.GPU

idleMax.MilliCPU = idle.MilliCPU
idleMax.Memory = idle.Memory
idleMax.GPU = idle.GPU
idleMax.Memory = idle.Memory
idleMax.GPU = idle.GPU
firstNode = false
} else {
if value.Idle.MilliCPU < idleMin.MilliCPU {
Expand All @@ -293,7 +293,7 @@ func (sc *ClusterStateCache) updateState() error {

if value.Idle.Memory < idleMin.Memory {
idleMin.Memory = value.Idle.Memory
} else if value.Idle.Memory > idleMax.Memory{
} else if value.Idle.Memory > idleMax.Memory {
idleMax.Memory = value.Idle.Memory
}

@@ -354,7 +354,7 @@ func (sc *ClusterStateCache) processCleanupJob() error {

if api.JobTerminated(job) {
delete(sc.Jobs, job.UID)
klog.V(3).Infof("[processCleanupJob] Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
klog.V(10).Infof("[processCleanupJob] Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
} else {
// Retry
sc.deleteJob(job)
@@ -432,7 +432,7 @@ func (sc *ClusterStateCache) Snapshot() *api.ClusterInfo {
// If no scheduling spec, does not handle it.
if value.SchedSpec == nil {
// Jobs.Tasks are more recognizable than Jobs.UID
klog.V(5).Infof("The scheduling spec of Job <%v> with tasks <%+v> is nil, ignore it.", value.UID, value.Tasks)
klog.V(10).Infof("The scheduling spec of Job <%v> with tasks <%+v> is nil, ignore it.", value.UID, value.Tasks)
continue
}

61 changes: 31 additions & 30 deletions pkg/controller/queuejob/queuejob_controller_ex.go
@@ -41,6 +41,7 @@ import (
"strings"
"time"

"github.com/gogo/protobuf/proto"
qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util"

"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager"
@@ -869,6 +870,17 @@ func (qjm *XController) getDispatchedAppWrappers() (map[string]*clusterstateapi.
return awrRetVal, awsRetVal
}

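// addTotalSnapshotResourcesConsumedByAw packs the resource totals recorded in an
// AppWrapper's status (written by the pod controller) into a clusterstateapi.Resource.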
func (qjm *XController) addTotalSnapshotResourcesConsumedByAw(totalgpu int64, totalcpu float64, totalmemory float64) *clusterstateapi.Resource {

totalResource := clusterstateapi.EmptyResource()
totalResource.GPU = totalgpu
totalResource.MilliCPU = totalcpu
totalResource.Memory = totalmemory

return totalResource

}

func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClusterResources *clusterstateapi.
Resource, targetpr float64, requestingJob *arbv1.AppWrapper, agentId string) (*clusterstateapi.Resource, []*arbv1.AppWrapper) {
r := unallocatedClusterResources.Clone()
@@ -915,14 +927,8 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
klog.V(10).Infof("[getAggAvaiResPri] %s: Added %s to candidate preemptable job with priority %f.", time.Now().String(), value.Name, value.Status.SystemPriority)
}

for _, resctrl := range qjm.qjobResControls {
qjv := resctrl.GetAggregatedResources(value)
preemptable = preemptable.Add(qjv)
}
for _, genericItem := range value.Spec.AggrResources.GenericItems {
qjv, _ := genericresource.GetResources(&genericItem)
preemptable = preemptable.Add(qjv)
}
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
preemptable = preemptable.Add(totalResource)

continue
} else if qjm.isDispatcher {
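A note on this hunk, since the scrape drops the +/- markers: the two loops that re-derived a dispatched AW's usage from its spec (qjobResControls and GenericItems) are the removed lines, and the single addTotalSnapshotResourcesConsumedByAw call is their replacement. For a lower-priority AW whose pods are not running, the spec-derived amount was added to preemptable even though the cluster cache still reports that capacity as unallocated, so the same capacity entered the availability sum twice; the status snapshot charges only what is actually consumed. A self-contained toy model of the effect, with made-up numbers and plain integers standing in for the Resource type:

package main

import "fmt"

func main() {
	// A lower-priority AppWrapper requests 8 GPUs in its spec but has no pods running,
	// so the cluster cache still counts those 8 GPUs as unallocated.
	unallocatedGPUs := int64(8)
	requestedBySpec := int64(8) // old code: derived from the AW spec
	consumedByPods := int64(0)  // new code: Status.TotalGPU written by the pod controller

	// available ~= unallocated + preemptable (simplified view of getAggAvaiResPri).
	availableOld := unallocatedGPUs + requestedBySpec // 16: the same 8 GPUs counted twice
	availableNew := unallocatedGPUs + consumedByPods  // 8: counted once

	fmt.Println(availableOld, availableNew)
}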
@@ -934,30 +940,17 @@
continue
} else if value.Status.State == arbv1.AppWrapperStateEnqueued {
// Don't count the resources that can run but not yet realized (job orchestration pending or partially running).
for _, resctrl := range qjm.qjobResControls {
qjv := resctrl.GetAggregatedResources(value)
pending = pending.Add(qjv)
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, resctrl, value.Name, value.Status.CanRun)
}
for _, genericItem := range value.Spec.AggrResources.GenericItems {
qjv, _ := genericresource.GetResources(&genericItem)
pending = pending.Add(qjv)
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
}

totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
pending = pending.Add(totalResource)

continue
} else if value.Status.State == arbv1.AppWrapperStateActive {
if value.Status.Pending > 0 {
// Don't count partially running jobs with pods still pending.
for _, resctrl := range qjm.qjobResControls {
qjv := resctrl.GetAggregatedResources(value)
pending = pending.Add(qjv)
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but %v pod(s) are pending.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State, value.Status.Pending)
}
for _, genericItem := range value.Spec.AggrResources.GenericItems {
qjv, _ := genericresource.GetResources(&genericItem)
pending = pending.Add(qjv)
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but %v pod(s) are pending.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State, value.Status.Pending)
}
totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
pending = pending.Add(totalResource)

} else {
// TODO: Hack to handle race condition when Running jobs have not yet updated the pod counts (In-Flight AW Jobs)
// This hack uses the golang struct implied behavior of defining the object without a value. In this case
@@ -1259,7 +1252,13 @@ func (qjm *XController) ScheduleNext() {
qjm.cache.GetUnallocatedResources(), priorityindex, qj, "")
klog.Infof("[ScheduleNext] XQJ %s with resources %v to be scheduled on aggregated idle resources %v", qj.Name, aggqj, resources)

if aggqj.LessEqual(resources) && qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) {
// Assume preemption will remove low-priority AWs in the system; optimistically dispatch such AWs.

if aggqj.LessEqual(resources) {
unallocatedHistogramMap := qjm.cache.GetUnallocatedHistograms()
if !qjm.nodeChecks(unallocatedHistogramMap, qj) {
klog.V(4).Infof("[ScheduleNext] Optimistic dispatch for AW %v in namespace %v requesting aggregated resources %v; point-in-time histogram of fragmented resources available in the cluster: %s", qj.Name, qj.Namespace, qjm.GetAggregatedResources(qj), proto.MarshalTextString(unallocatedHistogramMap["gpu"]))
}
// Now evaluate quota
fits := true
klog.V(4).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
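The restructuring in this hunk is also easy to misread without markers: nodeChecks() used to be part of the dispatch guard, whereas now only aggqj.LessEqual(resources) gates dispatch and a failing per-node histogram check is merely logged, so the AW is dispatched optimistically on the assumption that preemption will free a suitable node. The situation that check detects is aggregate fit with fragmented per-node capacity; a self-contained toy illustration (a simplified model, not the actual nodeChecks logic):

package main

import "fmt"

func main() {
	perNodeIdleGPUs := []int64{2, 2, 2, 2} // 8 GPUs idle in total
	request := int64(4)                    // one pod that needs 4 GPUs on a single node

	var total, largest int64
	for _, n := range perNodeIdleGPUs {
		total += n
		if n > largest {
			largest = n
		}
	}

	fmt.Println(total >= request)   // true: the aggregate check (aggqj.LessEqual) passes
	fmt.Println(largest >= request) // false: no single node can host the pod
	// After this change the second condition only triggers a log line; dispatch proceeds anyway.
}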
@@ -2172,7 +2171,9 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool
updateQj = qj.DeepCopy()
}
cc.updateEtcd(updateQj, "[syncQueueJob] setCompleted")
cc.quotaManager.Release(updateQj)
if cc.serverOption.QuotaEnabled {
cc.quotaManager.Release(updateQj)
}
}

// Bugfix to eliminate performance problem of overloading the event queue.
5 changes: 5 additions & 0 deletions pkg/controller/queuejobresources/pod/pod.go
@@ -243,6 +243,7 @@ func (qjrPod *QueueJobResPod) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) e
}

running := int32(queuejobresources.FilterPods(pods, v1.PodRunning))
totalResourcesConsumed := queuejobresources.GetPodResourcesByPhase(v1.PodRunning, pods)
pending := int32(queuejobresources.FilterPods(pods, v1.PodPending))
succeeded := int32(queuejobresources.FilterPods(pods, v1.PodSucceeded))
failed := int32(queuejobresources.FilterPods(pods, v1.PodFailed))
@@ -254,6 +255,10 @@ func (qjrPod *QueueJobResPod) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) e
queuejob.Status.Running = running
queuejob.Status.Succeeded = succeeded
queuejob.Status.Failed = failed
// Total resources consumed by all running pods
queuejob.Status.TotalGPU = totalResourcesConsumed.GPU
queuejob.Status.TotalCPU = totalResourcesConsumed.MilliCPU
queuejob.Status.TotalMemory = totalResourcesConsumed.Memory

queuejob.Status.PendingPodConditions = nil
for podName, cond := range podsConditionMap {
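The totals written above come from queuejobresources.GetPodResourcesByPhase, which, as used here, sums the resources of pods in a single phase; only v1.PodRunning pods contribute, which is why an AW with no running pods now reports zero consumption and is no longer double counted. A self-contained sketch of that style of accounting, using only upstream Kubernetes types; the request-summing body and the GPU resource key are assumptions about the helper, not the MCAD implementation itself:

package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
)

// totals is a stand-in for clusterstateapi.Resource (MilliCPU, Memory, GPU).
type totals struct {
	milliCPU float64
	memory   float64
	gpu      int64
}

// podResourcesByPhase sums container requests of pods in the given phase.
// This mirrors what GetPodResourcesByPhase is assumed to do; it is not the MCAD code.
func podResourcesByPhase(phase v1.PodPhase, pods []*v1.Pod) totals {
	var t totals
	for _, pod := range pods {
		if pod.Status.Phase != phase {
			continue // pods in other phases are ignored
		}
		for _, c := range pod.Spec.Containers {
			req := c.Resources.Requests
			if cpu, ok := req[v1.ResourceCPU]; ok {
				t.milliCPU += float64(cpu.MilliValue())
			}
			if mem, ok := req[v1.ResourceMemory]; ok {
				t.memory += float64(mem.Value())
			}
			if gpu, ok := req["nvidia.com/gpu"]; ok { // GPU resource key assumed
				t.gpu += gpu.Value()
			}
		}
	}
	return t
}

func main() {
	running := &v1.Pod{
		Status: v1.PodStatus{Phase: v1.PodRunning},
		Spec: v1.PodSpec{Containers: []v1.Container{{
			Resources: v1.ResourceRequirements{Requests: v1.ResourceList{
				v1.ResourceCPU:    resource.MustParse("500m"),
				v1.ResourceMemory: resource.MustParse("1Gi"),
			}},
		}}},
	}
	pending := &v1.Pod{Status: v1.PodStatus{Phase: v1.PodPending}}

	fmt.Printf("%+v\n", podResourcesByPhase(v1.PodRunning, []*v1.Pod{running, pending}))
	// Only the running pod contributes; an AW with no running pods reports zero totals.
}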