Skip to content

Commit fb739b0

Browse files
wallrjk8s-infra-cherrypick-robot
authored andcommitted
Re-implement gvkFixupWatcher as a watch.FilterFunc
Signed-off-by: Richard Wall <richard.wall@jetstack.io>
1 parent b32c137 commit fb739b0

File tree

1 file changed

+26
-34
lines changed

1 file changed

+26
-34
lines changed

pkg/cache/internal/informers_map.go

Lines changed: 26 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -409,41 +409,33 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
409409
}, nil
410410
}
411411

412-
type gvkFixupWatcher struct {
413-
watcher watch.Interface
414-
ch chan watch.Event
415-
gvk schema.GroupVersionKind
416-
wg sync.WaitGroup
417-
}
418-
412+
// newGVKFixupWatcher adds a wrapper that preserves the GVK information when
413+
// events come in.
414+
//
415+
// This works around a bug where GVK information is not passed into mapping
416+
// functions when using the OnlyMetadata option in the builder.
417+
// This issue is most likely caused by kubernetes/kubernetes#80609.
418+
// See kubernetes-sigs/controller-runtime#1484.
419+
//
420+
// This was originally implemented as a cache.ResourceEventHandler wrapper but
421+
// that contained a data race which was resolved by setting the GVK in a watch
422+
// wrapper, before the objects are written to the cache.
423+
// See kubernetes-sigs/controller-runtime#1650.
424+
//
425+
// The original watch wrapper was found to be incompatible with
426+
// k8s.io/client-go/tools/cache.Reflector so it has been re-implemented as a
427+
// watch.Filter which is compatible.
428+
// See kubernetes-sigs/controller-runtime#1789.
419429
func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface {
420-
ch := make(chan watch.Event)
421-
w := &gvkFixupWatcher{
422-
gvk: gvk,
423-
watcher: watcher,
424-
ch: ch,
425-
}
426-
w.wg.Add(1)
427-
go w.run()
428-
return w
429-
}
430-
431-
func (w *gvkFixupWatcher) run() {
432-
for e := range w.watcher.ResultChan() {
433-
e.Object.GetObjectKind().SetGroupVersionKind(w.gvk)
434-
w.ch <- e
435-
}
436-
w.wg.Done()
437-
}
438-
439-
func (w *gvkFixupWatcher) Stop() {
440-
w.watcher.Stop()
441-
w.wg.Wait()
442-
close(w.ch)
443-
}
444-
445-
func (w *gvkFixupWatcher) ResultChan() <-chan watch.Event {
446-
return w.ch
430+
return watch.Filter(
431+
watcher,
432+
func(in watch.Event) (out watch.Event, keep bool) {
433+
keep = true
434+
in.DeepCopyInto(&out)
435+
out.Object.GetObjectKind().SetGroupVersionKind(gvk)
436+
return out, keep
437+
},
438+
)
447439
}
448440

449441
// resyncPeriod returns a function which generates a duration each time it is

0 commit comments

Comments
 (0)