Skip to content

Commit edce2c4

Browse files
authored
Incorrect handling of errors in the worker thread (#482)
* Incorrect handling of errors in the worker thread Fixes #481 * Fixed test file name. * PR review updates Small improvement to e2e test.
1 parent b9b87ea commit edce2c4

File tree

3 files changed

+122
-13
lines changed

3 files changed

+122
-13
lines changed

pkg/controller/queuejob/queuejob_controller_ex.go

Lines changed: 33 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ limitations under the License.
3131
package queuejob
3232

3333
import (
34+
"errors"
3435
"fmt"
3536
"math"
3637
"math/rand"
@@ -69,7 +70,7 @@ import (
6970

7071
"k8s.io/apimachinery/pkg/runtime"
7172
"k8s.io/apimachinery/pkg/runtime/schema"
72-
"k8s.io/apimachinery/pkg/runtime/serializer/json"
73+
runtimeJson "k8s.io/apimachinery/pkg/runtime/serializer/json"
7374

7475
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources"
7576
resconfigmap "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/configmap" // ConfigMap
@@ -675,7 +676,7 @@ func GetPodTemplate(qjobRes *arbv1.AppWrapperResource) (*v1.PodTemplateSpec, err
675676
rtScheme := runtime.NewScheme()
676677
v1.AddToScheme(rtScheme)
677678

678-
jsonSerializer := json.NewYAMLSerializer(json.DefaultMetaFactory, rtScheme, rtScheme)
679+
jsonSerializer := runtimeJson.NewYAMLSerializer(runtimeJson.DefaultMetaFactory, rtScheme, rtScheme)
679680

680681
podGVK := schema.GroupVersion{Group: v1.GroupName, Version: "v1"}.WithKind("PodTemplate")
681682

@@ -1903,20 +1904,19 @@ func (cc *XController) worker() {
19031904

19041905
// sync AppWrapper
19051906
if err := cc.syncQueueJob(queuejob); err != nil {
1906-
klog.Errorf("[worker] Failed to sync AppWrapper '%s/%s', err %#v", queuejob.Namespace, queuejob.Name, err)
19071907
// If any error, requeue it.
19081908
return err
19091909
}
19101910

19111911
klog.V(10).Infof("[worker] Ending %s Delay=%.6f seconds &newQJ=%p Version=%s Status=%+v", queuejob.Name, time.Now().Sub(queuejob.Status.ControllerFirstTimestamp.Time).Seconds(), queuejob, queuejob.ResourceVersion, queuejob.Status)
19121912
return nil
19131913
})
1914-
if err != nil {
1914+
if err != nil && !CanIgnoreAPIError(err) && !IsJsonSyntaxError(err) {
19151915
klog.Warningf("[worker] Fail to process item from eventQueue, err %v. Attempting to re-enqueque...", err)
19161916
if err00 := cc.enqueueIfNotPresent(item); err00 != nil {
1917-
klog.Errorf("[worker] Fatal error railed to re-enqueue item, err %v", err00)
1917+
klog.Errorf("[worker] Fatal error trying to re-enqueue item, err =%v", err00)
19181918
} else {
1919-
klog.Warning("[worker] Item re-enqueued")
1919+
klog.Warning("[worker] Item re-enqueued.")
19201920
}
19211921
return
19221922
}
@@ -2140,7 +2140,6 @@ func (cc *XController) manageQueueJob(qj *arbv1.AppWrapper, podPhaseChanges bool
21402140
klog.Errorf("[manageQueueJob] Error dispatching generic item for app wrapper='%s/%s' type=%v err=%v", qj.Namespace, qj.Name, err00)
21412141
}
21422142
dispatchFailureMessage = fmt.Sprintf("%s/%s creation failure: %+v", qj.Namespace, qj.Name, err00)
2143-
klog.Errorf("[manageQueueJob] Error dispatching job=%s Status=%+v err=%+v", qj.Name, qj.Status, err00)
21442143
dispatched = false
21452144
}
21462145
}
@@ -2337,7 +2336,7 @@ func (cc *XController) Cleanup(appwrapper *arbv1.AppWrapper) error {
23372336
// we call clean-up for each controller
23382337
for _, ar := range appwrapper.Spec.AggrResources.Items {
23392338
err00 := cc.qjobResControls[ar.Type].Cleanup(appwrapper, &ar)
2340-
if err00 != nil && !apierrors.IsNotFound(err00) {
2339+
if err00 != nil && !CanIgnoreAPIError(err00) && !IsJsonSyntaxError(err00) {
23412340
klog.Errorf("[Cleanup] Error deleting item %s from app wrapper='%s/%s' err=%v.",
23422341
ar.Type, appwrapper.Namespace, appwrapper.Name, err00)
23432342
err = multierror.Append(err, err00)
@@ -2350,14 +2349,19 @@ func (cc *XController) Cleanup(appwrapper *arbv1.AppWrapper) error {
23502349
if appwrapper.Spec.AggrResources.GenericItems != nil {
23512350
for _, ar := range appwrapper.Spec.AggrResources.GenericItems {
23522351
genericResourceName, gvk, err00 := cc.genericresources.Cleanup(appwrapper, &ar)
2353-
if err00 != nil && !apierrors.IsNotFound(err00) {
2352+
if err00 != nil && !CanIgnoreAPIError(err00) && !IsJsonSyntaxError(err00) {
23542353
klog.Errorf("[Cleanup] Error deleting generic item %s, from app wrapper='%s/%s' err=%v.",
23552354
genericResourceName, appwrapper.Namespace, appwrapper.Name, err00)
23562355
err = multierror.Append(err, err00)
23572356
continue
23582357
}
2359-
klog.V(3).Infof("[Cleanup] Deleted generic item %s, GVK=%s.%s.%s from app wrapper='%s/%s'",
2360-
genericResourceName, gvk.Group, gvk.Version, gvk.Kind, appwrapper.Namespace, appwrapper.Name)
2358+
if gvk != nil {
2359+
klog.V(3).Infof("[Cleanup] Deleted generic item '%s', GVK=%s.%s.%s from app wrapper='%s/%s'",
2360+
genericResourceName, gvk.Group, gvk.Version, gvk.Kind, appwrapper.Namespace, appwrapper.Name)
2361+
} else {
2362+
klog.V(3).Infof("[Cleanup] Deleted generic item '%s' from app wrapper='%s/%s'",
2363+
genericResourceName, appwrapper.Namespace, appwrapper.Name)
2364+
}
23612365
}
23622366
}
23632367

@@ -2443,3 +2447,21 @@ func (qjm *XController) schedulingAWAtomicSet(qj *arbv1.AppWrapper) {
24432447
qjm.schedulingAW = qj
24442448
qjm.schedulingMutex.Unlock()
24452449
}
2450+
2451+
// IsJsonSyntaxError reports whether err is one of the errors this package
// groups together with JSON syntax errors.
//
// It returns true when:
//   - err's message is exactly "Job resource template item not define as a PodTemplate",
//   - err's message is exactly "name is required", or
//   - errors.As finds a *jsons.SyntaxError anywhere in err's chain.
//
// It returns false for a nil error and for every other error.
func IsJsonSyntaxError(err error) bool {
	if err == nil {
		return false
	}

	// The message comparison is exact and looks only at the outermost error,
	// so an error that wraps one of these messages with extra text does not
	// match here.
	switch err.Error() {
	case "Job resource template item not define as a PodTemplate", "name is required":
		return true
	}

	// errors.As walks the wrap chain, so wrapped syntax errors are detected.
	var syntaxErr *jsons.SyntaxError
	return errors.As(err, &syntaxErr)
}
2465+
func CanIgnoreAPIError(err error) bool {
2466+
return err == nil || apierrors.IsNotFound(err) || apierrors.IsInvalid(err)
2467+
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
Copyright 2022, 2023 The Multi-Cluster App Dispatcher Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package queuejob_test
18+
19+
import (
20+
"encoding/json"
21+
"errors"
22+
"fmt"
23+
"testing"
24+
25+
apierrors "k8s.io/apimachinery/pkg/api/errors"
26+
"k8s.io/apimachinery/pkg/runtime/schema"
27+
28+
arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
29+
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejob"
30+
"github.com/stretchr/testify/assert"
31+
)
32+
33+
// TestIsJsonSyntaxError function validates that the error is related to JSON parsing of
34+
// generic items
35+
func TestIsJsonSyntaxError(t *testing.T) {
36+
// Define the test table
37+
var tests = []struct {
38+
name string
39+
inputErr error
40+
expectedValue bool
41+
}{
42+
{"Nill error", nil, false},
43+
{"Job resource template item not define as a PodTemplate", errors.New("Job resource template item not define as a PodTemplate"), true},
44+
{"json.SyntaxError", new(json.SyntaxError), true},
45+
{"random errror", errors.New("some error"), false},
46+
{"wrapped JSON error", fmt.Errorf("wrapping syntax error, %w", new(json.SyntaxError)), true},
47+
}
48+
// Execute tests in parallel
49+
for _, tc := range tests {
50+
tc := tc // capture range variable
51+
t.Run(tc.name, func(t *testing.T) {
52+
t.Parallel()
53+
assert.Equal(t, tc.expectedValue, queuejob.IsJsonSyntaxError(tc.inputErr))
54+
})
55+
}
56+
}
57+
func TestCanIgnoreAPIError(t *testing.T) {
58+
59+
invalidErr := apierrors.NewInvalid(schema.GroupKind{Group: arbv1.GroupName, Kind: arbv1.AppWrapperPlural}, "a test app wrapper", nil)
60+
notFoundErr := apierrors.NewNotFound(arbv1.Resource("appwrappers"), "a test app wrapper")
61+
conflictErr := apierrors.NewConflict(arbv1.Resource("appwrappers"), "a test app wrapper", errors.New("an appwrapper update conflict"))
62+
63+
// Define the test table
64+
var tests = []struct {
65+
name string
66+
inputErr error
67+
expectedValue bool
68+
}{
69+
{"Nill error", nil, true},
70+
{"apierrors.IsInvalid", invalidErr, true},
71+
{"apierrors.IsNotFound", notFoundErr, true},
72+
{"apierrors.IsConflicted", conflictErr, false},
73+
{"generic error", errors.New("some error"), false},
74+
{"wrapped apierrors.IsInvalid", fmt.Errorf("wrapped invalid %w", invalidErr), true},
75+
{"wrapped apierrors.IsNotFound", fmt.Errorf("wrapped invalid %w", notFoundErr), true},
76+
{"wrapped apierrors.IsConflicted", fmt.Errorf("wrapped invalid %w", conflictErr), false},
77+
}
78+
// Execute tests in parallel
79+
for _, tc := range tests {
80+
tc := tc // capture range variable
81+
t.Run(tc.name, func(t *testing.T) {
82+
t.Parallel()
83+
assert.Equal(t, tc.expectedValue, queuejob.CanIgnoreAPIError(tc.inputErr))
84+
})
85+
}
86+
87+
}

test/e2e/queue.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -535,14 +535,14 @@ var _ = Describe("AppWrapper E2E Test", func() {
535535
defer cleanupTestObjectsPtr(context, appwrappersPtr)
536536

537537
aw := createGenericAWTimeoutWithStatus(context, "aw-test-jobtimeout-with-comp-1")
538+
appwrappers = append(appwrappers, aw)
538539
err1 := waitAWPodsReady(context, aw)
539540
Expect(err1).NotTo(HaveOccurred(), "Expecting pods to be ready for app wrapper: aw-test-jobtimeout-with-comp-1")
540-
time.Sleep(60 * time.Second)
541+
time.Sleep(90 * time.Second)
541542
aw1, err := context.karclient.ArbV1().AppWrappers(aw.Namespace).Get(aw.Name, metav1.GetOptions{})
542543
Expect(err).NotTo(HaveOccurred(), "Expecting no error when getting app wrapper status")
543544
fmt.Fprintf(GinkgoWriter, "[e2e] status of app wrapper: %v.\n", aw1.Status)
544545
Expect(aw1.Status.State).To(Equal(arbv1.AppWrapperStateFailed), "Expecting a failed state")
545-
appwrappers = append(appwrappers, aw)
546546
fmt.Fprintf(os.Stdout, "[e2e] MCAD app wrapper timeout Test - Completed.\n")
547547
})
548548

0 commit comments

Comments
 (0)