From da6108e6a940d3b2c14fe78f5953d68419421c7b Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Fri, 3 Feb 2023 15:38:25 -0500 Subject: [PATCH 01/11] Updated CRD to expose new field 'requeuingTimeMinutes' allowing users to specify how long to wait for the first requeuing period after dispatching. --- config/crd/bases/mcad.ibm.com_appwrappers.yaml | 3 +++ config/crd/bases/mcad.ibm.com_queuejobs.yaml | 5 ++++- config/crd/bases/mcad.ibm.com_schedulingspecs.yaml | 3 +++ deployment/mcad-controller/templates/deployment.yaml | 11 ++++++++++- .../mcad-controller/templates/deployment.yaml | 11 ++++++++++- 5 files changed, 30 insertions(+), 3 deletions(-) diff --git a/config/crd/bases/mcad.ibm.com_appwrappers.yaml b/config/crd/bases/mcad.ibm.com_appwrappers.yaml index 6bdf3053a..738244e90 100644 --- a/config/crd/bases/mcad.ibm.com_appwrappers.yaml +++ b/config/crd/bases/mcad.ibm.com_appwrappers.yaml @@ -257,6 +257,9 @@ spec: properties: minAvailable: type: integer + requeuingTimeMinutes: + type: integer + default: 5 nodeSelector: additionalProperties: type: string diff --git a/config/crd/bases/mcad.ibm.com_queuejobs.yaml b/config/crd/bases/mcad.ibm.com_queuejobs.yaml index 6962ff118..f54cd5fae 100644 --- a/config/crd/bases/mcad.ibm.com_queuejobs.yaml +++ b/config/crd/bases/mcad.ibm.com_queuejobs.yaml @@ -34,7 +34,7 @@ spec: type: object spec: description: Specification of the desired behavior of a cron job, including - the minAvailable + the minAvailable and the requeuingTimeMinutes properties: schedulerName: type: string @@ -43,6 +43,9 @@ spec: properties: minAvailable: type: integer + requeuingTimeMinutes: + type: integer + default: 5 nodeSelector: additionalProperties: type: string diff --git a/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml b/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml index 0087f6af5..6b1fff7a6 100644 --- a/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml +++ b/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml @@ -36,6 +36,9 
@@ spec: properties: minAvailable: type: integer + requeuingTimeMinutes: + type: integer + default: 5 nodeSelector: additionalProperties: type: string diff --git a/deployment/mcad-controller/templates/deployment.yaml b/deployment/mcad-controller/templates/deployment.yaml index e8a6dfc96..8ea7cda93 100644 --- a/deployment/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-controller/templates/deployment.yaml @@ -136,6 +136,9 @@ spec: properties: minAvailable: type: integer + requeuingTimeMinutes: + type: integer + default: 5 nodeSelector: additionalProperties: type: string @@ -181,7 +184,7 @@ spec: type: object spec: description: Specification of the desired behavior of a cron job, including - the minAvailable + the minAvailable and the requeuingTimeMinutes properties: schedulerName: type: string @@ -190,6 +193,9 @@ spec: properties: minAvailable: type: integer + requeuingTimeMinutes: + type: integer + default: 5 nodeSelector: additionalProperties: type: string @@ -7225,6 +7231,9 @@ spec: properties: minAvailable: type: integer + requeuingTimeMinutes: + type: integer + default: 5 nodeSelector: additionalProperties: type: string diff --git a/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml b/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml index 12e06317d..e427afda5 100644 --- a/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml @@ -118,6 +118,9 @@ spec: properties: minAvailable: type: integer + requeuingTimeMinutes: + type: integer + default: 5 nodeSelector: additionalProperties: type: string @@ -163,7 +166,7 @@ spec: type: object spec: description: Specification of the desired behavior of a cron job, including - the minAvailable + the minAvailable and the requeuingTimeMinutes properties: schedulerName: type: string @@ -172,6 +175,9 @@ spec: properties: minAvailable: type: integer + 
requeuingTimeMinutes: + type: integer + default: 5 nodeSelector: additionalProperties: type: string @@ -7198,6 +7204,9 @@ spec: properties: minAvailable: type: integer + requeuingTimeMinutes: + type: integer + default: 5 nodeSelector: additionalProperties: type: string From 3dc2b309ff55e239142c9b0e128fb8ba699d957b Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Fri, 3 Feb 2023 15:45:52 -0500 Subject: [PATCH 02/11] Modified requeuing strategy to use a requeuing time that grows exponentially. This time is set by default to 5 minutes but can be modified from the AppWrapper schedulingSpec field. --- pkg/apis/controller/v1beta1/schedulingspec.go | 1 + .../queuejob/queuejob_controller_ex.go | 35 +++++++------------ 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/pkg/apis/controller/v1beta1/schedulingspec.go b/pkg/apis/controller/v1beta1/schedulingspec.go index db59f8f71..ec026a137 100644 --- a/pkg/apis/controller/v1beta1/schedulingspec.go +++ b/pkg/apis/controller/v1beta1/schedulingspec.go @@ -48,6 +48,7 @@ type SchedulingSpec struct { type SchedulingSpecTemplate struct { NodeSelector map[string]string `json:"nodeSelector,omitempty" protobuf:"bytes,1,rep,name=nodeSelector"` MinAvailable int `json:"minAvailable,omitempty" protobuf:"bytes,2,rep,name=minAvailable"` + RequeuingTimeMinutes int `json:"requeuingTimeMinutes,omitempty" protobuf:"bytes,2,rep,name=requeuingTimeMinutes"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index fc73a18c3..0f8d61b26 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -436,6 +436,7 @@ func (qjm *XController) PreemptQueueJobs() { message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", aw.Spec.SchedSpec.MinAvailable, aw.Status.Running, 
aw.Status.Succeeded) cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", message) newjob.Status.Conditions = append(newjob.Status.Conditions, cond) + newjob.Spec.SchedSpec.RequeuingTimeMinutes = 2 * aw.Spec.SchedSpec.RequeuingTimeMinutes // Grow the requeuing waiting time exponentially updateNewJob = newjob.DeepCopy() //If pods failed scheduling generate new preempt condition @@ -460,9 +461,9 @@ func (qjm *XController) PreemptQueueJobs() { klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to backoff queue.", aw.Name, aw.Namespace) go qjm.backoff(aw, "PreemptionTriggered", string(message)) - } } + func (qjm *XController) preemptAWJobs(preemptAWs []*arbv1.AppWrapper) { if preemptAWs == nil { return @@ -503,25 +504,15 @@ func (qjm *XController) GetQueueJobsEligibleForPreemption() []*arbv1.AppWrapper if (int(value.Status.Running) + int(value.Status.Succeeded)) < replicas { - //Check to see if if this AW job has been dispatched for a time window before preempting - conditionsLen := len(value.Status.Conditions) - var dispatchConditionExists bool - dispatchConditionExists = false - var condition arbv1.AppWrapperCondition - // Get the last time the AppWrapper was dispatched - for i := (conditionsLen - 1); i > 0; i-- { - condition = value.Status.Conditions[i] - if condition.Type != arbv1.AppWrapperCondDispatched { - continue - } - dispatchConditionExists = true - break - } + // Check for the minimum age and then skip preempt if current time is not beyond minimum age + // The minimum age is controlled by the requeuingTimeMinutes stanza + // For preemption, the time is compared to the last condition in the AppWrapper + condition := value.Status.Conditions[len(value.Status.Conditions) - 1] + requeuingTimeMinutes := value.Spec.SchedSpec.RequeuingTimeMinutes + minAge := condition.LastTransitionMicroTime.Add(time.Duration(requeuingTimeMinutes) * time.Minute) + currentTime := time.Now() - // Now check 
for 0 running pods and for the minimum age and then - // skip preempt if current time is not beyond minimum age ie 10 mins - minAge := condition.LastTransitionMicroTime.Add(600 * time.Second) - if (value.Status.Running <= 0) && (dispatchConditionExists && (time.Now().Before(minAge))) { + if currentTime.Before(minAge) { continue } @@ -530,16 +521,16 @@ func (qjm *XController) GetQueueJobsEligibleForPreemption() []*arbv1.AppWrapper qjobs = append(qjobs, value) } } else { - //Preempt when schedulingSpec stanza is not set but pods fails scheduling. - //ignore co-scheduler pods + // Preempt when schedulingSpec stanza is not set but pods fails scheduling. + // ignore co-scheduler pods if len(value.Status.PendingPodConditions) > 0 { klog.V(3).Infof("AppWrapper %s is eligible for preemption Running: %v , Succeeded: %v due to failed scheduling !!! \n", value.Name, value.Status.Running, value.Status.Succeeded) qjobs = append(qjobs, value) } - } } } + return qjobs } From 38bb8460e7767be78997cc97e9b3a7e6789e0c1e Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Sat, 4 Feb 2023 11:49:19 -0500 Subject: [PATCH 03/11] Added a test to evaluate the exponentially growing requeuing mechanism with user supplied initial time. This tests uses init containers to trick the requeuing mechanism into thinking the PODs have failed and are not going to complete. --- test/e2e/queue.go | 13 +++---- test/e2e/util.go | 92 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 98 insertions(+), 7 deletions(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index a1db58aa6..a8ddcee03 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -106,7 +106,6 @@ var _ = Describe("AppWrapper E2E Test", func() { // Using quite mode due to creating of pods in earlier step. 
err = waitAWReadyQuiet(context, aw2) Expect(err).NotTo(HaveOccurred()) - }) It("MCAD CPU Preemption Test", func() { @@ -130,13 +129,15 @@ var _ = Describe("AppWrapper E2E Test", func() { err = waitAWAnyPodsExists(context, aw2) Expect(err).To(HaveOccurred()) - // This should fit on cluster, initially queued because of aw2 above but should eventually - // run after prevention of aw2 above. - aw3 := createDeploymentAWwith425CPU(context, "aw-deployment-2-425cpu") + // Create a job with init containers that need 200 seconds to be ready before the container starts. + // The requeuing mechanism is set to start at 1 minute, which is not enough time for the PODs to be completed. + // The job should be requeued 3 times before it finishes since the wait time is doubled each time the job is requeued (i.e., initially it waits + // for 1 minutes before requeuing, then 2 minutes, and then 4 minutes). Since the init containers take 3 minutes + // and 20 seconds to finish, a 4 minute wait should be long enough to finish the job successfully + aw3 := createJobAWWithInitContainer(context, "aw-deployment-3-init-container") appwrappers = append(appwrappers, aw3) - // Since preemption takes some time, increasing timeout wait time to 2 minutes - err = waitAWPodsExists(context, aw3, 120000*time.Millisecond) + err = waitAWPodsCompleted(context, aw3) // This test waits for 10 minutes to make sure all PODs complete Expect(err).NotTo(HaveOccurred()) }) diff --git a/test/e2e/util.go b/test/e2e/util.go index a88979214..c718b277b 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -65,6 +65,7 @@ import ( var ninetySeconds = 90 * time.Second var threeMinutes = 180 * time.Second +var tenMinutes = 600 * time.Second var threeHundredSeconds = 300 * time.Second var oneCPU = v1.ResourceList{"cpu": resource.MustParse("1000m")} @@ -654,7 +655,7 @@ func waitAWPodsReadyEx(ctx *context, aw *arbv1.AppWrapper, taskNum int, quite bo } func waitAWPodsCompletedEx(ctx *context, aw *arbv1.AppWrapper, taskNum 
int, quite bool) error { - return wait.Poll(100*time.Millisecond, ninetySeconds, awPodPhase(ctx, aw, + return wait.Poll(100*time.Millisecond, tenMinutes, awPodPhase(ctx, aw, []v1.PodPhase{v1.PodSucceeded}, taskNum, quite)) } @@ -744,6 +745,95 @@ func createReplicaSet(context *context, name string, rep int32, img string, req return deployment } +func createJobAWWithInitContainer(context *context, name string) *arbv1.AppWrapper { + rb := []byte(`{"apiVersion": "batch/v1", + "kind": "Job", + "metadata": { + "name": "aw-job-3-init-container", + "namespace": "test", + "labels": { + "app": "aw-job-3-init-container" + } + }, + "spec": { + "parallelism": 3, + "template": { + "metadata": { + "labels": { + "app": "aw-job-3-init-container" + }, + "annotations": { + "appwrapper.mcad.ibm.com/appwrapper-name": "aw-job-3-init-container" + } + }, + "spec": { + "terminationGracePeriodSeconds": 1, + "restartPolicy": "Never", + "initContainers": [ + { + "name": "aw-job-3-init-container", + "image": "k8s.gcr.io/busybox:latest", + "command": ["sleep", "200"], + "resources": { + "requests": { + "cpu": "500m" + } + } + } + ], + "containers": [ + { + "name": "aw-job-3-container", + "image": "k8s.gcr.io/busybox:latest", + "command": ["sleep", "10"], + "resources": { + "requests": { + "cpu": "500m" + } + } + } + ] + } + } + }} `) + + var minAvailable int = 3 + var requeuingTimeMinutes int = 1 + + aw := &arbv1.AppWrapper{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: context.namespace, + }, + Spec: arbv1.AppWrapperSpec{ + SchedSpec: arbv1.SchedulingSpecTemplate{ + MinAvailable: minAvailable, + RequeuingTimeMinutes: requeuingTimeMinutes, + }, + AggrResources: arbv1.AppWrapperResourceList{ + GenericItems: []arbv1.AppWrapperGenericResource{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%s-%s", name, "aw-job-3-init-container"), + Namespace: context.namespace, + }, + DesiredAvailable: 1, + GenericTemplate: runtime.RawExtension{ + Raw: rb, + }, + CompletionStatus: 
"Complete", + }, + }, + }, + }, + } + + appwrapper, err := context.karclient.ArbV1().AppWrappers(context.namespace).Create(aw) + Expect(err).NotTo(HaveOccurred()) + + return appwrapper +} + func createDeploymentAW(context *context, name string) *arbv1.AppWrapper { rb := []byte(`{"apiVersion": "apps/v1", "kind": "Deployment", From 80591c0b674c3ed60363956f189a5057b9076b77 Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Sat, 4 Feb 2023 18:44:49 -0500 Subject: [PATCH 04/11] Fixed naming issue in requeuing test --- test/e2e/queue.go | 2 +- test/e2e/util.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index a8ddcee03..73d1e9075 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -134,7 +134,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // The job should be requeued 3 times before it finishes since the wait time is doubled each time the job is requeued (i.e., initially it waits // for 1 minutes before requeuing, then 2 minutes, and then 4 minutes). 
Since the init containers take 3 minutes // and 20 seconds to finish, a 4 minute wait should be long enough to finish the job successfully - aw3 := createJobAWWithInitContainer(context, "aw-deployment-3-init-container") + aw3 := createJobAWWithInitContainer(context, "aw-job-3-init-container") appwrappers = append(appwrappers, aw3) err = waitAWPodsCompleted(context, aw3) // This test waits for 10 minutes to make sure all PODs complete diff --git a/test/e2e/util.go b/test/e2e/util.go index c718b277b..30d81401e 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -771,7 +771,7 @@ func createJobAWWithInitContainer(context *context, name string) *arbv1.AppWrapp "restartPolicy": "Never", "initContainers": [ { - "name": "aw-job-3-init-container", + "name": "job-init-container", "image": "k8s.gcr.io/busybox:latest", "command": ["sleep", "200"], "resources": { @@ -783,7 +783,7 @@ func createJobAWWithInitContainer(context *context, name string) *arbv1.AppWrapp ], "containers": [ { - "name": "aw-job-3-container", + "name": "job-container", "image": "k8s.gcr.io/busybox:latest", "command": ["sleep", "10"], "resources": { @@ -814,7 +814,7 @@ func createJobAWWithInitContainer(context *context, name string) *arbv1.AppWrapp GenericItems: []arbv1.AppWrapperGenericResource{ { ObjectMeta: metav1.ObjectMeta{ - Name: fmt.Sprintf("%s-%s", name, "aw-job-3-init-container"), + Name: name, Namespace: context.namespace, }, DesiredAvailable: 1, From 63a4fff4af14f377c393910bf459a5590dd0eb56 Mon Sep 17 00:00:00 2001 From: "Pedro D. 
Bello-Maldonado" Date: Mon, 6 Feb 2023 15:09:26 -0500 Subject: [PATCH 05/11] Updated requeuing test to separate the test for requeuing --- test/e2e/queue.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index 73d1e9075..488d64f06 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -110,6 +110,7 @@ var _ = Describe("AppWrapper E2E Test", func() { It("MCAD CPU Preemption Test", func() { fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Preemption Test - Started.\n") + context := initTestContext() var appwrappers []*arbv1.AppWrapper appwrappersPtr := &appwrappers @@ -129,15 +130,33 @@ var _ = Describe("AppWrapper E2E Test", func() { err = waitAWAnyPodsExists(context, aw2) Expect(err).To(HaveOccurred()) + // This should fit on cluster, initially queued because of aw2 above but should eventually + // run after prevention of aw2 above. + aw3 := createDeploymentAWwith425CPU(context, "aw-deployment-2-425cpu") + appwrappers = append(appwrappers, aw3) + + // Since preemption takes some time, increasing timeout wait time to 2 minutes + err = waitAWPodsExists(context, aw3, 120000*time.Millisecond) + Expect(err).NotTo(HaveOccurred()) + }) + + It("MCAD CPU Requeuing Test", func() { + fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Requeuing Test - Started.\n") + + context := initTestContext() + var appwrappers []*arbv1.AppWrapper + appwrappersPtr := &appwrappers + defer cleanupTestObjectsPtr(context, appwrappersPtr) + // Create a job with init containers that need 200 seconds to be ready before the container starts. // The requeuing mechanism is set to start at 1 minute, which is not enough time for the PODs to be completed. // The job should be requeued 3 times before it finishes since the wait time is doubled each time the job is requeued (i.e., initially it waits // for 1 minutes before requeuing, then 2 minutes, and then 4 minutes). 
Since the init containers take 3 minutes // and 20 seconds to finish, a 4 minute wait should be long enough to finish the job successfully - aw3 := createJobAWWithInitContainer(context, "aw-job-3-init-container") - appwrappers = append(appwrappers, aw3) + aw := createJobAWWithInitContainer(context, "aw-job-3-init-container") + appwrappers = append(appwrappers, aw) - err = waitAWPodsCompleted(context, aw3) // This test waits for 10 minutes to make sure all PODs complete + err := waitAWPodsCompleted(context, aw) // This test waits for 10 minutes to make sure all PODs complete Expect(err).NotTo(HaveOccurred()) }) From fd0ed2f6bda58fbeb619ab0830f57e363cf22926 Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Mon, 6 Feb 2023 18:22:06 -0500 Subject: [PATCH 06/11] Attempt to fix test issue by specifying requeuing time. 5 minutes of wait time seems to be too long. --- test/e2e/queue.go | 12 ++++++------ test/e2e/util.go | 3 ++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index 488d64f06..bb41856e8 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -371,7 +371,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fit on cluster with customPodResources matching deployment resource demands so AW pods are created aw := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-deployment-2-550-vs-550-cpu", "550m", "550m", 2) + context, "aw-deployment-2-550-vs-550-cpu", "550m", "550m", 2, 1) appwrappers = append(appwrappers, aw) @@ -384,6 +384,7 @@ var _ = Describe("AppWrapper E2E Test", func() { It("MCAD Scheduling Fail Fast Preemption Test", func() { fmt.Fprintf(os.Stdout, "[e2e] MCAD Scheduling Fail Fast Preemption Test - Started.\n") + context := initTestContext() var appwrappers []*arbv1.AppWrapper appwrappersPtr := &appwrappers @@ -398,7 +399,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should not fit on any node but should dispatch because there is enough 
aggregated resources. aw2 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-ff-deployment-1-700-cpu", "700m", "700m", 1) + context, "aw-ff-deployment-1-700-cpu", "700m", "700m", 1, 1) appwrappers = append(appwrappers, aw2) @@ -411,7 +412,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fit on cluster after AW aw-deployment-1-700-cpu above is automatically preempted on // scheduling failure aw3 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-ff-deployment-2-340-cpu", "340m", "340m", 2) + context, "aw-ff-deployment-2-340-cpu", "340m", "340m", 2, 1) appwrappers = append(appwrappers, aw3) @@ -426,7 +427,6 @@ var _ = Describe("AppWrapper E2E Test", func() { // Make sure pods from AW aw-deployment-1-700-cpu above do not exist proving preemption err = waitAWAnyPodsExists(context, aw2) Expect(err).To(HaveOccurred()) - }) It("MCAD Bad Custom Pod Resources vs. Deployment Pod Resource Not Queuing Test", func() { @@ -445,7 +445,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should not fit on cluster but customPodResources is incorrect so AW pods are created aw2 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-deployment-2-425-vs-426-cpu", "425m", "426m", 2) + context, "aw-deployment-2-425-vs-426-cpu", "425m", "426m", 2, 1) appwrappers = append(appwrappers, aw2) @@ -472,7 +472,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fit on cluster but customPodResources is incorrect so AW pods are not created aw2 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-deployment-2-426-vs-425-cpu", "426m", "425m", 2) + context, "aw-deployment-2-426-vs-425-cpu", "426m", "425m", 2, 1) appwrappers = append(appwrappers, aw2) diff --git a/test/e2e/util.go b/test/e2e/util.go index 30d81401e..f5ee80dfd 100644 --- a/test/e2e/util.go +++ b/test/e2e/util.go @@ -2291,7 +2291,7 @@ func createGenericDeploymentWithCPUAW(context *context, name string, cpuDemand s return 
appwrapper } -func createGenericDeploymentCustomPodResourcesWithCPUAW(context *context, name string, customPodCpuDemand string, cpuDemand string, replicas int) *arbv1.AppWrapper { +func createGenericDeploymentCustomPodResourcesWithCPUAW(context *context, name string, customPodCpuDemand string, cpuDemand string, replicas int, requeuingTimeMinutes int) *arbv1.AppWrapper { rb := []byte(fmt.Sprintf(`{ "apiVersion": "apps/v1", "kind": "Deployment", @@ -2350,6 +2350,7 @@ func createGenericDeploymentCustomPodResourcesWithCPUAW(context *context, name s Spec: arbv1.AppWrapperSpec{ SchedSpec: arbv1.SchedulingSpecTemplate{ MinAvailable: schedSpecMin, + RequeuingTimeMinutes: requeuingTimeMinutes, }, AggrResources: arbv1.AppWrapperResourceList{ GenericItems: []arbv1.AppWrapperGenericResource{ From f4068a8ad1fb96d77e45a441be67652700be0bef Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Wed, 8 Feb 2023 22:35:01 -0500 Subject: [PATCH 07/11] Updated CRD to include different requeuing control options --- .../crd/bases/mcad.ibm.com_appwrappers.yaml | 25 +++++- config/crd/bases/mcad.ibm.com_queuejobs.yaml | 27 ++++++- .../bases/mcad.ibm.com_schedulingspecs.yaml | 25 +++++- .../mcad-controller/templates/deployment.yaml | 77 ++++++++++++++++--- .../mcad-controller/templates/deployment.yaml | 77 ++++++++++++++++--- 5 files changed, 201 insertions(+), 30 deletions(-) diff --git a/config/crd/bases/mcad.ibm.com_appwrappers.yaml b/config/crd/bases/mcad.ibm.com_appwrappers.yaml index 738244e90..8fc1e52d3 100644 --- a/config/crd/bases/mcad.ibm.com_appwrappers.yaml +++ b/config/crd/bases/mcad.ibm.com_appwrappers.yaml @@ -257,9 +257,28 @@ spec: properties: minAvailable: type: integer - requeuingTimeMinutes: - type: integer - default: 5 + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + 
growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string diff --git a/config/crd/bases/mcad.ibm.com_queuejobs.yaml b/config/crd/bases/mcad.ibm.com_queuejobs.yaml index f54cd5fae..b32bd425c 100644 --- a/config/crd/bases/mcad.ibm.com_queuejobs.yaml +++ b/config/crd/bases/mcad.ibm.com_queuejobs.yaml @@ -34,7 +34,7 @@ spec: type: object spec: description: Specification of the desired behavior of a cron job, including - the minAvailable and the requeuingTimeMinutes + the minAvailable and the requeuing strategy properties: schedulerName: type: string @@ -43,9 +43,28 @@ spec: properties: minAvailable: type: integer - requeuingTimeMinutes: - type: integer - default: 5 + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string diff --git a/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml b/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml index 6b1fff7a6..73999d042 100644 --- a/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml +++ b/config/crd/bases/mcad.ibm.com_schedulingspecs.yaml @@ -36,9 +36,28 @@ spec: properties: minAvailable: type: integer - requeuingTimeMinutes: - type: integer - default: 5 + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + 
default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string diff --git a/deployment/mcad-controller/templates/deployment.yaml b/deployment/mcad-controller/templates/deployment.yaml index 8ea7cda93..4a6409a1a 100644 --- a/deployment/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-controller/templates/deployment.yaml @@ -136,9 +136,28 @@ spec: properties: minAvailable: type: integer - requeuingTimeMinutes: - type: integer - default: 5 + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string @@ -184,7 +203,7 @@ spec: type: object spec: description: Specification of the desired behavior of a cron job, including - the minAvailable and the requeuingTimeMinutes + the minAvailable and the requeuing strategy properties: schedulerName: type: string @@ -193,9 +212,28 @@ spec: properties: minAvailable: type: integer - requeuingTimeMinutes: - type: integer - default: 5 + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string @@ -7231,9 +7269,28 @@ spec: properties: minAvailable: type: integer - requeuingTimeMinutes: - type: integer - default: 5 + requeuing: + description: Specification of the requeuing strategy based 
on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string diff --git a/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml b/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml index e427afda5..aca07f6e2 100644 --- a/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml +++ b/deployment/mcad-operator/helm-charts/mcad-controller/templates/deployment.yaml @@ -118,9 +118,28 @@ spec: properties: minAvailable: type: integer - requeuingTimeMinutes: - type: integer - default: 5 + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string @@ -166,7 +185,7 @@ spec: type: object spec: description: Specification of the desired behavior of a cron job, including - the minAvailable and the requeuingTimeMinutes + the minAvailable and the requeuing strategy properties: schedulerName: type: string @@ -175,9 +194,28 @@ spec: properties: minAvailable: type: integer - requeuingTimeMinutes: - type: integer - default: 5 + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: 
"exponential" + numRequeuings: + type: integer + default: 0 + maxNumRequeuings: + type: integer + default: 0 + type: object nodeSelector: additionalProperties: type: string @@ -7204,9 +7242,28 @@ spec: properties: minAvailable: type: integer - requeuingTimeMinutes: - type: integer - default: 5 + requeuing: + description: Specification of the requeuing strategy based on + waiting time + properties: + initialTimeInSeconds: + type: integer + timeInSeconds: + type: integer + default: 300 + maxTimeInSeconds: + type: integer + default: 0 + growthType: + type: string + default: "exponential" + numRequeuings: + type: int + default: 0 + maxNumRequeuings: + type: int + default: 0 + type: object nodeSelector: additionalProperties: type: string From a1f0d3e3e97999bd7ebe343d5bb33070886c8e12 Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Wed, 8 Feb 2023 22:37:20 -0500 Subject: [PATCH 08/11] New logic to requeu AppWrapper based on the latest condition based on time (not location in the list of conditions). Users can now control different ways to requeue the job and to stop it from requeuing. 
--- pkg/apis/controller/v1beta1/schedulingspec.go | 11 ++- .../queuejob/queuejob_controller_ex.go | 75 ++++++++++++++++--- 2 files changed, 74 insertions(+), 12 deletions(-) diff --git a/pkg/apis/controller/v1beta1/schedulingspec.go b/pkg/apis/controller/v1beta1/schedulingspec.go index ec026a137..fa83b223e 100644 --- a/pkg/apis/controller/v1beta1/schedulingspec.go +++ b/pkg/apis/controller/v1beta1/schedulingspec.go @@ -48,7 +48,16 @@ type SchedulingSpec struct { type SchedulingSpecTemplate struct { NodeSelector map[string]string `json:"nodeSelector,omitempty" protobuf:"bytes,1,rep,name=nodeSelector"` MinAvailable int `json:"minAvailable,omitempty" protobuf:"bytes,2,rep,name=minAvailable"` - RequeuingTimeMinutes int `json:"requeuingTimeMinutes,omitempty" protobuf:"bytes,2,rep,name=requeuingTimeMinutes"` + Requeuing RequeuingTemplate `json:"requeuing,omitempty" protobuf:"bytes,1,rep,name=requeuing"` +} + +type RequeuingTemplate struct { + InitialTimeInSeconds int `json:"initialTimeInSeconds,omitempty" protobuf:"bytes,1,rep,name=initialTimeInSeconds"` + TimeInSeconds int `json:"timeInSeconds,omitempty" protobuf:"bytes,2,rep,name=timeInSeconds"` + MaxTimeInSeconds int `json:"maxTimeInSeconds,omitempty" protobuf:"bytes,3,rep,name=maxTimeInSeconds"` + GrowthType string `json:"growthType,omitempty" protobuf:"bytes,4,rep,name=growthType"` + NumRequeuings int `json:"numRequeuings,omitempty" protobuf:"bytes,5,rep,name=numRequeuings"` + MaxNumRequeuings int `json:"maxNumRequeuings,omitempty" protobuf:"bytes,6,rep,name=maxNumRequeuings"` } // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 0f8d61b26..5654e5d9d 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -424,6 +424,10 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * func (qjm 
*XController) PreemptQueueJobs() { qjobs := qjm.GetQueueJobsEligibleForPreemption() for _, aw := range qjobs { + if aw.Status.State == arbv1.AppWrapperStateCompleted || aw.Status.State == arbv1.AppWrapperStateDeleted || aw.Status.State == arbv1.AppWrapperStateFailed { + continue + } + var updateNewJob *arbv1.AppWrapper var message string newjob, e := qjm.queueJobLister.AppWrappers(aw.Namespace).Get(aw.Name) @@ -431,16 +435,35 @@ func (qjm *XController) PreemptQueueJobs() { continue } newjob.Status.CanRun = false + cleanAppWrapper := false if (aw.Status.Running + aw.Status.Succeeded) < int32(aw.Spec.SchedSpec.MinAvailable) { message = fmt.Sprintf("Insufficient number of Running and Completed pods, minimum=%d, running=%d, completed=%d.", aw.Spec.SchedSpec.MinAvailable, aw.Status.Running, aw.Status.Succeeded) cond := GenerateAppWrapperCondition(arbv1.AppWrapperCondPreemptCandidate, v1.ConditionTrue, "MinPodsNotRunning", message) newjob.Status.Conditions = append(newjob.Status.Conditions, cond) - newjob.Spec.SchedSpec.RequeuingTimeMinutes = 2 * aw.Spec.SchedSpec.RequeuingTimeMinutes // Grow the requeuing waiting time exponentially - updateNewJob = newjob.DeepCopy() - //If pods failed scheduling generate new preempt condition + if aw.Spec.SchedSpec.Requeuing.GrowthType == "exponential" { + newjob.Spec.SchedSpec.Requeuing.TimeInSeconds += aw.Spec.SchedSpec.Requeuing.TimeInSeconds + } else if aw.Spec.SchedSpec.Requeuing.GrowthType == "linear" { + newjob.Spec.SchedSpec.Requeuing.TimeInSeconds += aw.Spec.SchedSpec.Requeuing.InitialTimeInSeconds + } + + if aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds > 0 { + if aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds <= newjob.Spec.SchedSpec.Requeuing.TimeInSeconds { + newjob.Spec.SchedSpec.Requeuing.TimeInSeconds = aw.Spec.SchedSpec.Requeuing.MaxTimeInSeconds + } + } + + if newjob.Spec.SchedSpec.Requeuing.MaxNumRequeuings > 0 && newjob.Spec.SchedSpec.Requeuing.NumRequeuings == newjob.Spec.SchedSpec.Requeuing.MaxNumRequeuings { + 
newjob.Status.State = arbv1.AppWrapperStateDeleted + cleanAppWrapper = true + } else { + newjob.Spec.SchedSpec.Requeuing.NumRequeuings += 1 + } + + updateNewJob = newjob.DeepCopy() } else { + //If pods failed scheduling generate new preempt condition message = fmt.Sprintf("Pods failed scheduling failed=%v, running=%v.", len(aw.Status.PendingPodConditions), aw.Status.Running) index := getIndexOfMatchedCondition(newjob, arbv1.AppWrapperCondPreemptCandidate, "PodsFailedScheduling") //ignore co-scheduler failed scheduling events. This is a temp @@ -458,9 +481,13 @@ func (qjm *XController) PreemptQueueJobs() { if err := qjm.updateEtcd(updateNewJob, "PreemptQueueJobs - CanRun: false"); err != nil { klog.Errorf("Failed to update status of AppWrapper %v/%v: %v", aw.Namespace, aw.Name, err) } - klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to backoff queue.", - aw.Name, aw.Namespace) - go qjm.backoff(aw, "PreemptionTriggered", string(message)) + if cleanAppWrapper { + klog.V(4).Infof("[PreemptQueueJobs] Deleting AppWrapper %s/%s due to maximum number of requeuings exceeded.", aw.Name, aw.Namespace) + go qjm.Cleanup(aw) + } else { + klog.V(4).Infof("[PreemptQueueJobs] Adding preempted AppWrapper %s/%s to backoff queue.", aw.Name, aw.Namespace) + go qjm.backoff(aw, "PreemptionTriggered", string(message)) + } } } @@ -504,18 +531,44 @@ func (qjm *XController) GetQueueJobsEligibleForPreemption() []*arbv1.AppWrapper if (int(value.Status.Running) + int(value.Status.Succeeded)) < replicas { + // Find the dispatched condition if there is any + numConditions := len(value.Status.Conditions) + var dispatchedCondition arbv1.AppWrapperCondition + dispatchedConditionExists := false + + for i := numConditions - 1; i > 0; i-- { + dispatchedCondition = value.Status.Conditions[i] + if dispatchedCondition.Type != arbv1.AppWrapperCondDispatched { + continue + } + dispatchedConditionExists = true + break + } + // Check for the minimum age and then skip preempt if current 
time is not beyond minimum age - // The minimum age is controlled by the requeuingTimeMinutes stanza - // For preemption, the time is compared to the last condition in the AppWrapper - condition := value.Status.Conditions[len(value.Status.Conditions) - 1] - requeuingTimeMinutes := value.Spec.SchedSpec.RequeuingTimeMinutes - minAge := condition.LastTransitionMicroTime.Add(time.Duration(requeuingTimeMinutes) * time.Minute) + // The minimum age is controlled by the requeuing.TimeInSeconds stanza + // For preemption, the time is compared to the last condition or the dispatched condition in the AppWrapper, whichever happened later + lastCondition := value.Status.Conditions[numConditions - 1] + var condition arbv1.AppWrapperCondition + + if dispatchedConditionExists && dispatchedCondition.LastTransitionMicroTime.After(lastCondition.LastTransitionMicroTime.Time) { + condition = dispatchedCondition + } else { + condition = lastCondition + } + + requeuingTimeInSeconds := value.Spec.SchedSpec.Requeuing.TimeInSeconds + minAge := condition.LastTransitionMicroTime.Add(time.Duration(requeuingTimeInSeconds) * time.Second) currentTime := time.Now() if currentTime.Before(minAge) { continue } + if value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds == 0 { + value.Spec.SchedSpec.Requeuing.InitialTimeInSeconds = value.Spec.SchedSpec.Requeuing.TimeInSeconds + } + if replicas > 0 { klog.V(3).Infof("AppWrapper %s is eligible for preemption Running: %v - minAvailable: %v , Succeeded: %v !!! \n", value.Name, value.Status.Running, replicas, value.Status.Succeeded) qjobs = append(qjobs, value) From aab17f592682f30160d5a59334cacfcaba645695 Mon Sep 17 00:00:00 2001 From: "Pedro D. 
Bello-Maldonado" Date: Wed, 8 Feb 2023 22:38:10 -0500 Subject: [PATCH 09/11] Updated testing for new requeuing mechanism --- test/e2e/queue.go | 37 +++++++++++++++++++++++++++---------- test/e2e/util.go | 23 ++++++++++++++--------- 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index bb41856e8..0ab5ee04b 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -36,7 +36,7 @@ package e2e import ( "fmt" "os" "time" . "github.com/onsi/ginkgo" @@ -140,7 +140,7 @@ var _ = Describe("AppWrapper E2E Test", func() { Expect(err).NotTo(HaveOccurred()) }) - It("MCAD CPU Requeuing Test", func() { + It("MCAD CPU Requeuing - Completion After enough Requeuing Times Test", func() { fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Requeuing Test - Started.\n") context := initTestContext() @@ -153,13 +153,31 @@ var _ = Describe("AppWrapper E2E Test", func() { // The job should be requeued 3 times before it finishes since the wait time is doubled each time the job is requeued (i.e., initially it waits // for 1 minutes before requeuing, then 2 minutes, and then 4 minutes). 
Since the init containers take 3 minutes // and 20 seconds to finish, a 4 minute wait should be long enough to finish the job successfully - aw := createJobAWWithInitContainer(context, "aw-job-3-init-container") + aw := createJobAWWithInitContainer(context, "aw-job-3-init-container", 60, "exponential", 0) appwrappers = append(appwrappers, aw) - err := waitAWPodsCompleted(context, aw) // This test waits for 10 minutes to make sure all PODs complete + err := waitAWPodsCompleted(context, aw, 600 * time.Second) // This test waits for 10 minutes to make sure all PODs complete Expect(err).NotTo(HaveOccurred()) }) + It("MCAD CPU Requeuing - Deletion After Maximum Requeuing Times Test", func() { + fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Requeuing Test - Started.\n") + + context := initTestContext() + var appwrappers []*arbv1.AppWrapper + appwrappersPtr := &appwrappers + defer cleanupTestObjectsPtr(context, appwrappersPtr) + + // Create a job with init containers that need 200 seconds to be ready before the container starts. 
+ // The requeuing mechanism is set to fire after 1 second (plus the 60 seconds time interval of the background thread) + // Within 5 minutes, the AppWrapper will be requeued up to 3 times at which point it will be deleted + aw := createJobAWWithInitContainer(context, "aw-job-3-init-container", 1, "none", 3) + appwrappers = append(appwrappers, aw) + + err := waitAWPodsCompleted(context, aw, 300 * time.Second) + Expect(err).To(HaveOccurred()) + }) + It("Create AppWrapper - StatefulSet Only - 2 Pods", func() { fmt.Fprintf(os.Stdout, "[e2e] Create AppWrapper - StatefulSet Only - 2 Pods - Started.\n") @@ -371,7 +388,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fit on cluster with customPodResources matching deployment resource demands so AW pods are created aw := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-deployment-2-550-vs-550-cpu", "550m", "550m", 2, 1) + context, "aw-deployment-2-550-vs-550-cpu", "550m", "550m", 2, 60) appwrappers = append(appwrappers, aw) @@ -399,7 +416,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should not fit on any node but should dispatch because there is enough aggregated resources. 
aw2 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-ff-deployment-1-700-cpu", "700m", "700m", 1, 1) + context, "aw-ff-deployment-1-700-cpu", "700m", "700m", 1, 60) appwrappers = append(appwrappers, aw2) @@ -412,7 +429,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fit on cluster after AW aw-deployment-1-700-cpu above is automatically preempted on // scheduling failure aw3 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-ff-deployment-2-340-cpu", "340m", "340m", 2, 1) + context, "aw-ff-deployment-2-340-cpu", "340m", "340m", 2, 60) appwrappers = append(appwrappers, aw3) @@ -445,7 +462,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should not fit on cluster but customPodResources is incorrect so AW pods are created aw2 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-deployment-2-425-vs-426-cpu", "425m", "426m", 2, 1) + context, "aw-deployment-2-425-vs-426-cpu", "425m", "426m", 2, 60) appwrappers = append(appwrappers, aw2) @@ -472,7 +489,7 @@ var _ = Describe("AppWrapper E2E Test", func() { // This should fit on cluster but customPodResources is incorrect so AW pods are not created aw2 := createGenericDeploymentCustomPodResourcesWithCPUAW( - context, "aw-deployment-2-426-vs-425-cpu", "426m", "425m", 2, 1) + context, "aw-deployment-2-426-vs-425-cpu", "426m", "425m", 2, 60) appwrappers = append(appwrappers, aw2) @@ -517,7 +534,7 @@ var _ = Describe("AppWrapper E2E Test", func() { aw := createGenericJobAWWithScheduleSpec(context, "aw-test-job-with-scheduling-spec") err1 := waitAWPodsReady(context, aw) Expect(err1).NotTo(HaveOccurred()) - err2 := waitAWPodsCompleted(context, aw) + err2 := waitAWPodsCompleted(context, aw, 90 * time.Second) Expect(err2).NotTo(HaveOccurred()) // Once pods are completed, we wait for them to see if they change their status to anything BUT "Completed" diff --git a/test/e2e/util.go b/test/e2e/util.go index f5ee80dfd..afa94bf63 100644 --- 
a/test/e2e/util.go +++ b/test/e2e/util.go @@ -612,8 +612,8 @@ func waitAWPodsReady(ctx *context, aw *arbv1.AppWrapper) error { return waitAWPodsReadyEx(ctx, aw, int(aw.Spec.SchedSpec.MinAvailable), false) } -func waitAWPodsCompleted(ctx *context, aw *arbv1.AppWrapper) error { - return waitAWPodsCompletedEx(ctx, aw, int(aw.Spec.SchedSpec.MinAvailable), false) +func waitAWPodsCompleted(ctx *context, aw *arbv1.AppWrapper, timeout time.Duration) error { + return waitAWPodsCompletedEx(ctx, aw, int(aw.Spec.SchedSpec.MinAvailable), false, timeout) } func waitAWPodsNotCompleted(ctx *context, aw *arbv1.AppWrapper) error { @@ -654,8 +654,8 @@ func waitAWPodsReadyEx(ctx *context, aw *arbv1.AppWrapper, taskNum int, quite bo []v1.PodPhase{v1.PodRunning, v1.PodSucceeded}, taskNum, quite)) } -func waitAWPodsCompletedEx(ctx *context, aw *arbv1.AppWrapper, taskNum int, quite bool) error { - return wait.Poll(100*time.Millisecond, tenMinutes, awPodPhase(ctx, aw, +func waitAWPodsCompletedEx(ctx *context, aw *arbv1.AppWrapper, taskNum int, quite bool, timeout time.Duration ) error { + return wait.Poll(100*time.Millisecond, timeout, awPodPhase(ctx, aw, []v1.PodPhase{v1.PodSucceeded}, taskNum, quite)) } @@ -745,7 +745,7 @@ func createReplicaSet(context *context, name string, rep int32, img string, req return deployment } -func createJobAWWithInitContainer(context *context, name string) *arbv1.AppWrapper { +func createJobAWWithInitContainer(context *context, name string, requeuingTimeInSeconds int, requeuingGrowthType string, requeuingMaxNumRequeuings int ) *arbv1.AppWrapper { rb := []byte(`{"apiVersion": "batch/v1", "kind": "Job", "metadata": { @@ -798,7 +798,6 @@ func createJobAWWithInitContainer(context *context, name string) *arbv1.AppWrapp }} `) var minAvailable int = 3 - var requeuingTimeMinutes int = 1 aw := &arbv1.AppWrapper{ ObjectMeta: metav1.ObjectMeta{ @@ -808,7 +807,11 @@ func createJobAWWithInitContainer(context *context, name string) *arbv1.AppWrapp Spec: 
arbv1.AppWrapperSpec{ SchedSpec: arbv1.SchedulingSpecTemplate{ MinAvailable: minAvailable, - RequeuingTimeMinutes: requeuingTimeMinutes, + Requeuing: arbv1.RequeuingTemplate{ + TimeInSeconds: requeuingTimeInSeconds, + GrowthType: requeuingGrowthType, + MaxNumRequeuings: requeuingMaxNumRequeuings, + }, }, AggrResources: arbv1.AppWrapperResourceList{ GenericItems: []arbv1.AppWrapperGenericResource{ @@ -2291,7 +2294,7 @@ func createGenericDeploymentWithCPUAW(context *context, name string, cpuDemand s return appwrapper } -func createGenericDeploymentCustomPodResourcesWithCPUAW(context *context, name string, customPodCpuDemand string, cpuDemand string, replicas int, requeuingTimeMinutes int) *arbv1.AppWrapper { +func createGenericDeploymentCustomPodResourcesWithCPUAW(context *context, name string, customPodCpuDemand string, cpuDemand string, replicas int, requeuingTimeInSeconds int) *arbv1.AppWrapper { rb := []byte(fmt.Sprintf(`{ "apiVersion": "apps/v1", "kind": "Deployment", @@ -2350,7 +2353,9 @@ func createGenericDeploymentCustomPodResourcesWithCPUAW(context *context, name s Spec: arbv1.AppWrapperSpec{ SchedSpec: arbv1.SchedulingSpecTemplate{ MinAvailable: schedSpecMin, - RequeuingTimeMinutes: requeuingTimeMinutes, + Requeuing: arbv1.RequeuingTemplate{ + TimeInSeconds: requeuingTimeInSeconds, + }, }, AggrResources: arbv1.AppWrapperResourceList{ GenericItems: []arbv1.AppWrapperGenericResource{ From d375cf3764b8c7bc39b8e6fba07c59ce0e6ff0ad Mon Sep 17 00:00:00 2001 From: "Pedro D. 
Bello-Maldonado" Date: Thu, 9 Feb 2023 09:16:03 -0500 Subject: [PATCH 10/11] Increased time for requeuing to completion test --- test/e2e/queue.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/e2e/queue.go b/test/e2e/queue.go index 0ab5ee04b..7699e427d 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -139,7 +139,7 @@ var _ = Describe("AppWrapper E2E Test", func() { Expect(err).NotTo(HaveOccurred()) }) - It("MCAD CPU Requeuing - Completion After enough Requeuing Times Test", func() { + It("MCAD CPU Requeuing - Completion After Enough Requeuing Times Test", func() { fmt.Fprintf(os.Stdout, "[e2e] MCAD CPU Requeuing Test - Started.\n") context := initTestContext() @@ -155,7 +155,7 @@ var _ = Describe("AppWrapper E2E Test", func() { aw := createJobAWWithInitContainer(context, "aw-job-3-init-container", 60, "exponential", 0) appwrappers = append(appwrappers, aw) - err := waitAWPodsCompleted(context, aw, 600 * time.Second) // This test waits for 10 minutes to make sure all PODs complete + err := waitAWPodsCompleted(context, aw, 720 * time.Second) // This test waits for 12 minutes to make sure all PODs complete Expect(err).NotTo(HaveOccurred()) }) From b142864e33869a65895ea2f3d75e479570d7dc77 Mon Sep 17 00:00:00 2001 From: "Pedro D. Bello-Maldonado" Date: Tue, 14 Feb 2023 15:34:56 -0500 Subject: [PATCH 11/11] Updated the CONTROLLER_VERSION --- CONTROLLER_VERSION | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CONTROLLER_VERSION b/CONTROLLER_VERSION index b615a11c5..971333c39 100644 --- a/CONTROLLER_VERSION +++ b/CONTROLLER_VERSION @@ -1 +1 @@ -1.29.50 +1.29.51