Skip to content

Commit da1fcfe

Browse files
xrstfturkenh
authored andcommitted
UPSTREAM: <carry>: kcp modifications
Signed-off-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
1 parent 0f7927c commit da1fcfe

20 files changed

+605
-35
lines changed

.prow.yaml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
presubmits:
2+
- name: pull-controller-runtime-everything
3+
always_run: true
4+
decorate: true
5+
clone_uri: "ssh://git@github.com/kcp-dev/controller-runtime.git"
6+
labels:
7+
preset-goproxy: "true"
8+
spec:
9+
containers:
10+
- image: ghcr.io/kcp-dev/infra/build:1.22.2-1
11+
command:
12+
- make
13+
- test

DOWNSTREAM_OWNERS

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
approvers:
2+
- sttts
3+
- xrstf
4+
- mjudeikis
5+
- embik

DOWNSTREAM_OWNERS_ALIASES

Whitespace-only changes.

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ require (
3131
sigs.k8s.io/yaml v1.4.0
3232
)
3333

34+
require (
35+
github.com/kcp-dev/apimachinery/v2 v2.0.0
36+
github.com/kcp-dev/logicalcluster/v3 v3.0.5
37+
)
38+
3439
require (
3540
cel.dev/expr v0.18.0 // indirect
3641
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,10 @@ github.com/josharian/intern v1.0.0 h1:vlS4z54oSdjm0bgjRigI+G1HpF+tI+9rE5LLzOg8Hm
7575
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
7676
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
7777
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
78+
github.com/kcp-dev/apimachinery/v2 v2.0.0 h1:hQuhBBh+AvUYYMRG+nDzo1VXxNCdMAE95wSD2uB7nxw=
79+
github.com/kcp-dev/apimachinery/v2 v2.0.0/go.mod h1:cXCx7fku8/rYK23PNEBRLQ5ByoABoA+CZeJNC81TO0g=
80+
github.com/kcp-dev/logicalcluster/v3 v3.0.5 h1:JbYakokb+5Uinz09oTXomSUJVQsqfxEvU4RyHUYxHOU=
81+
github.com/kcp-dev/logicalcluster/v3 v3.0.5/go.mod h1:EWBUBxdr49fUB1cLMO4nOdBWmYifLbP1LfoL20KkXYY=
7882
github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8=
7983
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
8084
github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=

pkg/cache/cache.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"net/http"
2424
"slices"
2525
"sort"
26+
"strings"
2627
"time"
2728

2829
corev1 "k8s.io/api/core/v1"
@@ -170,6 +171,10 @@ type Options struct {
170171
// instead of `reconcile.Result{}`.
171172
SyncPeriod *time.Duration
172173

174+
// Indexers is the indexers that the informers will be configured to use.
175+
// Will always have the standard NamespaceIndex.
176+
Indexers toolscache.Indexers
177+
173178
// ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user
174179
// requests, using Get() and List(), a resource the cache does not already have an informer for.
175180
//
@@ -436,6 +441,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc {
436441
NewInformer: opts.NewInformer,
437442
}),
438443
readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer,
444+
clusterIndexes: strings.HasSuffix(restConfig.Host, "/clusters/*"),
439445
}
440446
}
441447
}
@@ -546,6 +552,10 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) {
546552
if opts.SyncPeriod == nil {
547553
opts.SyncPeriod = &defaultSyncPeriod
548554
}
555+
556+
if opts.NewInformer == nil {
557+
opts.NewInformer = toolscache.NewSharedIndexInformer
558+
}
549559
return opts, nil
550560
}
551561

pkg/cache/defaulting_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,9 @@ func TestDefaultOpts(t *testing.T) {
426426
t.Fatal(err)
427427
}
428428

429+
// We cannot reference kcp.NewInformerWithClusterIndexes due to import cycle.
430+
defaulted.NewInformerFunc = nil
431+
429432
if diff := tc.verification(defaulted); diff != "" {
430433
t.Errorf("expected config differs from actual: %s", diff)
431434
}

