Skip to content

Commit 6627ea2

Browse files
authored
Merge pull request #1801 from k8s-infra-cherrypick-robot/cherry-pick-1790-to-release-0.11
🐛 [release-0.11] Fixed a bug in newGVKFixupWatcher which caused the metadata informer to hang
2 parents f236f03 + e11297b commit 6627ea2

File tree

3 files changed

+149
-34
lines changed

3 files changed

+149
-34
lines changed

pkg/cache/internal/informers_map.go

Lines changed: 24 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -409,41 +409,31 @@ func createMetadataListWatch(gvk schema.GroupVersionKind, ip *specificInformersM
409409
}, nil
410410
}
411411

412-
type gvkFixupWatcher struct {
413-
watcher watch.Interface
414-
ch chan watch.Event
415-
gvk schema.GroupVersionKind
416-
wg sync.WaitGroup
417-
}
418-
412+
// newGVKFixupWatcher adds a wrapper that preserves the GVK information when
413+
// events come in.
414+
//
415+
// This works around a bug where GVK information is not passed into mapping
416+
// functions when using the OnlyMetadata option in the builder.
417+
// This issue is most likely caused by kubernetes/kubernetes#80609.
418+
// See kubernetes-sigs/controller-runtime#1484.
419+
//
420+
// This was originally implemented as a cache.ResourceEventHandler wrapper but
421+
// that contained a data race which was resolved by setting the GVK in a watch
422+
// wrapper, before the objects are written to the cache.
423+
// See kubernetes-sigs/controller-runtime#1650.
424+
//
425+
// The original watch wrapper was found to be incompatible with
426+
// k8s.io/client-go/tools/cache.Reflector so it has been re-implemented as a
427+
// watch.Filter which is compatible.
428+
// See kubernetes-sigs/controller-runtime#1789.
419429
func newGVKFixupWatcher(gvk schema.GroupVersionKind, watcher watch.Interface) watch.Interface {
420-
ch := make(chan watch.Event)
421-
w := &gvkFixupWatcher{
422-
gvk: gvk,
423-
watcher: watcher,
424-
ch: ch,
425-
}
426-
w.wg.Add(1)
427-
go w.run()
428-
return w
429-
}
430-
431-
func (w *gvkFixupWatcher) run() {
432-
for e := range w.watcher.ResultChan() {
433-
e.Object.GetObjectKind().SetGroupVersionKind(w.gvk)
434-
w.ch <- e
435-
}
436-
w.wg.Done()
437-
}
438-
439-
func (w *gvkFixupWatcher) Stop() {
440-
w.watcher.Stop()
441-
w.wg.Wait()
442-
close(w.ch)
443-
}
444-
445-
func (w *gvkFixupWatcher) ResultChan() <-chan watch.Event {
446-
return w.ch
430+
return watch.Filter(
431+
watcher,
432+
func(in watch.Event) (watch.Event, bool) {
433+
in.Object.GetObjectKind().SetGroupVersionKind(gvk)
434+
return in, true
435+
},
436+
)
447437
}
448438

449439
// resyncPeriod returns a function which generates a duration each time it is
Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"fmt"
21+
22+
. "github.com/onsi/ginkgo"
23+
. "github.com/onsi/gomega"
24+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
25+
"k8s.io/apimachinery/pkg/runtime"
26+
"k8s.io/apimachinery/pkg/runtime/schema"
27+
"k8s.io/apimachinery/pkg/watch"
28+
)
29+
30+
// Test that gvkFixupWatcher behaves like watch.FakeWatcher
31+
// and that it overrides the GVK.
32+
// These tests are adapted from the watch.FakeWatcher tests in:
33+
// https://github.com/kubernetes/kubernetes/blob/adbda068c1808fcc8a64a94269e0766b5c46ec41/staging/src/k8s.io/apimachinery/pkg/watch/watch_test.go#L33-L78
34+
var _ = Describe("gvkFixupWatcher", func() {
35+
It("behaves like watch.FakeWatcher", func() {
36+
newTestType := func(name string) runtime.Object {
37+
return &metav1.PartialObjectMetadata{
38+
ObjectMeta: metav1.ObjectMeta{
39+
Name: name,
40+
},
41+
}
42+
}
43+
44+
f := watch.NewFake()
45+
// This is the GVK which we expect the wrapper to set on all the events
46+
expectedGVK := schema.GroupVersionKind{
47+
Group: "testgroup",
48+
Version: "v1test2",
49+
Kind: "TestKind",
50+
}
51+
gvkfw := newGVKFixupWatcher(expectedGVK, f)
52+
53+
table := []struct {
54+
t watch.EventType
55+
s runtime.Object
56+
}{
57+
{watch.Added, newTestType("foo")},
58+
{watch.Modified, newTestType("qux")},
59+
{watch.Modified, newTestType("bar")},
60+
{watch.Deleted, newTestType("bar")},
61+
{watch.Error, newTestType("error: blah")},
62+
}
63+
64+
consumer := func(w watch.Interface) {
65+
for _, expect := range table {
66+
By(fmt.Sprintf("Fixing up watch.EventType: %v and passing it on", expect.t))
67+
got, ok := <-w.ResultChan()
68+
Expect(ok).To(BeTrue(), "closed early")
69+
Expect(expect.t).To(Equal(got.Type), "unexpected Event.Type or out-of-order Event")
70+
Expect(got.Object).To(BeAssignableToTypeOf(&metav1.PartialObjectMetadata{}), "unexpected Event.Object type")
71+
a := got.Object.(*metav1.PartialObjectMetadata)
72+
Expect(got.Object.GetObjectKind().GroupVersionKind()).To(Equal(expectedGVK), "GVK was not fixed up")
73+
expected := expect.s.DeepCopyObject()
74+
expected.GetObjectKind().SetGroupVersionKind(schema.GroupVersionKind{})
75+
actual := a.DeepCopyObject()
76+
actual.GetObjectKind().SetGroupVersionKind(schema.GroupVersionKind{})
77+
Expect(actual).To(Equal(expected), "unexpected change to the Object")
78+
}
79+
Eventually(w.ResultChan()).Should(BeClosed())
80+
}
81+
82+
sender := func() {
83+
f.Add(newTestType("foo"))
84+
f.Action(watch.Modified, newTestType("qux"))
85+
f.Modify(newTestType("bar"))
86+
f.Delete(newTestType("bar"))
87+
f.Error(newTestType("error: blah"))
88+
f.Stop()
89+
}
90+
91+
go sender()
92+
consumer(gvkfw)
93+
})
94+
})
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package internal
18+
19+
import (
20+
"testing"
21+
22+
. "github.com/onsi/ginkgo"
23+
. "github.com/onsi/gomega"
24+
"sigs.k8s.io/controller-runtime/pkg/envtest/printer"
25+
)
26+
27+
func TestSource(t *testing.T) {
28+
RegisterFailHandler(Fail)
29+
suiteName := "Cache Internal Suite"
30+
RunSpecsWithDefaultAndCustomReporters(t, suiteName, []Reporter{printer.NewlineReporter{}, printer.NewProwReporter(suiteName)})
31+
}

0 commit comments

Comments
 (0)