diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 6290bcdea..1062f9a86 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -31,6 +31,7 @@ limitations under the License. package queuejob import ( + "errors" "fmt" "math" "math/rand" @@ -69,7 +70,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/runtime/serializer/json" + runtimeJson "k8s.io/apimachinery/pkg/runtime/serializer/json" "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources" resconfigmap "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/configmap" // ConfigMap @@ -675,7 +676,7 @@ func GetPodTemplate(qjobRes *arbv1.AppWrapperResource) (*v1.PodTemplateSpec, err rtScheme := runtime.NewScheme() v1.AddToScheme(rtScheme) - jsonSerializer := json.NewYAMLSerializer(json.DefaultMetaFactory, rtScheme, rtScheme) + jsonSerializer := runtimeJson.NewYAMLSerializer(runtimeJson.DefaultMetaFactory, rtScheme, rtScheme) podGVK := schema.GroupVersion{Group: v1.GroupName, Version: "v1"}.WithKind("PodTemplate") @@ -1903,7 +1904,6 @@ func (cc *XController) worker() { // sync AppWrapper if err := cc.syncQueueJob(queuejob); err != nil { - klog.Errorf("[worker] Failed to sync AppWrapper '%s/%s', err %#v", queuejob.Namespace, queuejob.Name, err) // If any error, requeue it. return err } @@ -1911,12 +1911,12 @@ func (cc *XController) worker() { klog.V(10).Infof("[worker] Ending %s Delay=%.6f seconds &newQJ=%p Version=%s Status=%+v", queuejob.Name, time.Now().Sub(queuejob.Status.ControllerFirstTimestamp.Time).Seconds(), queuejob, queuejob.ResourceVersion, queuejob.Status) return nil }) - if err != nil { + if err != nil && !CanIgnoreAPIError(err) && !IsJsonSyntaxError(err) { klog.Warningf("[worker] Fail to process item from eventQueue, err %v. 
Attempting to re-enqueque...", err) if err00 := cc.enqueueIfNotPresent(item); err00 != nil { - klog.Errorf("[worker] Fatal error railed to re-enqueue item, err %v", err00) + klog.Errorf("[worker] Fatal error trying to re-enqueue item, err =%v", err00) } else { - klog.Warning("[worker] Item re-enqueued") + klog.Warning("[worker] Item re-enqueued.") } return } @@ -2140,7 +2140,6 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool klog.Errorf("[manageQueueJob] Error dispatching generic item for app wrapper='%s/%s' type=%v err=%v", qj.Namespace, qj.Name, err00) } dispatchFailureMessage = fmt.Sprintf("%s/%s creation failure: %+v", qj.Namespace, qj.Name, err00) - klog.Errorf("[manageQueueJob] Error dispatching job=%s Status=%+v err=%+v", qj.Name, qj.Status, err00) dispatched = false } } @@ -2337,7 +2336,7 @@ func (cc *XController) Cleanup(appwrapper *arbv1.AppWrapper) error { // we call clean-up for each controller for _, ar := range appwrapper.Spec.AggrResources.Items { err00 := cc.qjobResControls[ar.Type].Cleanup(appwrapper, &ar) - if err00 != nil && !apierrors.IsNotFound(err00) { + if err00 != nil && !CanIgnoreAPIError(err00) && !IsJsonSyntaxError(err00) { klog.Errorf("[Cleanup] Error deleting item %s from app wrapper='%s/%s' err=%v.", ar.Type, appwrapper.Namespace, appwrapper.Name, err00) err = multierror.Append(err, err00) @@ -2350,14 +2349,19 @@ func (cc *XController) Cleanup(appwrapper *arbv1.AppWrapper) error { if appwrapper.Spec.AggrResources.GenericItems != nil { for _, ar := range appwrapper.Spec.AggrResources.GenericItems { genericResourceName, gvk, err00 := cc.genericresources.Cleanup(appwrapper, &ar) - if err00 != nil && !apierrors.IsNotFound(err00) { + if err00 != nil && !CanIgnoreAPIError(err00) && !IsJsonSyntaxError(err00) { klog.Errorf("[Cleanup] Error deleting generic item %s, from app wrapper='%s/%s' err=%v.", genericResourceName, appwrapper.Namespace, appwrapper.Name, err00) err = multierror.Append(err, err00) continue 
} - klog.V(3).Infof("[Cleanup] Deleted generic item %s, GVK=%s.%s.%s from app wrapper='%s/%s'", - genericResourceName, gvk.Group, gvk.Version, gvk.Kind, appwrapper.Namespace, appwrapper.Name) + if gvk != nil { + klog.V(3).Infof("[Cleanup] Deleted generic item '%s', GVK=%s.%s.%s from app wrapper='%s/%s'", + genericResourceName, gvk.Group, gvk.Version, gvk.Kind, appwrapper.Namespace, appwrapper.Name) + } else { + klog.V(3).Infof("[Cleanup] Deleted generic item '%s' from app wrapper='%s/%s'", + genericResourceName, appwrapper.Namespace, appwrapper.Name) + } } } @@ -2443,3 +2447,21 @@ func (qjm *XController) schedulingAWAtomicSet(qj *arbv1.AppWrapper) { qjm.schedulingAW = qj qjm.schedulingMutex.Unlock() } + +func IsJsonSyntaxError(err error) bool { + var tt *jsons.SyntaxError + if err == nil { + return false + } else if err.Error() == "Job resource template item not define as a PodTemplate" { + return true + } else if err.Error() == "name is required" { + return true + } else if errors.As(err, &tt) { + return true + } else { + return false + } +} +func CanIgnoreAPIError(err error) bool { + return err == nil || apierrors.IsNotFound(err) || apierrors.IsInvalid(err) +} diff --git a/pkg/controller/queuejob/queuejob_controller_ex_test.go b/pkg/controller/queuejob/queuejob_controller_ex_test.go new file mode 100644 index 000000000..ca7b8fb24 --- /dev/null +++ b/pkg/controller/queuejob/queuejob_controller_ex_test.go @@ -0,0 +1,87 @@ +/* +Copyright 2022, 2023 The Multi-Cluster App Dispatcher Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queuejob_test + +import ( + "encoding/json" + "errors" + "fmt" + "testing" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime/schema" + + arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1" + "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob" + "github.com/stretchr/testify/assert" +) + +// TestIsJsonSyntaxError function validates that the error is related to JSON parsing of +// generic items +func TestIsJsonSyntaxError(t *testing.T) { + // Define the test table + var tests = []struct { + name string + inputErr error + expectedValue bool + }{ + {"Nill error", nil, false}, + {"Job resource template item not define as a PodTemplate", errors.New("Job resource template item not define as a PodTemplate"), true}, + {"json.SyntaxError", new(json.SyntaxError), true}, + {"random errror", errors.New("some error"), false}, + {"wrapped JSON error", fmt.Errorf("wrapping syntax error, %w", new(json.SyntaxError)), true}, + } + // Execute tests in parallel + for _, tc := range tests { + tc := tc // capture range variable + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tc.expectedValue, queuejob.IsJsonSyntaxError(tc.inputErr)) + }) + } +} +func TestCanIgnoreAPIError(t *testing.T) { + + invalidErr := apierrors.NewInvalid(schema.GroupKind{Group: arbv1.GroupName, Kind: arbv1.AppWrapperPlural}, "a test app wrapper", nil) + notFoundErr := apierrors.NewNotFound(arbv1.Resource("appwrappers"), "a test app wrapper") + conflictErr := apierrors.NewConflict(arbv1.Resource("appwrappers"), "a test app wrapper", errors.New("an appwrapper update conflict")) + + // Define the test table + var tests = []struct { + name string + inputErr error + expectedValue bool + }{ + {"Nill error", nil, true}, + {"apierrors.IsInvalid", invalidErr, true}, + 
{"apierrors.IsNotFound", notFoundErr, true}, + {"apierrors.IsConflicted", conflictErr, false}, + {"generic error", errors.New("some error"), false}, + {"wrapped apierrors.IsInvalid", fmt.Errorf("wrapped invalid %w", invalidErr), true}, + {"wrapped apierrors.IsNotFound", fmt.Errorf("wrapped invalid %w", notFoundErr), true}, + {"wrapped apierrors.IsConflicted", fmt.Errorf("wrapped invalid %w", conflictErr), false}, + } + // Execute tests in parallel + for _, tc := range tests { + tc := tc // capture range variable + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + assert.Equal(t, tc.expectedValue, queuejob.CanIgnoreAPIError(tc.inputErr)) + }) + } + +} diff --git a/test/e2e/queue.go b/test/e2e/queue.go index a6d5414e9..cb95096be 100644 --- a/test/e2e/queue.go +++ b/test/e2e/queue.go @@ -535,14 +535,14 @@ var _ = Describe("AppWrapper E2E Test", func() { defer cleanupTestObjectsPtr(context, appwrappersPtr) aw := createGenericAWTimeoutWithStatus(context, "aw-test-jobtimeout-with-comp-1") + appwrappers = append(appwrappers, aw) err1 := waitAWPodsReady(context, aw) Expect(err1).NotTo(HaveOccurred(), "Expecting pods to be ready for app wrapper: aw-test-jobtimeout-with-comp-1") - time.Sleep(60 * time.Second) + time.Sleep(90 * time.Second) aw1, err := context.karclient.ArbV1().AppWrappers(aw.Namespace).Get(aw.Name, metav1.GetOptions{}) Expect(err).NotTo(HaveOccurred(), "Expecting no error when getting app wrapper status") fmt.Fprintf(GinkgoWriter, "[e2e] status of app wrapper: %v.\n", aw1.Status) Expect(aw1.Status.State).To(Equal(arbv1.AppWrapperStateFailed), "Expecting a failed state") - appwrappers = append(appwrappers, aw) fmt.Fprintf(os.Stdout, "[e2e] MCAD app wrapper timeout Test - Completed.\n") })