
Commit 7171e8e

fix double counting when AWs don't have any pods running (#415)

* fix double counting when AWs don't have any pods running
* use proposed preemptions to dispatch
* perform histogram checks prior to preemption or pending resource evaluation
* remove histograms
* negate nodechecks histogram
* fix double counting for pending AWs
* remove noisy logging
* print optimistic dispatch msg when preempt res acquired
* release quota on completed AWs only when QM is enabled
* update CRDs
* add all AW resources for status dispatched
* add label to test dep
* rollback test case change
* reserve all res during dispatch
1 parent f62b436 commit 7171e8e

File tree

7 files changed: +89 −24 lines changed


config/crd/bases/mcad.ibm.com_appwrappers.yaml

Lines changed: 12 additions & 0 deletions

```diff
@@ -795,6 +795,18 @@ spec:
               type: boolean
             message:
               type: string
+            totalcpu:
+              description: The number of cpu consumed by all pods belonging to an appwrapper.
+              format: int32
+              type: integer
+            totalmemory:
+              description: The amount of memory consumed by all pods belonging to an appwrapper.
+              format: int32
+              type: integer
+            totalgpu:
+              description: The total number of GPUs consumed by all pods belonging to an appwrapper.
+              format: int32
+              type: integer
             pending:
               description: The number of pending pods.
               format: int32
```

deployment/mcad-controller/crds/mcad.ibm.com_appwrappers.yaml

Lines changed: 12 additions & 0 deletions

```diff
@@ -795,6 +795,18 @@ spec:
               type: boolean
             message:
               type: string
+            totalcpu:
+              description: The number of cpu consumed by all pods belonging to an appwrapper.
+              format: int32
+              type: integer
+            totalmemory:
+              description: The amount of memory consumed by all pods belonging to an appwrapper.
+              format: int32
+              type: integer
+            totalgpu:
+              description: The total number of GPUs consumed by all pods belonging to an appwrapper.
+              format: int32
+              type: integer
             pending:
               description: The number of pending pods.
               format: int32
```

deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml

Lines changed: 12 additions & 0 deletions

```diff
@@ -7762,6 +7762,18 @@ spec:
               type: boolean
             message:
               type: string
+            totalcpu:
+              description: The number of cpu consumed by all pods belonging to an appwrapper.
+              format: int32
+              type: integer
+            totalmemory:
+              description: The amount of memory consumed by all pods belonging to an appwrapper.
+              format: int32
+              type: integer
+            totalgpu:
+              description: The total number of GPUs consumed by all pods belonging to an appwrapper.
+              format: int32
+              type: integer
             pending:
               description: The number of pending pods.
               format: int32
```

pkg/apis/controller/v1beta1/appwrapper.go

Lines changed: 8 additions & 0 deletions

```diff
@@ -263,6 +263,14 @@ type AppWrapperStatus struct {
 
 	// Represents the latest available observations of pods under appwrapper
 	PendingPodConditions []PendingPodSpec `json:"pendingpodconditions"`
+
+	//Resources consumed
+
+	TotalCPU float64 `json:"totalcpu,omitempty"`
+
+	TotalMemory float64 `json:"totalmemory,omitempty"`
+
+	TotalGPU int64 `json:"totalgpu,omitempty"`
 }
 
 type AppWrapperState string
```
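These new status fields record a snapshot of what an AppWrapper's running pods actually consume; they are written by the pod resource controller (see pod.go below) and read back by the dispatcher. As a rough illustration of the intent — not code from this commit — summing the snapshots across dispatched AppWrappers yields a consumed-resource estimate that no longer double counts AWs whose pods have not started, since such AWs report zero totals. The helper name and import paths below are assumptions based on the repository layout shown in this diff.

```go
import (
	arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
	clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
)

// consumedByDispatched is a hypothetical helper: it folds the snapshot totals of a
// set of dispatched AppWrappers into one Resource. TotalCPU is stored in millicores
// (pod.go assigns it from MilliCPU), so it maps onto Resource.MilliCPU directly.
func consumedByDispatched(aws []*arbv1.AppWrapper) *clusterstateapi.Resource {
	consumed := clusterstateapi.EmptyResource()
	for _, aw := range aws {
		consumed.GPU += aw.Status.TotalGPU
		consumed.MilliCPU += aw.Status.TotalCPU
		consumed.Memory += aw.Status.TotalMemory
	}
	return consumed
}
```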

pkg/controller/clusterstate/cache/cache.go

Lines changed: 12 additions & 12 deletions

```diff
@@ -33,11 +33,12 @@ package cache
 import (
 	"context"
 	"fmt"
-	"github.com/golang/protobuf/proto"
-	dto "github.com/prometheus/client_model/go"
 	"sync"
 	"time"
 
+	"github.com/golang/protobuf/proto"
+	dto "github.com/prometheus/client_model/go"
+
 	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/informers"
@@ -210,7 +211,6 @@ func (sc *ClusterStateCache) GetUnallocatedResources() *api.Resource {
 	return r.Add(sc.availableResources)
 }
 
-
 func (sc *ClusterStateCache) GetUnallocatedHistograms() map[string]*dto.Metric {
 	sc.Mutex.Lock()
 	defer sc.Mutex.Unlock()
@@ -238,7 +238,7 @@ func (sc *ClusterStateCache) GetResourceCapacities() *api.Resource {
 
 // Save the cluster state.
 func (sc *ClusterStateCache) saveState(available *api.Resource, capacity *api.Resource,
-		availableHistogram *api.ResourceHistogram) error {
+	availableHistogram *api.ResourceHistogram) error {
 	klog.V(12).Infof("Saving Cluster State")
 
 	sc.Mutex.Lock()
@@ -261,7 +261,7 @@ func (sc *ClusterStateCache) updateState() error {
 	idleMin := api.EmptyResource()
 	idleMax := api.EmptyResource()
 
-	firstNode := true
+	firstNode := true
 	for _, value := range cluster.Nodes {
 		// Do not use any Unschedulable nodes in calculations
 		if value.Unschedulable == true {
@@ -277,12 +277,12 @@ func (sc *ClusterStateCache) updateState() error {
 		// Collect Min and Max for histogram
 		if firstNode {
 			idleMin.MilliCPU = idle.MilliCPU
-			idleMin.Memory = idle.Memory
-			idleMin.GPU = idle.GPU
+			idleMin.Memory = idle.Memory
+			idleMin.GPU = idle.GPU
 
 			idleMax.MilliCPU = idle.MilliCPU
-			idleMax.Memory = idle.Memory
-			idleMax.GPU = idle.GPU
+			idleMax.Memory = idle.Memory
+			idleMax.GPU = idle.GPU
 			firstNode = false
 		} else {
 			if value.Idle.MilliCPU < idleMin.MilliCPU {
@@ -293,7 +293,7 @@ func (sc *ClusterStateCache) updateState() error {
 
 			if value.Idle.Memory < idleMin.Memory {
 				idleMin.Memory = value.Idle.Memory
-			} else if value.Idle.Memory > idleMax.Memory{
+			} else if value.Idle.Memory > idleMax.Memory {
 				idleMax.Memory = value.Idle.Memory
 			}
 
@@ -354,7 +354,7 @@ func (sc *ClusterStateCache) processCleanupJob() error {
 
 	if api.JobTerminated(job) {
 		delete(sc.Jobs, job.UID)
-		klog.V(3).Infof("[processCleanupJob] Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
+		klog.V(10).Infof("[processCleanupJob] Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name)
 	} else {
 		// Retry
 		sc.deleteJob(job)
@@ -432,7 +432,7 @@ func (sc *ClusterStateCache) Snapshot() *api.ClusterInfo {
 		// If no scheduling spec, does not handle it.
 		if value.SchedSpec == nil {
 			// Jobs.Tasks are more recognizable than Jobs.UID
-			klog.V(5).Infof("The scheduling spec of Job <%v> with tasks <%+v> is nil, ignore it.", value.UID, value.Tasks)
+			klog.V(10).Infof("The scheduling spec of Job <%v> with tasks <%+v> is nil, ignore it.", value.UID, value.Tasks)
 			continue
 		}
 
```

pkg/controller/queuejob/queuejob_controller_ex.go

Lines changed: 28 additions & 12 deletions

```diff
@@ -41,6 +41,7 @@ import (
 	"strings"
 	"time"
 
+	"github.com/gogo/protobuf/proto"
 	qmutils "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/quotaplugins/util"
 
 	"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/quota/quotaforestmanager"
@@ -869,6 +870,17 @@ func (qjm *XController) getDispatchedAppWrappers() (map[string]*clusterstateapi.
 	return awrRetVal, awsRetVal
 }
 
+func (qjm *XController) addTotalSnapshotResourcesConsumedByAw(totalgpu int64, totalcpu float64, totalmemory float64) *clusterstateapi.Resource {
+
+	totalResource := clusterstateapi.EmptyResource()
+	totalResource.GPU = totalgpu
+	totalResource.MilliCPU = totalcpu
+	totalResource.Memory = totalmemory
+
+	return totalResource
+
+}
+
 func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClusterResources *clusterstateapi.
 	Resource, targetpr float64, requestingJob *arbv1.AppWrapper, agentId string) (*clusterstateapi.Resource, []*arbv1.AppWrapper) {
 	r := unallocatedClusterResources.Clone()
@@ -915,14 +927,8 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
 				klog.V(10).Infof("[getAggAvaiResPri] %s: Added %s to candidate preemptable job with priority %f.", time.Now().String(), value.Name, value.Status.SystemPriority)
 			}
 
-			for _, resctrl := range qjm.qjobResControls {
-				qjv := resctrl.GetAggregatedResources(value)
-				preemptable = preemptable.Add(qjv)
-			}
-			for _, genericItem := range value.Spec.AggrResources.GenericItems {
-				qjv, _ := genericresource.GetResources(&genericItem)
-				preemptable = preemptable.Add(qjv)
-			}
+			totalResource := qjm.addTotalSnapshotResourcesConsumedByAw(value.Status.TotalGPU, value.Status.TotalCPU, value.Status.TotalMemory)
+			preemptable = preemptable.Add(totalResource)
 
 			continue
 		} else if qjm.isDispatcher {
@@ -944,20 +950,22 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
 				pending = pending.Add(qjv)
 				klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
 			}
+
 			continue
 		} else if value.Status.State == arbv1.AppWrapperStateActive {
 			if value.Status.Pending > 0 {
 				//Don't count partially running jobs with pods still pending.
 				for _, resctrl := range qjm.qjobResControls {
 					qjv := resctrl.GetAggregatedResources(value)
 					pending = pending.Add(qjv)
-					klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but %v pod(s) are pending.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State, value.Status.Pending)
+					klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, resctrl, value.Name, value.Status.CanRun, value.Status.State)
 				}
 				for _, genericItem := range value.Spec.AggrResources.GenericItems {
 					qjv, _ := genericresource.GetResources(&genericItem)
 					pending = pending.Add(qjv)
-					klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but %v pod(s) are pending.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State, value.Status.Pending)
+					klog.V(4).Infof("[getAggAvaiResPri] Subtract all resources %+v in resctrlType=%T for job %s which can-run is set to: %v and status set to: %s but no pod counts in the state have been defined.", qjv, genericItem, value.Name, value.Status.CanRun, value.Status.State)
 				}
+
 			} else {
 				// TODO: Hack to handle race condition when Running jobs have not yet updated the pod counts (In-Flight AW Jobs)
 				// This hack uses the golang struct implied behavior of defining the object without a value. In this case
@@ -1259,7 +1267,13 @@ func (qjm *XController) ScheduleNext() {
 			qjm.cache.GetUnallocatedResources(), priorityindex, qj, "")
 		klog.Infof("[ScheduleNext] XQJ %s with resources %v to be scheduled on aggregated idle resources %v", qj.Name, aggqj, resources)
 
-		if aggqj.LessEqual(resources) && qjm.nodeChecks(qjm.cache.GetUnallocatedHistograms(), qj) {
+		// Assume preemption will remove low priroity AWs in the system, optimistically dispatch such AWs
+
+		if aggqj.LessEqual(resources) {
+			unallocatedHistogramMap := qjm.cache.GetUnallocatedHistograms()
+			if !qjm.nodeChecks(unallocatedHistogramMap, qj) {
+				klog.V(4).Infof("[ScheduleNext] Optimistic dispatch for AW %v in namespace %v requesting aggregated resources %v histogram for point in-time fragmented resources are available in the cluster %s", qj.Name, qj.Namespace, qjm.GetAggregatedResources(qj), proto.MarshalTextString(unallocatedHistogramMap["gpu"]))
+			}
 			// Now evaluate quota
 			fits := true
 			klog.V(4).Infof("[ScheduleNext] HOL available resourse successful check for %s at %s activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v due to quota limits", qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status)
@@ -2172,7 +2186,9 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool
 			updateQj = qj.DeepCopy()
 		}
 		cc.updateEtcd(updateQj, "[syncQueueJob] setCompleted")
-		cc.quotaManager.Release(updateQj)
+		if cc.serverOption.QuotaEnabled {
+			cc.quotaManager.Release(updateQj)
+		}
 	}
 
 	// Bugfix to eliminate performance problem of overloading the event queue.
```
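Two accounting changes in this file carry the fix: preemptable candidates now contribute only their status snapshot (via addTotalSnapshotResourcesConsumedByAw) rather than their full spec aggregation, and ScheduleNext no longer lets the point-in-time histogram node check veto dispatch. The sketch below condenses the new dispatch gate; the function and its parameters are illustrative rather than the controller's API (the real code goes on to evaluate quota and preemption), and the klog import path is an assumption.

```go
import (
	"k8s.io/klog/v2" // assumed; the repository may import a different klog path

	clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
)

// shouldDispatch is a hypothetical condensation of the new gate in ScheduleNext.
// fitsOnSomeNode stands in for the histogram-based nodeChecks result.
func shouldDispatch(request, unallocated *clusterstateapi.Resource, fitsOnSomeNode bool, awName string) bool {
	// Aggregate capacity (including resources proposed for preemption) still decides dispatch.
	if !request.LessEqual(unallocated) {
		return false
	}
	// A failed node check only means no single node fits the AW at this instant;
	// dispatch proceeds optimistically, assuming preemption frees fragmented capacity.
	if !fitsOnSomeNode {
		klog.V(4).Infof("optimistic dispatch for AW %s: aggregate fit ok, no single node fits yet", awName)
	}
	return true
}
```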

pkg/controller/queuejobresources/pod/pod.go

Lines changed: 5 additions & 0 deletions

```diff
@@ -243,6 +243,7 @@ func (qjrPod *QueueJobResPod) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) e
 	}
 
 	running := int32(queuejobresources.FilterPods(pods, v1.PodRunning))
+	totalResourcesConsumed := queuejobresources.GetPodResourcesByPhase(v1.PodRunning, pods)
 	pending := int32(queuejobresources.FilterPods(pods, v1.PodPending))
 	succeeded := int32(queuejobresources.FilterPods(pods, v1.PodSucceeded))
 	failed := int32(queuejobresources.FilterPods(pods, v1.PodFailed))
@@ -254,6 +255,10 @@ func (qjrPod *QueueJobResPod) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) e
 	queuejob.Status.Running = running
 	queuejob.Status.Succeeded = succeeded
 	queuejob.Status.Failed = failed
+	//Total resources by all running pods
+	queuejob.Status.TotalGPU = totalResourcesConsumed.GPU
+	queuejob.Status.TotalCPU = totalResourcesConsumed.MilliCPU
+	queuejob.Status.TotalMemory = totalResourcesConsumed.Memory
 
 	queuejob.Status.PendingPodConditions = nil
 	for podName, cond := range podsConditionMap {
```
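UpdateQueueJobStatus now records the snapshot totals alongside the pod counts, using queuejobresources.GetPodResourcesByPhase to aggregate only the pods that are actually Running. That helper is defined elsewhere in the package and is not shown in this commit; below is a minimal sketch of the idea, assuming it sums the container requests of pods in the given phase and counts GPUs under the nvidia.com/gpu resource name (both assumptions).

```go
import (
	v1 "k8s.io/api/core/v1"

	clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
)

// podResourcesByPhase is an illustrative stand-in for queuejobresources.GetPodResourcesByPhase:
// sum the container requests of every pod currently in the given phase.
func podResourcesByPhase(phase v1.PodPhase, pods []*v1.Pod) *clusterstateapi.Resource {
	total := clusterstateapi.EmptyResource()
	for _, pod := range pods {
		if pod.Status.Phase != phase {
			continue
		}
		for _, c := range pod.Spec.Containers {
			req := c.Resources.Requests
			total.MilliCPU += float64(req.Cpu().MilliValue())
			total.Memory += float64(req.Memory().Value())
			if gpu, ok := req["nvidia.com/gpu"]; ok { // GPU resource name is an assumption
				total.GPU += gpu.Value()
			}
		}
	}
	return total
}
```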
