diff --git a/pkg/client/informers/controller-externalversion/generic.go b/pkg/client/informers/controller-externalversion/generic.go index f084c476a..a1715087f 100644 --- a/pkg/client/informers/controller-externalversion/generic.go +++ b/pkg/client/informers/controller-externalversion/generic.go @@ -71,11 +71,6 @@ func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource resource: resource.GroupResource(), informer: f.SchedulingSpec().SchedulingSpecs().Informer(), }, nil - case arbv1.SchemeGroupVersion.WithResource("queuejobs"): - return &genericInformer{ - resource: resource.GroupResource(), - informer: f.QueueJob().QueueJobs().Informer(), - }, nil case arbv1.SchemeGroupVersion.WithResource("appwrappers"): return &genericInformer{ resource: resource.GroupResource(), diff --git a/pkg/client/informers/controller-externalversion/v1/interface.go b/pkg/client/informers/controller-externalversion/v1/interface.go index 114dca284..92de6ac4c 100644 --- a/pkg/client/informers/controller-externalversion/v1/interface.go +++ b/pkg/client/informers/controller-externalversion/v1/interface.go @@ -37,16 +37,13 @@ import ( // Interface provides access to all the informers in this group version. type Interface interface { // SchedulingSpecs returns a SchedulingSpecInformer. - SchedulingSpecs() SchedulingSpecInformer - // QueueJobs returns a QueueJobInformer. - QueueJobs() QueueJobInformer + SchedulingSpecs() SchedulingSpecInformer // AppWrappers returns a QueueJobInformer. - AppWrappers() AppWrapperInformer + AppWrappers() AppWrapperInformer } - type version struct { - factory internalinterfaces.SharedInformerFactory + factory internalinterfaces.SharedInformerFactory tweakListOptions internalinterfaces.TweakListOptionsFunc } @@ -60,11 +57,6 @@ func (v *version) SchedulingSpecs() SchedulingSpecInformer { return &schedulingSpecInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} } -// QueueJobs returns a QueueJobInformer. -func (v *version) QueueJobs() QueueJobInformer { - return &queueJobInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} -} - func (v *version) AppWrappers() AppWrapperInformer { return &appWrapperInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} } diff --git a/pkg/client/informers/controller-externalversion/v1/queuejob.go b/pkg/client/informers/controller-externalversion/v1/queuejob.go deleted file mode 100644 index 94fa1faff..000000000 --- a/pkg/client/informers/controller-externalversion/v1/queuejob.go +++ /dev/null @@ -1,86 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -/* -Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
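With the "queuejobs" case removed from ForResource above, the factory only serves schedulingspecs and appwrappers. Below is a minimal sketch of that lookup; the genericInformer and factory interfaces are local stand-ins for the generated types in controller-externalversion, declared here only so the snippet compiles on its own.

```go
package example

import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/tools/cache"
)

// Local stand-ins for the generated factory and generic informer interfaces.
type genericInformer interface {
	Informer() cache.SharedIndexInformer
}

type informerFactory interface {
	ForResource(resource schema.GroupVersionResource) (genericInformer, error)
}

// appWrapperInformerFor resolves the informer for the "appwrappers" resource,
// the path that remains after the "queuejobs" case is dropped from ForResource.
func appWrapperInformerFor(f informerFactory, gv schema.GroupVersion) (cache.SharedIndexInformer, error) {
	gi, err := f.ForResource(gv.WithResource("appwrappers"))
	if err != nil {
		// "queuejobs" now falls through to the factory's error case.
		return nil, err
	}
	return gi.Informer(), nil
}
```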
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package v1 - -import ( - "time" - - meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - - arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1" - "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/informers/controller-externalversion/internalinterfaces" - "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/client/listers/controller/v1" -) - -// QueueJobInformer provides access to a shared informer and lister for -// QueueJobs. -type QueueJobInformer interface { - Informer() cache.SharedIndexInformer - Lister() v1.QueueJobLister -} - -type queueJobInformer struct { - factory internalinterfaces.SharedInformerFactory - tweakListOptions internalinterfaces.TweakListOptionsFunc -} - -// NewQueueJobInformer constructs a new informer for QueueJob type. -// Always prefer using an informer factory to get a shared informer instead of getting an independent -// one. This reduces memory footprint and number of connections to the server. -func NewQueueJobInformer(client *rest.RESTClient, namespace string, resyncPeriod time.Duration, indexers cache.Indexers) cache.SharedIndexInformer { - source := cache.NewListWatchFromClient( - client, - arbv1.QueueJobPlural, - namespace, - fields.Everything()) - - return cache.NewSharedIndexInformer( - source, - &arbv1.QueueJob{}, - resyncPeriod, - indexers, - ) -} - -func (f *queueJobInformer) defaultQueueJobInformer(client *rest.RESTClient, resyncPeriod time.Duration) cache.SharedIndexInformer { - return NewQueueJobInformer(client, meta_v1.NamespaceAll, resyncPeriod, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) -} - -func (f *queueJobInformer) Informer() cache.SharedIndexInformer { - return f.factory.InformerFor(&arbv1.QueueJob{}, f.defaultQueueJobInformer) -} - -func (f *queueJobInformer) Lister() v1.QueueJobLister { - return v1.NewQueueJobLister(f.Informer().GetIndexer()) -} diff --git a/pkg/client/listers/controller/v1/queuejob.go b/pkg/client/listers/controller/v1/queuejob.go deleted file mode 100644 index df1ed8e4e..000000000 --- a/pkg/client/listers/controller/v1/queuejob.go +++ /dev/null @@ -1,105 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -/* -Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
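The informer file being deleted here is the standard ListWatch wiring, and the AppWrapper informer that remains presumably follows the same shape. A minimal sketch mirroring the deleted NewQueueJobInformer, with the "appwrappers" resource name written as a string literal because no plural constant for AppWrapper appears in this patch:

```go
package example

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/cache"

	arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
)

// NewAppWrapperInformer builds a shared index informer from a ListWatch,
// exactly as the deleted NewQueueJobInformer did for QueueJobs.
func NewAppWrapperInformer(client *rest.RESTClient, resyncPeriod time.Duration) cache.SharedIndexInformer {
	source := cache.NewListWatchFromClient(
		client,
		"appwrappers", // resource plural; written literally for this sketch
		metav1.NamespaceAll,
		fields.Everything())

	return cache.NewSharedIndexInformer(
		source,
		&arbv1.AppWrapper{},
		resyncPeriod,
		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
	)
}
```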
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package v1 - -import ( - arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1" - - "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/client-go/tools/cache" -) - -// QueueJobLister helps list QueueJobs. -type QueueJobLister interface { - // List lists all QueueJobs in the indexer. - List(selector labels.Selector) (ret []*arbv1.QueueJob, err error) - // QueueJobs returns an object that can list and get QueueJobs. - QueueJobs(namespace string) QueueJobNamespaceLister -} - -// queueJobLister implements the QueueJobLister interface. -type queueJobLister struct { - indexer cache.Indexer -} - -// NewQueueJobLister returns a new QueueJobLister. -func NewQueueJobLister(indexer cache.Indexer) QueueJobLister { - return &queueJobLister{indexer: indexer} -} - -// List lists all QueueJobs in the indexer. -func (s *queueJobLister) List(selector labels.Selector) (ret []*arbv1.QueueJob, err error) { - err = cache.ListAll(s.indexer, selector, func(m interface{}) { - ret = append(ret, m.(*arbv1.QueueJob)) - }) - return ret, err -} - -// QueueJobs returns an object that can list and get QueueJobs. -func (s *queueJobLister) QueueJobs(namespace string) QueueJobNamespaceLister { - return queueJobNamespaceLister{indexer: s.indexer, namespace: namespace} -} - -// QueueJobNamespaceLister helps list and get QueueJobs. -type QueueJobNamespaceLister interface { - // List lists all QueueJobs in the indexer for a given namespace. - List(selector labels.Selector) (ret []*arbv1.QueueJob, err error) - // Get retrieves the QueueJob from the indexer for a given namespace and name. - Get(name string) (*arbv1.QueueJob, error) -} - -// queueJobNamespaceLister implements the QueueJobNamespaceLister -// interface. -type queueJobNamespaceLister struct { - indexer cache.Indexer - namespace string -} - -// List lists all QueueJobs in the indexer for a given namespace. -func (s queueJobNamespaceLister) List(selector labels.Selector) (ret []*arbv1.QueueJob, err error) { - err = cache.ListAllByNamespace(s.indexer, s.namespace, selector, func(m interface{}) { - ret = append(ret, m.(*arbv1.QueueJob)) - }) - return ret, err -} - -// Get retrieves the QueueJob from the indexer for a given namespace and name. -func (s queueJobNamespaceLister) Get(name string) (*arbv1.QueueJob, error) { - obj, exists, err := s.indexer.GetByKey(s.namespace + "/" + name) - if err != nil { - return nil, err - } - if !exists { - return nil, errors.NewNotFound(arbv1.Resource("queuejobs"), name) - } - return obj.(*arbv1.QueueJob), nil -} diff --git a/pkg/controller/clusterstate/api/cluster_info.go b/pkg/controller/clusterstate/api/cluster_info.go index 1b31bcf72..2bd382893 100644 --- a/pkg/controller/clusterstate/api/cluster_info.go +++ b/pkg/controller/clusterstate/api/cluster_info.go @@ -30,46 +30,7 @@ limitations under the License. */ package api -import "fmt" - // ClusterInfo is a snapshot of cluster by cache. 
type ClusterInfo struct { - Jobs []*JobInfo - Nodes []*NodeInfo } - -func (ci ClusterInfo) String() string { - - str := "Cache:\n" - - if len(ci.Nodes) != 0 { - str = str + "Nodes:\n" - for _, n := range ci.Nodes { - str = str + fmt.Sprintf("\t %s: idle(%v) used(%v) allocatable(%v) pods(%d)\n", - n.Name, n.Idle, n.Used, n.Allocatable, len(n.Tasks)) - - i := 0 - for _, p := range n.Tasks { - str = str + fmt.Sprintf("\t\t %d: %v\n", i, p) - i++ - } - } - } - - if len(ci.Jobs) != 0 { - str = str + "Jobs:\n" - for _, job := range ci.Jobs { - str = str + fmt.Sprintf("\t Job(%s) name(%s) minAvailable(%v)\n", - job.UID, job.Name, job.MinAvailable) - - i := 0 - for _, task := range job.Tasks { - str = str + fmt.Sprintf("\t\t %d: %v\n", i, task) - i++ - } - } - } - - return str -} diff --git a/pkg/controller/clusterstate/api/helpers.go b/pkg/controller/clusterstate/api/helpers.go index 0749d76c6..00a43f5b7 100644 --- a/pkg/controller/clusterstate/api/helpers.go +++ b/pkg/controller/clusterstate/api/helpers.go @@ -34,19 +34,8 @@ import ( "fmt" v1 "k8s.io/api/core/v1" - clientcache "k8s.io/client-go/tools/cache" - "k8s.io/klog/v2" ) -// PodKey returns the string key of a pod. -func PodKey(pod *v1.Pod) TaskID { - if key, err := clientcache.MetaNamespaceKeyFunc(pod); err != nil { - return TaskID(fmt.Sprintf("%v/%v", pod.Namespace, pod.Name)) - } else { - return TaskID(key) - } -} - func getTaskStatus(pod *v1.Pod) TaskStatus { switch pod.Status.Phase { case v1.PodRunning: @@ -111,19 +100,6 @@ func MergeErrors(errs ...error) error { return nil } -// JobTerminated checkes whether job was terminated. -func JobTerminated(job *JobInfo) bool { - if job.SchedSpec == nil && len(job.Tasks) == 0 { - klog.V(9).Infof("Job: %v is terminated.", job.UID) - return true - } else { - klog.V(10).Infof("Job: %v not terminated, scheduleSpec: %v, tasks: %v.", - job.UID, job.SchedSpec, job.Tasks) - return false - } - -} - func NewStringsMap(source map[string]string) map[string]string { target := make(map[string]string) diff --git a/pkg/controller/clusterstate/api/job_info.go b/pkg/controller/clusterstate/api/job_info.go deleted file mode 100644 index 163d2cbd8..000000000 --- a/pkg/controller/clusterstate/api/job_info.go +++ /dev/null @@ -1,289 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -/* -Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. 
-*/ -package api - -import ( - "fmt" - - v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/types" - "k8s.io/klog/v2" - - "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/utils" - arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1" -) - -type TaskID types.UID - -type TaskInfo struct { - UID TaskID - Job JobID - - Name string - Namespace string - - Resreq *Resource - - NodeName string - Status TaskStatus - Priority int32 - - Pod *v1.Pod -} - -func NewTaskInfo(pod *v1.Pod) *TaskInfo { - req := EmptyResource() - - // TODO(k82cn): also includes initContainers' resource. - for _, c := range pod.Spec.Containers { - req.Add(NewResource(c.Resources.Requests)) - } - - pi := &TaskInfo{ - UID: TaskID(pod.UID), - Job: JobID(utils.GetJobID(pod)), - Name: pod.Name, - Namespace: pod.Namespace, - NodeName: pod.Spec.NodeName, - Status: getTaskStatus(pod), - Priority: 1, - - Pod: pod, - Resreq: req, - } - - if pod.Spec.Priority != nil { - pi.Priority = *pod.Spec.Priority - } - - return pi -} - -func (pi *TaskInfo) Clone() *TaskInfo { - return &TaskInfo{ - UID: pi.UID, - Job: pi.Job, - Name: pi.Name, - Namespace: pi.Namespace, - NodeName: pi.NodeName, - Status: pi.Status, - Priority: pi.Priority, - Pod: pi.Pod, - Resreq: pi.Resreq.Clone(), - } -} - -func (pi TaskInfo) String() string { - return fmt.Sprintf("Task (%v:%v/%v): job %v, status %v, pri %v, resreq %v", - pi.UID, pi.Namespace, pi.Name, pi.Job, pi.Status, pi.Priority, pi.Resreq) -} - -// JobID is the type of JobInfo's ID. -type JobID types.UID - -type tasksMap map[TaskID]*TaskInfo - -type JobInfo struct { - UID JobID - - Name string - Namespace string - - Priority int - - NodeSelector map[string]string - MinAvailable int - - // All tasks of the Job. - TaskStatusIndex map[TaskStatus]tasksMap - Tasks tasksMap - - Allocated *Resource - TotalRequest *Resource - - // Candidate hosts for this job. 
- Candidates []*NodeInfo - - SchedSpec *arbv1.SchedulingSpec -} - -func NewJobInfo(uid JobID) *JobInfo { - return &JobInfo{ - UID: uid, - - MinAvailable: 0, - NodeSelector: make(map[string]string), - - Allocated: EmptyResource(), - TotalRequest: EmptyResource(), - - TaskStatusIndex: map[TaskStatus]tasksMap{}, - Tasks: tasksMap{}, - } -} - -func (ps *JobInfo) UnsetSchedulingSpec() { - ps.SchedSpec = nil -} - -func (ps *JobInfo) SetSchedulingSpec(spec *arbv1.SchedulingSpec) { - ps.Name = spec.Name - ps.Namespace = spec.Namespace - ps.MinAvailable = spec.Spec.MinAvailable - - for k, v := range spec.Spec.NodeSelector { - ps.NodeSelector[k] = v - } - - ps.SchedSpec = spec -} - -func (ps *JobInfo) GetTasks(statuses ...TaskStatus) []*TaskInfo { - var res []*TaskInfo - - for _, status := range statuses { - if tasks, found := ps.TaskStatusIndex[status]; found { - for _, task := range tasks { - res = append(res, task.Clone()) - } - } - } - - return res -} - -func (ps *JobInfo) addTaskIndex(pi *TaskInfo) { - if _, found := ps.TaskStatusIndex[pi.Status]; !found { - ps.TaskStatusIndex[pi.Status] = tasksMap{} - } - - ps.TaskStatusIndex[pi.Status][pi.UID] = pi -} - -func (ps *JobInfo) AddTaskInfo(pi *TaskInfo) { - ps.Tasks[pi.UID] = pi - ps.addTaskIndex(pi) - - ps.TotalRequest.Add(pi.Resreq) - - if AllocatedStatus(pi.Status) { - ps.Allocated.Add(pi.Resreq) - } -} - -func (ps *JobInfo) UpdateTaskStatus(task *TaskInfo, status TaskStatus) error { - if err := validateStatusUpdate(task.Status, status); err != nil { - return err - } - - // Remove the task from the task list firstly - ps.DeleteTaskInfo(task) - - // Update task's status to the target status - task.Status = status - ps.AddTaskInfo(task) - - return nil -} - -func (ps *JobInfo) deleteTaskIndex(ti *TaskInfo) { - if tasks, found := ps.TaskStatusIndex[ti.Status]; found { - delete(tasks, ti.UID) - - if len(tasks) == 0 { - delete(ps.TaskStatusIndex, ti.Status) - } - } -} - -func (ps *JobInfo) DeleteTaskInfo(pi *TaskInfo) error { - if task, found := ps.Tasks[pi.UID]; found { - _, err := ps.TotalRequest.Sub(task.Resreq) - if err != nil { - klog.Warningf("[DeleteTaskInfo] Total requested subtraction err=%v", err) - } - - if AllocatedStatus(task.Status) { - _, err := ps.Allocated.Sub(task.Resreq) - if err != nil { - klog.Warningf("[DeleteTaskInfo] Allocated subtraction err=%v", err) - } - } - - delete(ps.Tasks, task.UID) - - ps.deleteTaskIndex(task) - return nil - } - - return fmt.Errorf("failed to find task <%v/%v> in job <%v/%v>", - pi.Namespace, pi.Name, ps.Namespace, ps.Name) -} - -func (ps *JobInfo) Clone() *JobInfo { - info := &JobInfo{ - UID: ps.UID, - Name: ps.Name, - Namespace: ps.Namespace, - Priority: ps.Priority, - - MinAvailable: ps.MinAvailable, - Allocated: EmptyResource(), - TotalRequest: EmptyResource(), - - TaskStatusIndex: map[TaskStatus]tasksMap{}, - Tasks: tasksMap{}, - } - if ps.NodeSelector != nil { - info.NodeSelector = map[string]string{} - for k, v := range ps.NodeSelector { - info.NodeSelector[k] = v - } - } - - for _, task := range ps.Tasks { - info.AddTaskInfo(task.Clone()) - } - - return info -} - -func (ps JobInfo) String() string { - res := "" - - i := 0 - for _, task := range ps.Tasks { - res = res + fmt.Sprintf("\n\t %d: %v", i, task) - i++ - } - - return fmt.Sprintf("Job (%v): name %v, minAvailable %d", ps.UID, ps.Name, ps.MinAvailable) + res -} diff --git a/pkg/controller/clusterstate/api/job_info_test.go b/pkg/controller/clusterstate/api/job_info_test.go deleted file mode 100644 index 4875e56c3..000000000 --- 
a/pkg/controller/clusterstate/api/job_info_test.go +++ /dev/null @@ -1,267 +0,0 @@ -/* -Copyright 2017 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -/* -Copyright 2019, 2021 The Multi-Cluster App Dispatcher Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package api - -import ( - "reflect" - "testing" - - v1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -func jobInfoEqual(l, r *JobInfo) bool { - if !reflect.DeepEqual(l, r) { - return false - } - - return true -} - -func TestAddTaskInfo(t *testing.T) { - // case1 - case01_uid := JobID("uid") - case01_owner := buildOwnerReference("uid") - - case01_pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) - case01_task1 := NewTaskInfo(case01_pod1) - case01_pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) - case01_task2 := NewTaskInfo(case01_pod2) - case01_pod3 := buildPod("c1", "p3", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) - case01_task3 := NewTaskInfo(case01_pod3) - case01_pod4 := buildPod("c1", "p4", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) - case01_task4 := NewTaskInfo(case01_pod4) - - tests := []struct { - name string - uid JobID - pods []*v1.Pod - expected *JobInfo - }{ - { - name: "add 1 pending owner pod, 1 running owner pod", - uid: case01_uid, - pods: []*v1.Pod{case01_pod1, case01_pod2, case01_pod3, case01_pod4}, - expected: &JobInfo{ - UID: case01_uid, - MinAvailable: 0, - Allocated: buildResource("4000m", "4G"), - TotalRequest: buildResource("5000m", "5G"), - Tasks: tasksMap{ - case01_task1.UID: case01_task1, - case01_task2.UID: case01_task2, - case01_task3.UID: case01_task3, - case01_task4.UID: case01_task4, - }, - TaskStatusIndex: map[TaskStatus]tasksMap{ - Running: { - case01_task2.UID: case01_task2, - }, - Pending: { - case01_task1.UID: case01_task1, - }, - Bound: { - case01_task3.UID: case01_task3, - case01_task4.UID: case01_task4, - }, - }, - NodeSelector: make(map[string]string), - }, - }, - } - - for i, test := range tests { - ps := NewJobInfo(test.uid) - - for _, pod := range test.pods { - pi := NewTaskInfo(pod) - ps.AddTaskInfo(pi) - } - - if !jobInfoEqual(ps, test.expected) { - t.Errorf("podset info %d: \n expected: %v, \n got: %v \n", - i, test.expected, ps) 
- } - } -} - -func TestDeleteTaskInfo(t *testing.T) { - // case1 - case01_uid := JobID("owner1") - case01_owner := buildOwnerReference(string(case01_uid)) - case01_pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) - case01_task1 := NewTaskInfo(case01_pod1) - case01_pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) - case01_pod3 := buildPod("c1", "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) - case01_task3 := NewTaskInfo(case01_pod3) - - // case2 - case02_uid := JobID("owner2") - case02_owner := buildOwnerReference(string(case02_uid)) - case02_pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case02_owner}, make(map[string]string)) - case02_task1 := NewTaskInfo(case02_pod1) - case02_pod2 := buildPod("c1", "p2", "n1", v1.PodPending, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case02_owner}, make(map[string]string)) - case02_pod3 := buildPod("c1", "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{case02_owner}, make(map[string]string)) - case02_task3 := NewTaskInfo(case02_pod3) - - tests := []struct { - name string - uid JobID - pods []*v1.Pod - rmPods []*v1.Pod - expected *JobInfo - }{ - { - name: "add 1 pending owner pod, 2 running owner pod, remove 1 running owner pod", - uid: case01_uid, - pods: []*v1.Pod{case01_pod1, case01_pod2, case01_pod3}, - rmPods: []*v1.Pod{case01_pod2}, - expected: &JobInfo{ - UID: case01_uid, - MinAvailable: 0, - Allocated: buildResource("3000m", "3G"), - TotalRequest: buildResource("4000m", "4G"), - Tasks: tasksMap{ - case01_task1.UID: case01_task1, - case01_task3.UID: case01_task3, - }, - TaskStatusIndex: map[TaskStatus]tasksMap{ - Pending: {case01_task1.UID: case01_task1}, - Running: {case01_task3.UID: case01_task3}, - }, - NodeSelector: make(map[string]string), - }, - }, - { - name: "add 2 pending owner pod, 1 running owner pod, remove 1 pending owner pod", - uid: case02_uid, - pods: []*v1.Pod{case02_pod1, case02_pod2, case02_pod3}, - rmPods: []*v1.Pod{case02_pod2}, - expected: &JobInfo{ - UID: case02_uid, - MinAvailable: 0, - Allocated: buildResource("3000m", "3G"), - TotalRequest: buildResource("4000m", "4G"), - Tasks: tasksMap{ - case02_task1.UID: case02_task1, - case02_task3.UID: case02_task3, - }, - TaskStatusIndex: map[TaskStatus]tasksMap{ - Pending: { - case02_task1.UID: case02_task1, - }, - Running: { - case02_task3.UID: case02_task3, - }, - }, - NodeSelector: make(map[string]string), - }, - }, - } - - for i, test := range tests { - ps := NewJobInfo(test.uid) - - for _, pod := range test.pods { - pi := NewTaskInfo(pod) - ps.AddTaskInfo(pi) - } - - for _, pod := range test.rmPods { - pi := NewTaskInfo(pod) - ps.DeleteTaskInfo(pi) - } - - if !jobInfoEqual(ps, test.expected) { - t.Errorf("podset info %d: \n expected: %v, \n got: %v \n", - i, test.expected, ps) - } - } -} - -func TestCloneJob(t *testing.T) { - case01UID := JobID("job_1") - case01Ns := "c1" - case01_owner := buildOwnerReference("uid") - - case01Pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) - case01Pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{case01_owner}, 
make(map[string]string)) - case01Pod3 := buildPod("c1", "p3", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) - case01Pod4 := buildPod("c1", "p4", "n1", v1.PodPending, buildResourceList("1000m", "1G"), []metav1.OwnerReference{case01_owner}, make(map[string]string)) - case01Task1 := NewTaskInfo(case01Pod1) - case01Task2 := NewTaskInfo(case01Pod2) - case01Task3 := NewTaskInfo(case01Pod3) - case01Task4 := NewTaskInfo(case01Pod4) - - tests := []struct { - name string - expected *JobInfo - }{ - { - name: "add 1 pending owner pod, 1 running owner pod", - expected: &JobInfo{ - UID: case01UID, - Name: "job_1", - Namespace: case01Ns, - MinAvailable: 1, - - Allocated: buildResource("4000m", "4G"), - TotalRequest: buildResource("5000m", "5G"), - Tasks: tasksMap{ - case01Task1.UID: case01Task1, - case01Task2.UID: case01Task2, - case01Task3.UID: case01Task3, - case01Task4.UID: case01Task4, - }, - TaskStatusIndex: map[TaskStatus]tasksMap{ - Running: { - case01Task2.UID: case01Task2, - }, - Pending: { - case01Task1.UID: case01Task1, - }, - Bound: { - case01Task3.UID: case01Task3, - case01Task4.UID: case01Task4, - }, - }, - }, - }, - } - - for i, test := range tests { - clone := test.expected.Clone() - //assert.Equal(t, test.expected, clone) - if !jobInfoEqual(clone, test.expected) { - t.Errorf("clone info %d: \n expected: %v, \n got: %v \n", i, test.expected, clone) - } - } -} diff --git a/pkg/controller/clusterstate/api/node_info.go b/pkg/controller/clusterstate/api/node_info.go index 19fffd384..8696749db 100644 --- a/pkg/controller/clusterstate/api/node_info.go +++ b/pkg/controller/clusterstate/api/node_info.go @@ -34,7 +34,6 @@ import ( "fmt" v1 "k8s.io/api/core/v1" - "k8s.io/klog/v2" ) // NodeInfo is node level aggregated information. 
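After this change NodeInfo carries only aggregate resources (Idle, Used, Allocatable, Capability) and node metadata, with no per-task map. A small illustrative walk over a ClusterInfo snapshot built from these nodes; the field names come from this package, but the reporting itself is just an example, not MCAD's dispatch logic:

```go
package example

import (
	"fmt"

	"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
)

// reportIdle prints per-node aggregate capacity from a cache snapshot.
func reportIdle(snapshot *api.ClusterInfo) {
	for _, n := range snapshot.Nodes {
		if n.Unschedulable {
			continue // cordoned nodes contribute no schedulable capacity
		}
		fmt.Printf("node %s: idle %v of allocatable %v (used %v)\n",
			n.Name, n.Idle, n.Allocatable, n.Used)
	}
}
```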
@@ -61,8 +60,6 @@ type NodeInfo struct { // Taints for potential filtering Taints []v1.Taint - - Tasks map[TaskID]*TaskInfo } func NewNodeInfo(node *v1.Node) *NodeInfo { @@ -75,11 +72,9 @@ func NewNodeInfo(node *v1.Node) *NodeInfo { Allocatable: EmptyResource(), Capability: EmptyResource(), - Labels: make(map[string]string), + Labels: make(map[string]string), Unschedulable: false, - Taints: []v1.Taint{}, - - Tasks: make(map[TaskID]*TaskInfo), + Taints: []v1.Taint{}, } } @@ -94,21 +89,15 @@ func NewNodeInfo(node *v1.Node) *NodeInfo { Allocatable: NewResource(node.Status.Allocatable), Capability: NewResource(node.Status.Capacity), - Labels: node.GetLabels(), + Labels: node.GetLabels(), Unschedulable: node.Spec.Unschedulable, - Taints: node.Spec.Taints, - - Tasks: make(map[TaskID]*TaskInfo), + Taints: node.Spec.Taints, } } func (ni *NodeInfo) Clone() *NodeInfo { res := NewNodeInfo(ni.Node) - for _, p := range ni.Tasks { - res.AddTask(p) - } - return res } @@ -116,18 +105,6 @@ func (ni *NodeInfo) SetNode(node *v1.Node) { if ni.Node == nil { ni.Idle = NewResource(node.Status.Allocatable) - for _, task := range ni.Tasks { - if task.Status == Releasing { - ni.Releasing.Add(task.Resreq) - } - - _, err := ni.Idle.Sub(task.Resreq) - if err != nil { - klog.Warningf("[SetNode] Node idle amount subtraction err=%v", err) - } - - ni.Used.Add(task.Resreq) - } } ni.Name = node.Name @@ -139,109 +116,9 @@ func (ni *NodeInfo) SetNode(node *v1.Node) { ni.Taints = NewTaints(node.Spec.Taints) } -func (ni *NodeInfo) PipelineTask(task *TaskInfo) error { - key := PodKey(task.Pod) - if _, found := ni.Tasks[key]; found { - return fmt.Errorf("task <%v/%v> already on node <%v>", - task.Namespace, task.Name, ni.Name) - } - - ti := task.Clone() - - if ni.Node != nil { - _, err := ni.Releasing.Sub(ti.Resreq) - if err != nil { - klog.Warningf("[PipelineTask] Node release subtraction err=%v", err) - } - - ni.Used.Add(ti.Resreq) - } - - ni.Tasks[key] = ti - - return nil -} - -func (ni *NodeInfo) AddTask(task *TaskInfo) error { - key := PodKey(task.Pod) - if _, found := ni.Tasks[key]; found { - return fmt.Errorf("task <%v/%v> already on node <%v>", - task.Namespace, task.Name, ni.Name) - } - - // Node will hold a copy of task to make sure the status - // change will not impact resource in node. 
- ti := task.Clone() - - if ni.Node != nil { - if ti.Status == Releasing { - ni.Releasing.Add(ti.Resreq) - } - _, err := ni.Idle.Sub(ti.Resreq) - if err != nil { - klog.Warningf("[AddTask] Idle resource subtract err=%v", err) - } - - ni.Used.Add(ti.Resreq) - } - - ni.Tasks[key] = ti - - return nil -} - -func (ni *NodeInfo) RemoveTask(ti *TaskInfo) error { - klog.V(10).Infof("Attempting to remove task: %s on node: %s", ti.Name, ni.Name) - - key := PodKey(ti.Pod) - - task, found := ni.Tasks[key] - if !found { - return fmt.Errorf("failed to find task <%v/%v> on host <%v>", - ti.Namespace, ti.Name, ni.Name) - } - - if ni.Node != nil { - klog.V(10).Infof("Found node for task: %s, node: %s, task status: %v", task.Name, ni.Name, task.Status) - if task.Status == Releasing { - _, err := ni.Releasing.Sub(task.Resreq) - if err != nil { - klog.Warningf("[RemoveTask] Node release subtraction err=%v", err) - } - } - - ni.Idle.Add(task.Resreq) - _, err := ni.Used.Sub(task.Resreq) - if err != nil { - klog.Warningf("[RemoveTask] Node usage subtraction err=%v", err) - } - } else { - klog.V(10).Infof("No node info found for task: %s, node: %s", task.Name, ni.Name) - } - - delete(ni.Tasks, key) - - return nil -} - -func (ni *NodeInfo) UpdateTask(ti *TaskInfo) error { - klog.V(10).Infof("Attempting to update task: %s on node: %s", ti.Name, ni.Name) - if err := ni.RemoveTask(ti); err != nil { - return err - } - - return ni.AddTask(ti) -} - func (ni NodeInfo) String() string { res := "" - i := 0 - for _, task := range ni.Tasks { - res = res + fmt.Sprintf("\n\t %d: %v", i, task) - i++ - } - return fmt.Sprintf("Node (%s): idle <%v>, used <%v>, releasing <%v>%s", ni.Name, ni.Idle, ni.Used, ni.Releasing, res) diff --git a/pkg/controller/clusterstate/api/node_info_test.go b/pkg/controller/clusterstate/api/node_info_test.go index d6e16d6ba..4294ef786 100644 --- a/pkg/controller/clusterstate/api/node_info_test.go +++ b/pkg/controller/clusterstate/api/node_info_test.go @@ -32,12 +32,10 @@ package api import ( "reflect" - "testing" - - "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +//MCAD schedules AWs and not pods. 
+//legacy code to schedule pods is removed func nodeInfoEqual(l, r *NodeInfo) bool { if !reflect.DeepEqual(l, r) { return false @@ -45,105 +43,3 @@ func nodeInfoEqual(l, r *NodeInfo) bool { return true } - -func TestNodeInfo_AddPod(t *testing.T) { - // case1 - case01_node := buildNode("n1", buildResourceList("8000m", "10G")) - case01_pod1 := buildPod("c1", "p1", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), []metav1.OwnerReference{}, make(map[string]string)) - case01_pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{}, make(map[string]string)) - - tests := []struct { - name string - node *v1.Node - pods []*v1.Pod - expected *NodeInfo - }{ - { - name: "add 2 running non-owner pod", - node: case01_node, - pods: []*v1.Pod{case01_pod1, case01_pod2}, - expected: &NodeInfo{ - Name: "n1", - Node: case01_node, - Idle: buildResource("5000m", "7G"), - Used: buildResource("3000m", "3G"), - Releasing: EmptyResource(), - Allocatable: buildResource("8000m", "10G"), - Capability: buildResource("8000m", "10G"), - Tasks: map[TaskID]*TaskInfo{ - "c1/p1": NewTaskInfo(case01_pod1), - "c1/p2": NewTaskInfo(case01_pod2), - }, - }, - }, - } - - for i, test := range tests { - ni := NewNodeInfo(test.node) - - for _, pod := range test.pods { - pi := NewTaskInfo(pod) - ni.AddTask(pi) - } - - if !nodeInfoEqual(ni, test.expected) { - t.Errorf("node info %d: \n expected %v, \n got %v \n", - i, test.expected, ni) - } - } -} - -func TestNodeInfo_RemovePod(t *testing.T) { - // case1 - case01_node := buildNode("n1", buildResourceList("8000m", "10G")) - case01_pod1 := buildPod("c1", "p1", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), []metav1.OwnerReference{}, make(map[string]string)) - case01_pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("2000m", "2G"), []metav1.OwnerReference{}, make(map[string]string)) - case01_pod3 := buildPod("c1", "p3", "n1", v1.PodRunning, buildResourceList("3000m", "3G"), []metav1.OwnerReference{}, make(map[string]string)) - - tests := []struct { - name string - node *v1.Node - pods []*v1.Pod - rmPods []*v1.Pod - expected *NodeInfo - }{ - { - name: "add 3 running non-owner pod, remove 1 running non-owner pod", - node: case01_node, - pods: []*v1.Pod{case01_pod1, case01_pod2, case01_pod3}, - rmPods: []*v1.Pod{case01_pod2}, - expected: &NodeInfo{ - Name: "n1", - Node: case01_node, - Idle: buildResource("4000m", "6G"), - Used: buildResource("4000m", "4G"), - Releasing: EmptyResource(), - Allocatable: buildResource("8000m", "10G"), - Capability: buildResource("8000m", "10G"), - Tasks: map[TaskID]*TaskInfo{ - "c1/p1": NewTaskInfo(case01_pod1), - "c1/p3": NewTaskInfo(case01_pod3), - }, - }, - }, - } - - for i, test := range tests { - ni := NewNodeInfo(test.node) - - for _, pod := range test.pods { - pi := NewTaskInfo(pod) - ni.AddTask(pi) - } - - for _, pod := range test.rmPods { - pi := NewTaskInfo(pod) - ni.RemoveTask(pi) - } - - if !nodeInfoEqual(ni, test.expected) { - t.Errorf("node info %d: \n expected %v, \n got %v \n", - i, test.expected, ni) - } - } -} diff --git a/pkg/controller/clusterstate/api/test_utils.go b/pkg/controller/clusterstate/api/test_utils.go index c8f1f0609..f116ea2fc 100644 --- a/pkg/controller/clusterstate/api/test_utils.go +++ b/pkg/controller/clusterstate/api/test_utils.go @@ -34,7 +34,7 @@ import ( "fmt" "reflect" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/api/policy/v1beta1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -56,20 
+56,6 @@ func nodesEqual(l, r map[string]*NodeInfo) bool { return true } -func podsEqual(l, r map[string]*TaskInfo) bool { - if len(l) != len(r) { - return false - } - - for k, p := range l { - if !reflect.DeepEqual(p, r[k]) { - return false - } - } - - return true -} - func buildNode(name string, alloc v1.ResourceList) *v1.Node { return &v1.Node{ ObjectMeta: metav1.ObjectMeta{ diff --git a/pkg/controller/clusterstate/cache/cache.go b/pkg/controller/clusterstate/cache/cache.go index e011438e1..27f5ba000 100644 --- a/pkg/controller/clusterstate/cache/cache.go +++ b/pkg/controller/clusterstate/cache/cache.go @@ -68,7 +68,6 @@ type ClusterStateCache struct { nodeInformer clientv1.NodeInformer schedulingSpecInformer arbclient.SchedulingSpecInformer - Jobs map[api.JobID]*api.JobInfo Nodes map[string]*api.NodeInfo availableResources *api.Resource @@ -79,40 +78,9 @@ type ClusterStateCache struct { errTasks *cache.FIFO } -func taskKey(obj interface{}) (string, error) { - if obj == nil { - return "", fmt.Errorf("the object is nil") - } - - task, ok := obj.(*api.TaskInfo) - - if !ok { - return "", fmt.Errorf("failed to convert %v to TaskInfo", obj) - } - - return string(task.UID), nil -} - -func jobKey(obj interface{}) (string, error) { - if obj == nil { - return "", fmt.Errorf("the object is nil") - } - - job, ok := obj.(*api.JobInfo) - - if !ok { - return "", fmt.Errorf("failed to convert %v to TaskInfo", obj) - } - - return string(job.UID), nil -} - func newClusterStateCache(config *rest.Config) *ClusterStateCache { sc := &ClusterStateCache{ - Jobs: make(map[api.JobID]*api.JobInfo), - Nodes: make(map[string]*api.NodeInfo), - errTasks: cache.NewFIFO(taskKey), - deletedJobs: cache.NewFIFO(jobKey), + Nodes: make(map[string]*api.NodeInfo), } sc.kubeclient = kubernetes.NewForConfigOrDie(config) @@ -148,11 +116,7 @@ func newClusterStateCache(config *rest.Config) *ClusterStateCache { return false } }, - Handler: cache.ResourceEventHandlerFuncs{ - AddFunc: sc.AddPod, - UpdateFunc: sc.UpdatePod, - DeleteFunc: sc.DeletePod, - }, + Handler: cache.ResourceEventHandlerFuncs{}, }) // create queue informer @@ -164,11 +128,7 @@ func newClusterStateCache(config *rest.Config) *ClusterStateCache { schedulingSpecInformerFactory := informerfactory.NewSharedInformerFactory(queueClient, 0) // create informer for Queue information sc.schedulingSpecInformer = schedulingSpecInformerFactory.SchedulingSpec().SchedulingSpecs() - sc.schedulingSpecInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: sc.AddSchedulingSpec, - UpdateFunc: sc.UpdateSchedulingSpec, - DeleteFunc: sc.DeleteSchedulingSpec, - }) + sc.schedulingSpecInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{}) sc.availableResources = api.EmptyResource() sc.availableHistogram = api.NewResourceHistogram(api.EmptyResource(), api.EmptyResource()) @@ -187,12 +147,6 @@ func (sc *ClusterStateCache) Run(stopCh <-chan struct{}) { // Update cache go sc.updateCache() - // Re-sync error tasks. - go sc.resync() - - // Cleanup jobs. 
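With the pod and SchedulingSpec handlers now registered as empty ResourceEventHandlerFuncs, the cache reacts only to node events through AddNode, UpdateNode and DeleteNode. The registration site for those is not shown in this hunk; the sketch below is a hedged illustration of the usual client-go wiring, assuming the cache's node informer is the core/v1 NodeInformer it already declares:

```go
package example

import (
	informersv1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/tools/cache"
)

// nodeHandlers is the subset of ClusterStateCache methods kept by this patch.
type nodeHandlers interface {
	AddNode(obj interface{})
	UpdateNode(oldObj, newObj interface{})
	DeleteNode(obj interface{})
}

// registerNodeHandlers wires node events into the cache in the standard
// client-go fashion; the actual call site lives elsewhere in cache.go.
func registerNodeHandlers(informer informersv1.NodeInformer, sc nodeHandlers) {
	informer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    sc.AddNode,
		UpdateFunc: sc.UpdateNode,
		DeleteFunc: sc.DeleteNode,
	})
}
```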
- go sc.cleanupJobs() - } func (sc *ClusterStateCache) WaitForCacheSync(stopCh <-chan struct{}) bool { @@ -333,49 +287,6 @@ func (sc *ClusterStateCache) updateState() error { return err } -func (sc *ClusterStateCache) deleteJob(job *api.JobInfo) { - klog.V(10).Infof("[deleteJob] Attempting to delete Job <%v:%v/%v>", job.UID, job.Namespace, job.Name) - - time.AfterFunc(5*time.Second, func() { - sc.deletedJobs.AddIfNotPresent(job) - }) -} - -func (sc *ClusterStateCache) processCleanupJob() error { - _, err := sc.deletedJobs.Pop(func(obj interface{}) error { - job, ok := obj.(*api.JobInfo) - if !ok { - return fmt.Errorf("failed to convert %v to *v1.Pod", obj) - } - - func() { - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - if api.JobTerminated(job) { - delete(sc.Jobs, job.UID) - klog.V(10).Infof("[processCleanupJob] Job <%v:%v/%v> was deleted.", job.UID, job.Namespace, job.Name) - } else { - // Retry - sc.deleteJob(job) - } - }() - - return nil - }) - - return err -} - -func (sc *ClusterStateCache) cleanupJobs() { - for { - err := sc.processCleanupJob() - if err != nil { - klog.Errorf("Failed to process job clean up: %v", err) - } - } -} - func (sc *ClusterStateCache) updateCache() { klog.V(9).Infof("Starting to update Cluster State Cache") @@ -389,56 +300,18 @@ func (sc *ClusterStateCache) updateCache() { } } -func (sc *ClusterStateCache) resync() { - for { - err := sc.processResyncTask() - if err != nil { - klog.Errorf("Failed to process resync: %v", err) - } - } -} - -func (sc *ClusterStateCache) processResyncTask() error { - _, err := sc.errTasks.Pop(func(obj interface{}) error { - task, ok := obj.(*api.TaskInfo) - if !ok { - return fmt.Errorf("failed to convert %v to *v1.Pod", obj) - } - - if err := sc.syncTask(task); err != nil { - klog.Errorf("Failed to sync pod <%v/%v>", task.Namespace, task.Name) - return err - } - return nil - }) - - return err -} - func (sc *ClusterStateCache) Snapshot() *api.ClusterInfo { sc.Mutex.Lock() defer sc.Mutex.Unlock() snapshot := &api.ClusterInfo{ Nodes: make([]*api.NodeInfo, 0, len(sc.Nodes)), - Jobs: make([]*api.JobInfo, 0, len(sc.Jobs)), } for _, value := range sc.Nodes { snapshot.Nodes = append(snapshot.Nodes, value.Clone()) } - for _, value := range sc.Jobs { - // If no scheduling spec, does not handle it. 
- if value.SchedSpec == nil { - // Jobs.Tasks are more recognizable than Jobs.UID - klog.V(10).Infof("The scheduling spec of Job <%v> with tasks <%+v> is nil, ignore it.", value.UID, value.Tasks) - continue - } - - snapshot.Jobs = append(snapshot.Jobs, value.Clone()) - } - return snapshot } @@ -465,28 +338,9 @@ func (sc *ClusterStateCache) String() string { if len(sc.Nodes) != 0 { str = str + "Nodes:\n" for _, n := range sc.Nodes { - str = str + fmt.Sprintf("\t %s: idle(%v) used(%v) allocatable(%v) pods(%d)\n", - n.Name, n.Idle, n.Used, n.Allocatable, len(n.Tasks)) + str = str + fmt.Sprintf("\t %s: idle(%v) used(%v) allocatable(%v)\n", + n.Name, n.Idle, n.Used, n.Allocatable) - i := 0 - for _, p := range n.Tasks { - str = str + fmt.Sprintf("\t\t %d: %v\n", i, p) - i++ - } - } - } - - if len(sc.Jobs) != 0 { - str = str + "Jobs:\n" - for _, job := range sc.Jobs { - str = str + fmt.Sprintf("\t Job(%s) name(%s) minAvailable(%v)\n", - job.UID, job.Name, job.MinAvailable) - - i := 0 - for _, task := range job.Tasks { - str = str + fmt.Sprintf("\t\t %d: %v\n", i, task) - i++ - } } } diff --git a/pkg/controller/clusterstate/cache/cache_test.go b/pkg/controller/clusterstate/cache/cache_test.go index 30714de91..1067a4cdb 100644 --- a/pkg/controller/clusterstate/cache/cache_test.go +++ b/pkg/controller/clusterstate/cache/cache_test.go @@ -33,9 +33,8 @@ package cache import ( "fmt" "reflect" - "testing" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -57,25 +56,6 @@ func nodesEqual(l, r map[string]*api.NodeInfo) bool { return true } -func jobsEqual(l, r map[api.JobID]*api.JobInfo) bool { - if len(l) != len(r) { - return false - } - - for k, p := range l { - if !reflect.DeepEqual(p, r[k]) { - return false - } - } - - return true -} - -func cacheEqual(l, r *ClusterStateCache) bool { - return nodesEqual(l.Nodes, r.Nodes) && - jobsEqual(l.Jobs, r.Jobs) -} - func buildNode(name string, alloc v1.ResourceList) *v1.Node { return &v1.Node{ ObjectMeta: metav1.ObjectMeta{ @@ -138,64 +118,3 @@ func buildOwnerReference(owner string) metav1.OwnerReference { UID: types.UID(owner), } } - -func TestAddPod(t *testing.T) { - - owner := buildOwnerReference("j1") - - // case 1: - pod1 := buildPod("c1", "p1", "", v1.PodPending, buildResourceList("1000m", "1G"), - []metav1.OwnerReference{owner}, make(map[string]string)) - pi1 := api.NewTaskInfo(pod1) - pod2 := buildPod("c1", "p2", "n1", v1.PodRunning, buildResourceList("1000m", "1G"), - []metav1.OwnerReference{owner}, make(map[string]string)) - pi2 := api.NewTaskInfo(pod2) - - j1 := api.NewJobInfo(api.JobID("j1")) - j1.AddTaskInfo(pi1) - j1.AddTaskInfo(pi2) - - node1 := buildNode("n1", buildResourceList("2000m", "10G")) - ni1 := api.NewNodeInfo(node1) - ni1.AddTask(pi2) - - tests := []struct { - pods []*v1.Pod - nodes []*v1.Node - expected *ClusterStateCache - }{ - { - pods: []*v1.Pod{pod1, pod2}, - nodes: []*v1.Node{node1}, - expected: &ClusterStateCache{ - Nodes: map[string]*api.NodeInfo{ - "n1": ni1, - }, - Jobs: map[api.JobID]*api.JobInfo{ - "j1": j1, - }, - }, - }, - } - - for i, test := range tests { - cache := &ClusterStateCache{ - Jobs: make(map[api.JobID]*api.JobInfo), - Nodes: make(map[string]*api.NodeInfo), - } - - for _, n := range test.nodes { - cache.AddNode(n) - } - - for _, p := range test.pods { - cache.AddPod(p) - } - - if !cacheEqual(cache, test.expected) { - t.Errorf("case %d: \n expected %v, \n got %v \n", - i, test.expected, cache) - 
} - } -} - diff --git a/pkg/controller/clusterstate/cache/event_handlers.go b/pkg/controller/clusterstate/cache/event_handlers.go index 0eb93ae09..3beb45085 100644 --- a/pkg/controller/clusterstate/cache/event_handlers.go +++ b/pkg/controller/clusterstate/cache/event_handlers.go @@ -31,17 +31,12 @@ limitations under the License. package cache import ( - "context" "fmt" v1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" - "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/utils" - arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1" arbapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api" ) @@ -49,227 +44,6 @@ func isTerminated(status arbapi.TaskStatus) bool { return status == arbapi.Succeeded || status == arbapi.Failed } -func (sc *ClusterStateCache) addTask(pi *arbapi.TaskInfo) error { - if len(pi.Job) != 0 { - if _, found := sc.Jobs[pi.Job]; !found { - sc.Jobs[pi.Job] = arbapi.NewJobInfo(pi.Job) - } - klog.V(7).Infof("Adding task: %s to job: %v.", pi.Name, pi.Job) - - sc.Jobs[pi.Job].AddTaskInfo(pi) - } else { - klog.V(10).Infof("No job ID for task: %s.", pi.Name) - - } - - if len(pi.NodeName) != 0 { - if _, found := sc.Nodes[pi.NodeName]; !found { - sc.Nodes[pi.NodeName] = arbapi.NewNodeInfo(nil) - } - - node := sc.Nodes[pi.NodeName] - if !isTerminated(pi.Status) { - klog.V(10).Infof("Adding Task: %s to node: %s.", pi.Name, pi.NodeName) - return node.AddTask(pi) - } else { - klog.V(10).Infof("Task: %s is terminated. Did not added not node: %s.", pi.Name, pi.NodeName) - } - } else { - klog.V(10).Infof("No related node found for for task: %s.", pi.Name) - - } - - return nil -} - -// Assumes that lock is already acquired. -func (sc *ClusterStateCache) addPod(pod *v1.Pod) error { - klog.V(9).Infof("Attempting to add pod: %s.", pod.Name) - pi := arbapi.NewTaskInfo(pod) - klog.V(10).Infof("New task: %s created for pod %s add with job id: %v", pi.Name, pod.Name, pi.Job) - - return sc.addTask(pi) -} - -func (sc *ClusterStateCache) syncTask(oldTask *arbapi.TaskInfo) error { - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - klog.V(9).Infof("Attempting to sync task: %s.", oldTask.Name) - - newPod, err := sc.kubeclient.CoreV1().Pods(oldTask.Namespace).Get(context.Background(), oldTask.Name, metav1.GetOptions{}) - if err != nil { - if errors.IsNotFound(err) { - sc.deleteTask(oldTask) - klog.V(3).Infof("Pod <%v/%v> was deleted, removed from cache.", oldTask.Namespace, oldTask.Name) - - return nil - } - return fmt.Errorf("failed to get Pod <%v/%v>: err %v", oldTask.Namespace, oldTask.Name, err) - } - - newTask := arbapi.NewTaskInfo(newPod) - - return sc.updateTask(oldTask, newTask) -} - -func (sc *ClusterStateCache) updateTask(oldTask, newTask *arbapi.TaskInfo) error { - klog.V(9).Infof("Updating.task: %s", oldTask.Name) - if err := sc.deleteTask(oldTask); err != nil { - return err - } - - return sc.addTask(newTask) -} - -// Assumes that lock is already acquired. 
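The delete handlers in this file, both the pod handler being removed and the node handler that stays, share one client-go idiom: a deleted object can arrive wrapped in cache.DeletedFinalStateUnknown after a dropped watch. A generic sketch of that unwrap, not tied to MCAD's exact handler bodies:

```go
package example

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/klog/v2"
)

// nodeFromDeleteEvent extracts a *v1.Node from a delete notification,
// handling the tombstone case where the final state was not observed.
func nodeFromDeleteEvent(obj interface{}) (*v1.Node, bool) {
	switch t := obj.(type) {
	case *v1.Node:
		return t, true
	case cache.DeletedFinalStateUnknown:
		// The watch was dropped; the tombstone carries the last known object.
		if node, ok := t.Obj.(*v1.Node); ok {
			return node, true
		}
		klog.Errorf("Cannot convert tombstone object to *v1.Node: %v", t.Obj)
		return nil, false
	default:
		klog.Errorf("Cannot convert to *v1.Node: %v", t)
		return nil, false
	}
}
```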
-func (sc *ClusterStateCache) updatePod(oldPod, newPod *v1.Pod) error { - klog.V(9).Infof("Attempting to update pod: %s.", oldPod.Name) - if err := sc.deletePod(oldPod); err != nil { - return err - } - return sc.addPod(newPod) -} - -func (sc *ClusterStateCache) deleteTask(pi *arbapi.TaskInfo) error { - var jobErr, nodeErr error - - klog.V(9).Infof("Attempting to delete task: %s.", pi.Name) - - if len(pi.Job) != 0 { - if job, found := sc.Jobs[pi.Job]; found { - jobErr = job.DeleteTaskInfo(pi) - } else { - jobErr = fmt.Errorf("failed to find Job <%v> for Task %v/%v", - pi.Job, pi.Namespace, pi.Name) - } - } else { - klog.V(9).Infof("Job ID for task: %s is empty.", pi.Name) - - } - - if len(pi.NodeName) != 0 { - node := sc.Nodes[pi.NodeName] - if node != nil { - nodeErr = node.RemoveTask(pi) - } - } else { - klog.V(9).Infof("No node name for task: %s is found.", pi.Name) - - } - - if jobErr != nil || nodeErr != nil { - return arbapi.MergeErrors(jobErr, nodeErr) - } - - return nil -} - -// Assumes that lock is already acquired. -func (sc *ClusterStateCache) deletePod(pod *v1.Pod) error { - klog.V(10).Infof("Attempting to delete pod: %s.", pod.Name) - - pi := arbapi.NewTaskInfo(pod) - // Delete the Task in cache to handle Binding status. - task := pi - if job, found := sc.Jobs[pi.Job]; found { - klog.V(9).Infof("Found job %v to delete for pod: %s, task: %s.", job.UID, pod.Name, pi.Name) - if t, found := job.Tasks[pi.UID]; found { - klog.V(10).Infof("Found job task listed in job: %v.", job.UID) - task = t - } - } - if err := sc.deleteTask(task); err != nil { - return err - } - - // If job was terminated, delete it. - if job, found := sc.Jobs[pi.Job]; found && arbapi.JobTerminated(job) { - sc.deleteJob(job) - } - - return nil -} - -func (sc *ClusterStateCache) AddPod(obj interface{}) { - pod, ok := obj.(*v1.Pod) - if !ok { - klog.Errorf("Cannot convert to *v1.Pod: %v", obj) - return - } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - err := sc.addPod(pod) - if err != nil { - klog.Errorf("Failed to add pod <%s/%s> into cache: %v", - pod.Namespace, pod.Name, err) - return - } else { - klog.V(4).Infof("[AddPod] Added pod <%s/%v> into cache.", pod.Namespace, pod.Name) - } - return -} - -func (sc *ClusterStateCache) UpdatePod(oldObj, newObj interface{}) { - oldPod, ok := oldObj.(*v1.Pod) - if !ok { - klog.Errorf("Cannot convert oldObj to *v1.Pod: %v", oldObj) - return - } - newPod, ok := newObj.(*v1.Pod) - if !ok { - klog.Errorf("Cannot convert newObj to *v1.Pod: %v", newObj) - return - } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - err := sc.updatePod(oldPod, newPod) - if err != nil { - klog.Errorf("Failed to update pod %v in cache: %v", oldPod.Name, err) - return - } - - klog.V(4).Infof("[UpdatePod] Updated pod <%s/%v> in cache.", oldPod.Namespace, oldPod.Name) - - return -} - -func (sc *ClusterStateCache) DeletePod(obj interface{}) { - - var pod *v1.Pod - switch t := obj.(type) { - case *v1.Pod: - pod = t - klog.V(10).Infof("Handling Delete pod event for: %s.", pod.Name) - case cache.DeletedFinalStateUnknown: - var ok bool - pod, ok = t.Obj.(*v1.Pod) - if !ok { - klog.Errorf("Cannot convert to *v1.Pod: %v", t.Obj) - return - } - default: - klog.Errorf("Cannot convert to *v1.Pod: %v", t) - return - } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - err := sc.deletePod(pod) - if err != nil { - klog.V(6).Infof("Failed to delete pod %v from cache: %v", pod.Name, err) - return - } - - klog.V(3).Infof("Deleted pod <%s/%v> from cache.", pod.Namespace, pod.Name) - return -} - // Assumes that lock 
is already acquired. func (sc *ClusterStateCache) addNode(node *v1.Node) error { if sc.Nodes[node.Name] != nil { @@ -371,111 +145,3 @@ func (sc *ClusterStateCache) DeleteNode(obj interface{}) { } return } - -// Assumes that lock is already acquired. -func (sc *ClusterStateCache) setSchedulingSpec(ss *arbv1.SchedulingSpec) error { - job := arbapi.JobID(utils.GetController(ss)) - - if len(job) == 0 { - return fmt.Errorf("the controller of SchedulingSpec is empty") - } - - if _, found := sc.Jobs[job]; !found { - sc.Jobs[job] = arbapi.NewJobInfo(job) - } - - sc.Jobs[job].SetSchedulingSpec(ss) - - return nil -} - -// Assumes that lock is already acquired. -func (sc *ClusterStateCache) updateSchedulingSpec(oldQueue, newQueue *arbv1.SchedulingSpec) error { - return sc.setSchedulingSpec(newQueue) -} - -// Assumes that lock is already acquired. -func (sc *ClusterStateCache) deleteSchedulingSpec(ss *arbv1.SchedulingSpec) error { - jobID := arbapi.JobID(utils.GetController(ss)) - - job, found := sc.Jobs[jobID] - if !found { - return fmt.Errorf("can not found job %v:%v/%v", jobID, ss.Namespace, ss.Name) - } - - // Unset SchedulingSpec - job.UnsetSchedulingSpec() - sc.deleteJob(job) - - return nil -} - -func (sc *ClusterStateCache) AddSchedulingSpec(obj interface{}) { - ss, ok := obj.(*arbv1.SchedulingSpec) - if !ok { - klog.Errorf("Cannot convert to *arbv1.Queue: %v", obj) - return - } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - klog.V(4).Infof("Add SchedulingSpec(%s) into cache, spec(%#v)", ss.Name, ss.Spec) - err := sc.setSchedulingSpec(ss) - if err != nil { - klog.Errorf("Failed to add SchedulingSpec %s into cache: %v", ss.Name, err) - return - } - return -} - -func (sc *ClusterStateCache) UpdateSchedulingSpec(oldObj, newObj interface{}) { - oldSS, ok := oldObj.(*arbv1.SchedulingSpec) - if !ok { - klog.Errorf("Cannot convert oldObj to *arbv1.SchedulingSpec: %v", oldObj) - return - } - newSS, ok := newObj.(*arbv1.SchedulingSpec) - if !ok { - klog.Errorf("Cannot convert newObj to *arbv1.SchedulingSpec: %v", newObj) - return - } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - err := sc.updateSchedulingSpec(oldSS, newSS) - if err != nil { - klog.Errorf("Failed to update SchedulingSpec %s into cache: %v", oldSS.Name, err) - return - } - return -} - -func (sc *ClusterStateCache) DeleteSchedulingSpec(obj interface{}) { - var ss *arbv1.SchedulingSpec - switch t := obj.(type) { - case *arbv1.SchedulingSpec: - ss = t - case cache.DeletedFinalStateUnknown: - var ok bool - ss, ok = t.Obj.(*arbv1.SchedulingSpec) - if !ok { - klog.Errorf("Cannot convert to *arbv1.SchedulingSpec: %v", t.Obj) - return - } - default: - klog.Errorf("Cannot convert to *arbv1.SchedulingSpec: %v", t) - return - } - - sc.Mutex.Lock() - defer sc.Mutex.Unlock() - - err := sc.deleteSchedulingSpec(ss) - if err != nil { - klog.Errorf("Failed to delete SchedulingSpec %s from cache: %v", ss.Name, err) - return - } - return -} diff --git a/test/e2e/queue.go b/test/e2e/queue.go index cb95096be..4f8fbc207 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -120,16 +120,15 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fill up the worker node and most of the master node aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu")) appwrappers = append(appwrappers, aw) - time.Sleep(2 * time.Minute) + time.Sleep(1 * time.Minute) err := waitAWPodsReady(context, aw) Expect(err).NotTo(HaveOccurred()) // This should not fit on cluster aw2 := createDeploymentAWwith426CPU(context, 
appendRandomString("aw-deployment-2-426cpu")) appwrappers = append(appwrappers, aw2) - err = waitAWAnyPodsExists(context, aw2) - Expect(err).To(HaveOccurred()) + Expect(err).NotTo(HaveOccurred()) // This should fit on cluster, initially queued because of aw2 above but should eventually // run after prevention of aw2 above. @@ -252,7 +251,7 @@ var _ = Describe("AppWrapper E2E Test", func() { aw := createBadPodTemplateAW(context, "aw-bad-podtemplate-2") appwrappers = append(appwrappers, aw) - err := waitAWPodsReady(context, aw) + err := waitAWPodsExists(context, aw, 30*time.Second) Expect(err).To(HaveOccurred()) }) @@ -296,15 +295,19 @@ var _ = Describe("AppWrapper E2E Test", func() { err := waitAWPodsReady(context, aw) Expect(err).NotTo(HaveOccurred()) - time.Sleep(2 * time.Minute) - aw1, err := context.karclient.ArbV1().AppWrappers(aw.Namespace).Get(aw.Name, metav1.GetOptions{}) - if err != nil { - fmt.Fprint(GinkgoWriter, "Error getting status") - } pass := false - fmt.Fprintf(GinkgoWriter, "[e2e] status of AW %v.\n", aw1.Status.State) - if len(aw1.Status.PendingPodConditions) == 0 { - pass = true + for true { + aw1, err := context.karclient.ArbV1().AppWrappers(aw.Namespace).Get(aw.Name, metav1.GetOptions{}) + if err != nil { + fmt.Fprint(GinkgoWriter, "Error getting status") + } + fmt.Fprintf(GinkgoWriter, "[e2e] status of AW %v.\n", aw1.Status.State) + if len(aw1.Status.PendingPodConditions) == 0 { + pass = true + } + if pass { + break + } } Expect(pass).To(BeTrue()) }) @@ -347,7 +350,7 @@ var _ = Describe("AppWrapper E2E Test", func() { aw := createBadGenericPodAW(context, "aw-bad-generic-pod-1") appwrappers = append(appwrappers, aw) - err := waitAWPodsReady(context, aw) + err := waitAWPodsCompleted(context, aw, 10*time.Second) Expect(err).To(HaveOccurred()) }) @@ -362,7 +365,7 @@ var _ = Describe("AppWrapper E2E Test", func() { aw := createBadGenericItemAW(context, "aw-bad-generic-item-1") appwrappers = append(appwrappers, aw) - err := waitAWPodsReady(context, aw) + err := waitAWPodsCompleted(context, aw, 10*time.Second) Expect(err).To(HaveOccurred()) }) @@ -432,7 +435,6 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fill up the worker node and most of the master node aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu")) appwrappers = append(appwrappers, aw) - time.Sleep(1 * time.Minute) err := waitAWPodsReady(context, aw) Expect(err).NotTo(HaveOccurred(), "Expecting pods for app wrapper: aw-deployment-2-550cpu") @@ -461,15 +463,31 @@ var _ = Describe("AppWrapper E2E Test", func() { Expect(err).NotTo(HaveOccurred(), "Expecting pods for app wrapper: aw-ff-deployment-2-340-cpu") fmt.Fprintf(GinkgoWriter, "[e2e] MCAD Scheduling Fail Fast Preemption Test - Pods not found for app wrapper aw-ff-deployment-2-340-cpu\n") - // Make sure aw2 pods do not exist err = waitAWPodsReady(context, aw3) Expect(err).NotTo(HaveOccurred(), "Expecting no pods for app wrapper: aw-ff-deployment-2-340-cpu") fmt.Fprintf(GinkgoWriter, "[e2e] MCAD Scheduling Fail Fast Preemption Test - Ready pods found for app wrapper aw-ff-deployment-2-340-cpu\n") - // Make sure pods from AW aw-deployment-1-850-cpu above do not exist proving preemption - time.Sleep(5 * time.Minute) - err = waitAWAnyPodsExists(context, aw2) - Expect(err).To(HaveOccurred(), "Expecting no pods for app wrapper : aw-ff-deployment-1-850-cpu") + // Make sure pods from AW aw-deployment-1-850-cpu have preempted + var pass = false + for true { + aw2Update, err := 
context.karclient.ArbV1().AppWrappers(aw2.Namespace).Get(aw2.Name, metav1.GetOptions{}) + if err != nil { + fmt.Fprintf(GinkgoWriter, "[e2e] MCAD Scheduling Fail Fast Preemption Test - Error getting AW update %v", err) + } + for _, cond := range aw2Update.Status.Conditions { + if cond.Reason == "PreemptionTriggered" { + pass = true + fmt.Fprintf(GinkgoWriter, "[e2e] MCAD Scheduling Fail Fast Preemption Test - the pass value is %v", pass) + } + } + if pass { + break + } else { + time.Sleep(1 * time.Minute) + } + } + + Expect(pass).To(BeTrue(), "Expecting AW to be preempted : aw-ff-deployment-1-850-cpu") fmt.Fprintf(os.Stdout, "[e2e] MCAD Scheduling Fail Fast Preemption Test - Completed. Awaiting app wrapper cleanup\n") }) @@ -511,17 +529,17 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fill up the worker node and most of the master node aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu")) appwrappers = append(appwrappers, aw) - time.Sleep(1 * time.Minute) err := waitAWPodsReady(context, aw) Expect(err).NotTo(HaveOccurred(), "Waiting for pods to be ready for app wrapper: aw-deployment-2-550cpu") // This should fit on cluster but customPodResources is incorrect so AW pods are not created + //NOTE: with deployment controlled removed this test case is invalid. + //Users should keep custompodresources equal to container resources. aw2 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, appendRandomString("aw-deployment-2-427-vs-425-cpu"), "427m", "425m", 2, 60) + context, appendRandomString("aw-deployment-2-427-vs-425-cpu"), "4270m", "425m", 2, 60) appwrappers = append(appwrappers, aw2) - time.Sleep(1 * time.Minute) err = waitAWAnyPodsExists(context, aw2) Expect(err).To(HaveOccurred(), "Waiting for no pods to exist for app wrapper: aw-deployment-2-427-vs-425-cpu") @@ -662,15 +680,15 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fill up the worker node and most of the master node aw := createDeploymentAWwith550CPU(context, appendRandomString("aw-deployment-2-550cpu")) appwrappers = append(appwrappers, aw) - time.Sleep(1 * time.Minute) err := waitAWPodsReady(context, aw) Expect(err).NotTo(HaveOccurred(), "Waiting for pods to be ready for app wrapper: aw-deployment-2-550cpu") // This should not fit on cluster + // there may be a false positive dispatch which will cause MCAD to requeue AW aw2 := createDeploymentAWwith426CPU(context, appendRandomString("aw-deployment-2-426cpu")) appwrappers = append(appwrappers, aw2) - err = waitAWAnyPodsExists(context, aw2) + err = waitAWPodsReady(context, aw2) Expect(err).To(HaveOccurred(), "No pods for app wrapper `aw-deployment-2-426cpu` are expected.") }) diff --git a/test/e2e/util.go b/test/e2e/util.go index 62ace9a18..870d016c9 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -501,7 +501,7 @@ func awNamespacePhase(ctx *context, aw *arbv1.AppWrapper, phase []v1.NamespacePh } func waitAWPodsReady(ctx *context, aw *arbv1.AppWrapper) error { - return waitAWPodsReadyEx(ctx, aw, threeHundredSeconds, int(aw.Spec.SchedSpec.MinAvailable), false) + return waitAWPodsReadyEx(ctx, aw, ninetySeconds, int(aw.Spec.SchedSpec.MinAvailable), false) } func waitAWPodsCompleted(ctx *context, aw *arbv1.AppWrapper, timeout time.Duration) error {
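The e2e changes above replace fixed sleeps and one-shot checks with loops that poll the AppWrapper status until PendingPodConditions empties or a PreemptionTriggered condition appears, but the new `for true` loops are unbounded (and `for {` is the idiomatic spelling). Below is a hedged sketch of the same check with a timeout via the apimachinery wait helpers; it is written as a drop-in next to the helpers in test/e2e/util.go, reusing that file's unexported context type and karclient field, and the helper name and poll interval are illustrative.

```go
package e2e

import (
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"

	arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
)

// waitAWPreempted polls the AppWrapper until a PreemptionTriggered condition
// is reported or the timeout expires, so a stuck AW fails the test instead of
// hanging it.
func waitAWPreempted(ctx *context, aw *arbv1.AppWrapper, timeout time.Duration) error {
	return wait.Poll(10*time.Second, timeout, func() (bool, error) {
		latest, err := ctx.karclient.ArbV1().AppWrappers(aw.Namespace).Get(aw.Name, metav1.GetOptions{})
		if err != nil {
			// Transient API errors simply trigger another poll.
			return false, nil
		}
		for _, cond := range latest.Status.Conditions {
			if cond.Reason == "PreemptionTriggered" {
				return true, nil
			}
		}
		return false, nil
	})
}
```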