pkg/cache/informer_cache.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ import (
3131
"sigs.k8s.io/controller-runtime/pkg/cache/internal"
3232
"sigs.k8s.io/controller-runtime/pkg/client"
3333
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
34+
35+
"github.com/kcp-dev/logicalcluster/v3"
3436
)
3537

3638
var (
@@ -68,6 +70,7 @@ type informerCache struct {
6870
scheme *runtime.Scheme
6971
*internal.Informers
7072
readerFailOnMissingInformer bool
73+
clusterIndexes bool
7174
}
7275

7376
// Get implements Reader.
@@ -217,10 +220,10 @@ func (ic *informerCache) IndexField(ctx context.Context, obj client.Object, fiel
217220
if err != nil {
218221
return err
219222
}
220-
return indexByField(informer, field, extractValue)
223+
return indexByField(informer, field, extractValue, ic.clusterIndexes)
221224
}
222225

223-
func indexByField(informer Informer, field string, extractValue client.IndexerFunc) error {
226+
func indexByField(informer Informer, field string, extractValue client.IndexerFunc, clusterIndexes bool) error {
224227
indexFunc := func(objRaw interface{}) ([]string, error) {
225228
// TODO(directxman12): check if this is the correct type?
226229
obj, isObj := objRaw.(client.Object)
@@ -233,6 +236,13 @@ func indexByField(informer Informer, field string, extractValue client.IndexerFu
233236
}
234237
ns := meta.GetNamespace()
235238

239+
keyFunc := internal.KeyToNamespacedKey
240+
if clusterName := logicalcluster.From(obj); clusterIndexes && !clusterName.Empty() {
241+
keyFunc = func(ns, val string) string {
242+
return internal.KeyToClusteredKey(clusterName.String(), ns, val)
243+
}
244+
}
245+
236246
rawVals := extractValue(obj)
237247
var vals []string
238248
if ns == "" {
@@ -242,14 +252,15 @@ func indexByField(informer Informer, field string, extractValue client.IndexerFu
242252
// if we need to add non-namespaced versions too, double the length
243253
vals = make([]string, len(rawVals)*2)
244254
}
255+
245256
for i, rawVal := range rawVals {
246257
// save a namespaced variant, so that we can ask
247258
// "what are all the object matching a given index *in a given namespace*"
248-
vals[i] = internal.KeyToNamespacedKey(ns, rawVal)
259+
vals[i] = keyFunc(ns, rawVal)
249260
if ns != "" {
250261
// if we have a namespace, also inject a special index key for listing
251262
// regardless of the object namespace
252-
vals[i+len(rawVals)] = internal.KeyToNamespacedKey("", rawVal)
263+
vals[i+len(rawVals)] = keyFunc("", rawVal)
253264
}
254265
}
255266

pkg/cache/internal/cache_reader.go

Lines changed: 44 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import (
2121
"fmt"
2222
"reflect"
2323

24+
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
25+
"github.com/kcp-dev/logicalcluster/v3"
26+
2427
apierrors "k8s.io/apimachinery/pkg/api/errors"
2528
apimeta "k8s.io/apimachinery/pkg/api/meta"
2629
"k8s.io/apimachinery/pkg/fields"
@@ -31,6 +34,7 @@ import (
3134

3235
"sigs.k8s.io/controller-runtime/pkg/client"
3336
"sigs.k8s.io/controller-runtime/pkg/internal/field/selector"
37+
"sigs.k8s.io/controller-runtime/pkg/kontext"
3438
)
3539

3640
// CacheReader is a client.Reader.
@@ -54,12 +58,22 @@ type CacheReader struct {
5458
}
5559

5660
// Get checks the indexer for the object and writes a copy of it if found.
57-
func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, _ ...client.GetOption) error {
61+
func (c *CacheReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, _ ...client.GetOption) error {
5862
if c.scopeName == apimeta.RESTScopeNameRoot {
5963
key.Namespace = ""
6064
}
6165
storeKey := objectKeyToStoreKey(key)
6266

67+
// create cluster-aware key for KCP
68+
_, isClusterAware := c.indexer.GetIndexers()[kcpcache.ClusterAndNamespaceIndexName]
69+
clusterName, _ := kontext.ClusterFrom(ctx)
70+
if isClusterAware && clusterName.Empty() {
71+
return fmt.Errorf("cluster-aware cache requires a cluster in context")
72+
}
73+
if isClusterAware {
74+
storeKey = clusterName.String() + "|" + storeKey
75+
}
76+
6377
// Lookup the object from the indexer cache
6478
obj, exists, err := c.indexer.GetByKey(storeKey)
6579
if err != nil {
@@ -105,7 +119,7 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob
105119
}
106120

107121
// List lists items out of the indexer and writes them to out.
108-
func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error {
122+
func (c *CacheReader) List(ctx context.Context, out client.ObjectList, opts ...client.ListOption) error {
109123
var objs []interface{}
110124
var err error
111125

@@ -116,6 +130,9 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
116130
return fmt.Errorf("continue list option is not supported by the cache")
117131
}
118132

133+
_, isClusterAware := c.indexer.GetIndexers()[kcpcache.ClusterAndNamespaceIndexName]
134+
clusterName, _ := kontext.ClusterFrom(ctx)
135+
119136
switch {
120137
case listOpts.FieldSelector != nil:
121138
requiresExact := selector.RequiresExactMatch(listOpts.FieldSelector)
@@ -125,11 +142,19 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
125142
// list all objects by the field selector. If this is namespaced and we have one, ask for the
126143
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
127144
// namespace.
128-
objs, err = byIndexes(c.indexer, listOpts.FieldSelector.Requirements(), listOpts.Namespace)
145+
objs, err = byIndexes(c.indexer, listOpts.FieldSelector.Requirements(), clusterName, listOpts.Namespace)
129146
case listOpts.Namespace != "":
130-
objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
147+
if isClusterAware && !clusterName.Empty() {
148+
objs, err = c.indexer.ByIndex(kcpcache.ClusterAndNamespaceIndexName, kcpcache.ClusterAndNamespaceIndexKey(clusterName, listOpts.Namespace))
149+
} else {
150+
objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
151+
}
131152
default:
132-
objs = c.indexer.List()
153+
if isClusterAware && !clusterName.Empty() {
154+
objs, err = c.indexer.ByIndex(kcpcache.ClusterIndexName, kcpcache.ClusterIndexKey(clusterName))
155+
} else {
156+
objs = c.indexer.List()
157+
}
133158
}
134159
if err != nil {
135160
return err
@@ -177,16 +202,22 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
177202
return apimeta.SetList(out, runtimeObjs)
178203
}
179204

180-
func byIndexes(indexer cache.Indexer, requires fields.Requirements, namespace string) ([]interface{}, error) {
205+
func byIndexes(indexer cache.Indexer, requires fields.Requirements, clusterName logicalcluster.Name, namespace string) ([]interface{}, error) {
181206
var (
182207
err error
183208
objs []interface{}
184209
vals []string
185210
)
186211
indexers := indexer.GetIndexers()
212+
_, isClusterAware := indexers[kcpcache.ClusterAndNamespaceIndexName]
187213
for idx, req := range requires {
188214
indexName := FieldIndexName(req.Field)
189-
indexedValue := KeyToNamespacedKey(namespace, req.Value)
215+
var indexedValue string
216+
if isClusterAware {
217+
indexedValue = KeyToClusteredKey(clusterName.String(), namespace, req.Value)
218+
} else {
219+
indexedValue = KeyToNamespacedKey(namespace, req.Value)
220+
}
190221
if idx == 0 {
191222
// we use first require to get snapshot data
192223
// TODO(halfcrazy): use complicated index when client-go provides byIndexes
@@ -253,3 +284,9 @@ func KeyToNamespacedKey(ns string, baseKey string) string {
253284
}
254285
return allNamespacesNamespace + "/" + baseKey
255286
}
287+
288+
// KeyToClusteredKey prefixes the given index key with a cluster name
289+
// for use in field selector indexes.
290+
func KeyToClusteredKey(clusterName string, ns string, baseKey string) string {
291+
return clusterName + "|" + KeyToNamespacedKey(ns, baseKey)
292+
}

0 commit comments

Comments
 (0)