Skip to content

Commit 78ce10e

Browse files
authored
Merge pull request #1682 from k8s-infra-cherrypick-robot/cherry-pick-1678-to-release-0.10
🌱 Source should retry to get informers until timeout expires
2 parents a4c56b0 + de32618 commit 78ce10e

File tree

2 files changed

+31
-8
lines changed

2 files changed

+31
-8
lines changed

pkg/source/source.go

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@ import (
2121
"errors"
2222
"fmt"
2323
"sync"
24+
"time"
2425

2526
"k8s.io/apimachinery/pkg/api/meta"
27+
"k8s.io/apimachinery/pkg/util/wait"
2628
"k8s.io/client-go/util/workqueue"
2729
"sigs.k8s.io/controller-runtime/pkg/client"
2830
"sigs.k8s.io/controller-runtime/pkg/event"
@@ -119,17 +121,34 @@ func (ks *Kind) Start(ctx context.Context, handler handler.EventHandler, queue w
119121
ctx, ks.startCancel = context.WithCancel(ctx)
120122
ks.started = make(chan error)
121123
go func() {
122-
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
123-
i, err := ks.cache.GetInformer(ctx, ks.Type)
124-
if err != nil {
125-
kindMatchErr := &meta.NoKindMatchError{}
126-
if errors.As(err, &kindMatchErr) {
127-
log.Error(err, "if kind is a CRD, it should be installed before calling Start",
128-
"kind", kindMatchErr.GroupKind)
124+
var (
125+
i cache.Informer
126+
lastErr error
127+
)
128+
129+
// Tries to get an informer until it returns true,
130+
// an error or the specified context is cancelled or expired.
131+
if err := wait.PollImmediateUntilWithContext(ctx, 10*time.Second, func(ctx context.Context) (bool, error) {
132+
// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
133+
i, lastErr = ks.cache.GetInformer(ctx, ks.Type)
134+
if lastErr != nil {
135+
kindMatchErr := &meta.NoKindMatchError{}
136+
if errors.As(lastErr, &kindMatchErr) {
137+
log.Error(lastErr, "if kind is a CRD, it should be installed before calling Start",
138+
"kind", kindMatchErr.GroupKind)
139+
}
140+
return false, nil // Retry.
141+
}
142+
return true, nil
143+
}); err != nil {
144+
if lastErr != nil {
145+
ks.started <- fmt.Errorf("failed to get informer from cache: %w", lastErr)
146+
return
129147
}
130148
ks.started <- err
131149
return
132150
}
151+
133152
i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
134153
if !ks.cache.WaitForCacheSync(ctx) {
135154
// Would be great to return something more informative here

pkg/source/source_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ package source_test
1919
import (
2020
"context"
2121
"fmt"
22+
"time"
2223

2324
. "github.com/onsi/ginkgo"
2425
. "github.com/onsi/gomega"
@@ -218,13 +219,16 @@ var _ = Describe("Source", func() {
218219
ic.Error = fmt.Errorf("test error")
219220
q := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test")
220221

222+
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
223+
defer cancel()
224+
221225
instance := &source.Kind{
222226
Type: &corev1.Pod{},
223227
}
224228
Expect(instance.InjectCache(ic)).To(Succeed())
225229
err := instance.Start(ctx, handler.Funcs{}, q)
226230
Expect(err).NotTo(HaveOccurred())
227-
Expect(instance.WaitForSync(context.Background())).To(HaveOccurred())
231+
Eventually(instance.WaitForSync(context.Background())).Should(HaveOccurred())
228232
})
229233
})
230234
})

0 commit comments

Comments
 (0)