From 769ed776c4250c0b17de3b4cb0692f6f8efe5b49 Mon Sep 17 00:00:00 2001 From: Christoph Mewes Date: Mon, 9 Oct 2023 11:29:26 +0200 Subject: [PATCH] Rebase kcp modifications onto v0.16.2 --- .prow.yaml | 13 ++ DOWNSTREAM_OWNERS | 8 + DOWNSTREAM_OWNERS_ALIASES | 0 Makefile | 9 +- examples/scratch-env/go.sum | 4 + go.mod | 2 + go.sum | 4 + pkg/cache/cache.go | 18 +- pkg/cache/cache_test.go | 7 +- pkg/cache/informer_cache.go | 14 +- pkg/cache/internal/cache_reader.go | 42 ++++- pkg/cache/internal/informers.go | 10 +- pkg/client/client.go | 8 + pkg/client/interfaces.go | 7 + pkg/cluster/cluster.go | 11 +- pkg/controller/controllertest/util.go | 8 + pkg/handler/enqueue.go | 39 ++--- pkg/handler/enqueue_owner.go | 11 +- pkg/kcp/wrappers.go | 243 ++++++++++++++++++++++++++ pkg/kontext/kontext.go | 24 +++ pkg/manager/manager.go | 6 + pkg/reconcile/reconcile.go | 4 + 22 files changed, 444 insertions(+), 48 deletions(-) create mode 100644 .prow.yaml create mode 100644 DOWNSTREAM_OWNERS create mode 100644 DOWNSTREAM_OWNERS_ALIASES create mode 100644 pkg/kcp/wrappers.go create mode 100644 pkg/kontext/kontext.go diff --git a/.prow.yaml b/.prow.yaml new file mode 100644 index 0000000000..ab314e9f19 --- /dev/null +++ b/.prow.yaml @@ -0,0 +1,13 @@ +presubmits: + - name: pull-controller-runtime-everything + always_run: true + decorate: true + clone_uri: "ssh://git@github.com/kcp-dev/controller-runtime.git" + labels: + preset-goproxy: "true" + spec: + containers: + - image: ghcr.io/kcp-dev/infra/build:1.20.9-1 + command: + - make + - test diff --git a/DOWNSTREAM_OWNERS b/DOWNSTREAM_OWNERS new file mode 100644 index 0000000000..bba929233a --- /dev/null +++ b/DOWNSTREAM_OWNERS @@ -0,0 +1,8 @@ +approvers: + - davidfestal + - ncdc + - stevekuznetsov + - sttts +reviewers: + - fabianvf + - varshaprasad96 diff --git a/DOWNSTREAM_OWNERS_ALIASES b/DOWNSTREAM_OWNERS_ALIASES new file mode 100644 index 0000000000..e69de29bb2 diff --git a/Makefile b/Makefile index 007889c5a5..ae1709f82e 100644 --- a/Makefile +++ b/Makefile @@ -73,10 +73,11 @@ $(GO_APIDIFF): $(TOOLS_DIR)/go.mod # Build go-apidiff from tools folder. $(CONTROLLER_GEN): $(TOOLS_DIR)/go.mod # Build controller-gen from tools folder. cd $(TOOLS_DIR) && go build -tags=tools -o bin/controller-gen sigs.k8s.io/controller-tools/cmd/controller-gen -$(GOLANGCI_LINT): .github/workflows/golangci-lint.yml # Download golanci-lint using hack script into tools folder. - hack/ensure-golangci-lint.sh \ - -b $(TOOLS_BIN_DIR) \ - $(shell cat .github/workflows/golangci-lint.yml | grep "version: v" | sed 's/.*version: //') +$(GOLANGCI_LINT): .github/workflows/golangci-lint.yml # Download golangci-lint using hack script into tools folder. + GOBIN=$(abspath $(TOOLS_BIN_DIR)) go install github.com/golangci/golangci-lint/cmd/golangci-lint@$(shell cat .github/workflows/golangci-lint.yml | grep "version: v" | sed 's/.*version: //') + +.PHONY: tools +tools: $(GO_APIDIFF) $(CONTROLLER_GEN) $(GOLANGCI_LINT) ## -------------------------------------- ## Linting diff --git a/examples/scratch-env/go.sum b/examples/scratch-env/go.sum index 207d43fa7b..13629b20f3 100644 --- a/examples/scratch-env/go.sum +++ b/examples/scratch-env/go.sum @@ -936,6 +936,10 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= +github.com/kcp-dev/apimachinery/v2 v2.0.0-alpha.0.0.20230926071920-57d168bcbe34 h1:tom0JX5OmAeOOmkGv8LaYHDtA1xAKDiQL5U0vhYYgdM= +github.com/kcp-dev/apimachinery/v2 v2.0.0-alpha.0.0.20230926071920-57d168bcbe34/go.mod h1:cWoaYGHl1nlzdEM2xvMzIASkEZJZLSf5nhe17M7wDhw= +github.com/kcp-dev/logicalcluster/v3 v3.0.4 h1:q7KngML/QM7sWl8aVzmfZF0TPMnBwYNxsPKfwUvvBvU= +github.com/kcp-dev/logicalcluster/v3 v3.0.4/go.mod h1:EWBUBxdr49fUB1cLMO4nOdBWmYifLbP1LfoL20KkXYY= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= diff --git a/go.mod b/go.mod index d3b267547c..282bb018e6 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,8 @@ require ( github.com/go-logr/zapr v1.2.4 github.com/google/go-cmp v0.5.9 github.com/google/gofuzz v1.2.0 + github.com/kcp-dev/apimachinery/v2 v2.0.0-alpha.0.0.20230926071920-57d168bcbe34 + github.com/kcp-dev/logicalcluster/v3 v3.0.4 github.com/onsi/ginkgo/v2 v2.11.0 github.com/onsi/gomega v1.27.10 github.com/prometheus/client_golang v1.16.0 diff --git a/go.sum b/go.sum index 46fc3a52cd..366cc4556f 100644 --- a/go.sum +++ b/go.sum @@ -214,6 +214,10 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/kcp-dev/apimachinery/v2 v2.0.0-alpha.0.0.20230926071920-57d168bcbe34 h1:tom0JX5OmAeOOmkGv8LaYHDtA1xAKDiQL5U0vhYYgdM= +github.com/kcp-dev/apimachinery/v2 v2.0.0-alpha.0.0.20230926071920-57d168bcbe34/go.mod h1:cWoaYGHl1nlzdEM2xvMzIASkEZJZLSf5nhe17M7wDhw= +github.com/kcp-dev/logicalcluster/v3 v3.0.4 h1:q7KngML/QM7sWl8aVzmfZF0TPMnBwYNxsPKfwUvvBvU= +github.com/kcp-dev/logicalcluster/v3 v3.0.4/go.mod h1:EWBUBxdr49fUB1cLMO4nOdBWmYifLbP1LfoL20KkXYY= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index d8446e85b3..055c62e451 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -22,6 +22,7 @@ import ( "net/http" "time" + "github.com/kcp-dev/apimachinery/v2/third_party/informers" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/fields" @@ -159,6 +160,14 @@ type Options struct { // instead of `reconcile.Result{}`. SyncPeriod *time.Duration + // NewInformerFunc is a function that is used to create SharedIndexInformers. + // Defaults to cache.NewSharedIndexInformer from client-go + NewInformerFunc client.NewInformerFunc + + // Indexers is the indexers that the informers will be configured to use. + // Will always have the standard NamespaceIndex. + Indexers toolscache.Indexers + // ReaderFailOnMissingInformer configures the cache to return a ErrResourceNotCached error when a user // requests, using Get() and List(), a resource the cache does not already have an informer for. // @@ -201,9 +210,6 @@ type Options struct { // ByObject restricts the cache's ListWatch to the desired fields per GVK at the specified object. // object, this will fall through to Default* settings. ByObject map[client.Object]ByObject - - // newInformer allows overriding of NewSharedIndexInformer for testing. - newInformer *func(toolscache.ListerWatcher, runtime.Object, time.Duration, toolscache.Indexers) toolscache.SharedIndexInformer } // ByObject offers more fine-grained control over the cache's ListWatch by object. @@ -354,7 +360,7 @@ func newCache(restConfig *rest.Config, opts Options) newCacheFunc { }, Transform: config.Transform, UnsafeDisableDeepCopy: pointer.BoolDeref(config.UnsafeDisableDeepCopy, false), - NewInformer: opts.newInformer, + NewInformer: opts.NewInformerFunc, }), readerFailOnMissingInformer: opts.ReaderFailOnMissingInformer, } @@ -438,6 +444,10 @@ func defaultOpts(config *rest.Config, opts Options) (Options, error) { if opts.SyncPeriod == nil { opts.SyncPeriod = &defaultSyncPeriod } + + if opts.NewInformerFunc == nil { + opts.NewInformerFunc = informers.NewSharedIndexInformer + } return opts, nil } diff --git a/pkg/cache/cache_test.go b/pkg/cache/cache_test.go index bff0c87083..8779764f96 100644 --- a/pkg/cache/cache_test.go +++ b/pkg/cache/cache_test.go @@ -26,6 +26,7 @@ import ( "strings" "time" + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" corev1 "k8s.io/api/core/v1" @@ -545,13 +546,13 @@ func NonBlockingGetTest(createCacheFunc func(config *rest.Config, opts cache.Opt By("creating the informer cache") v := reflect.ValueOf(&opts).Elem() - newInformerField := v.FieldByName("newInformer") - newFakeInformer := func(_ kcache.ListerWatcher, _ runtime.Object, _ time.Duration, _ kcache.Indexers) kcache.SharedIndexInformer { + newInformerField := v.FieldByName("NewInformerFunc") + newFakeInformer := func(_ kcache.ListerWatcher, _ runtime.Object, _ time.Duration, _ kcache.Indexers) kcpcache.ScopeableSharedIndexInformer { return &controllertest.FakeInformer{Synced: false} } reflect.NewAt(newInformerField.Type(), newInformerField.Addr().UnsafePointer()). Elem(). - Set(reflect.ValueOf(&newFakeInformer)) + Set(reflect.ValueOf(newFakeInformer)) informerCache, err = createCacheFunc(cfg, opts) Expect(err).NotTo(HaveOccurred()) By("running the cache and waiting for it to sync") diff --git a/pkg/cache/informer_cache.go b/pkg/cache/informer_cache.go index 0f1b4e93d2..21e1e01f20 100644 --- a/pkg/cache/informer_cache.go +++ b/pkg/cache/informer_cache.go @@ -31,6 +31,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/cache/internal" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + + "github.com/kcp-dev/logicalcluster/v3" ) var ( @@ -222,6 +224,13 @@ func indexByField(informer Informer, field string, extractValue client.IndexerFu } ns := meta.GetNamespace() + keyFunc := internal.KeyToNamespacedKey + if clusterName := logicalcluster.From(obj); !clusterName.Empty() { + keyFunc = func(ns, val string) string { + return internal.KeyToClusteredKey(clusterName.String(), ns, val) + } + } + rawVals := extractValue(obj) var vals []string if ns == "" { @@ -231,14 +240,15 @@ func indexByField(informer Informer, field string, extractValue client.IndexerFu // if we need to add non-namespaced versions too, double the length vals = make([]string, len(rawVals)*2) } + for i, rawVal := range rawVals { // save a namespaced variant, so that we can ask // "what are all the object matching a given index *in a given namespace*" - vals[i] = internal.KeyToNamespacedKey(ns, rawVal) + vals[i] = keyFunc(ns, rawVal) if ns != "" { // if we have a namespace, also inject a special index key for listing // regardless of the object namespace - vals[i+len(rawVals)] = internal.KeyToNamespacedKey("", rawVal) + vals[i+len(rawVals)] = keyFunc("", rawVal) } } diff --git a/pkg/cache/internal/cache_reader.go b/pkg/cache/internal/cache_reader.go index eb941f034e..0d195dae12 100644 --- a/pkg/cache/internal/cache_reader.go +++ b/pkg/cache/internal/cache_reader.go @@ -21,6 +21,8 @@ import ( "fmt" "reflect" + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/labels" @@ -30,6 +32,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/internal/field/selector" + "sigs.k8s.io/controller-runtime/pkg/kontext" ) // CacheReader is a client.Reader. @@ -53,11 +56,11 @@ type CacheReader struct { } // Get checks the indexer for the object and writes a copy of it if found. -func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Object, _ ...client.GetOption) error { +func (c *CacheReader) Get(ctx context.Context, key client.ObjectKey, out client.Object, _ ...client.GetOption) error { if c.scopeName == apimeta.RESTScopeNameRoot { key.Namespace = "" } - storeKey := objectKeyToStoreKey(key) + storeKey := objectKeyToStoreKey(ctx, key) // Lookup the object from the indexer cache obj, exists, err := c.indexer.GetByKey(storeKey) @@ -104,7 +107,7 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob } // List lists items out of the indexer and writes them to out. -func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error { +func (c *CacheReader) List(ctx context.Context, out client.ObjectList, opts ...client.ListOption) error { var objs []interface{} var err error @@ -115,6 +118,8 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli return fmt.Errorf("continue list option is not supported by the cache") } + clusterName, _ := kontext.ClusterFrom(ctx) + switch { case listOpts.FieldSelector != nil: // TODO(directxman12): support more complicated field selectors by @@ -126,11 +131,23 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli // list all objects by the field selector. If this is namespaced and we have one, ask for the // namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces" // namespace. - objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val)) + if clusterName.Empty() { + objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val)) + } else { + objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToClusteredKey(clusterName.String(), listOpts.Namespace, val)) + } case listOpts.Namespace != "": - objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace) + if clusterName.Empty() { + objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace) + } else { + objs, err = c.indexer.ByIndex(kcpcache.ClusterAndNamespaceIndexName, kcpcache.ClusterAndNamespaceIndexKey(clusterName, listOpts.Namespace)) + } default: - objs = c.indexer.List() + if clusterName.Empty() { + objs = c.indexer.List() + } else { + objs, err = c.indexer.ByIndex(kcpcache.ClusterIndexName, kcpcache.ClusterIndexKey(clusterName)) + } } if err != nil { return err @@ -182,7 +199,12 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli // It's akin to MetaNamespaceKeyFunc. It's separate from // String to allow keeping the key format easily in sync with // MetaNamespaceKeyFunc. -func objectKeyToStoreKey(k client.ObjectKey) string { +func objectKeyToStoreKey(ctx context.Context, k client.ObjectKey) string { + cluster, ok := kontext.ClusterFrom(ctx) + if ok { + return kcpcache.ToClusterAwareKey(cluster.String(), k.Namespace, k.Name) + } + if k.Namespace == "" { return k.Name } @@ -206,3 +228,9 @@ func KeyToNamespacedKey(ns string, baseKey string) string { } return allNamespacesNamespace + "/" + baseKey } + +// KeyToClusteredKey prefixes the given index key with a cluster name +// for use in field selector indexes. +func KeyToClusteredKey(clusterName string, ns string, baseKey string) string { + return clusterName + "/" + KeyToNamespacedKey(ns, baseKey) +} diff --git a/pkg/cache/internal/informers.go b/pkg/cache/internal/informers.go index 1d2c9ce2b4..de403ac1be 100644 --- a/pkg/cache/internal/informers.go +++ b/pkg/cache/internal/informers.go @@ -24,6 +24,8 @@ import ( "sync" "time" + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + "github.com/kcp-dev/apimachinery/v2/third_party/informers" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -45,7 +47,7 @@ type InformersOpts struct { Mapper meta.RESTMapper ResyncPeriod time.Duration Namespace string - NewInformer *func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer + NewInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) kcpcache.ScopeableSharedIndexInformer Selector Selector Transform cache.TransformFunc UnsafeDisableDeepCopy bool @@ -53,9 +55,9 @@ type InformersOpts struct { // NewInformers creates a new InformersMap that can create informers under the hood. func NewInformers(config *rest.Config, options *InformersOpts) *Informers { - newInformer := cache.NewSharedIndexInformer + newInformer := informers.NewSharedIndexInformer if options.NewInformer != nil { - newInformer = *options.NewInformer + newInformer = options.NewInformer } return &Informers{ config: config, @@ -158,7 +160,7 @@ type Informers struct { unsafeDisableDeepCopy bool // NewInformer allows overriding of the shared index informer constructor for testing. - newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) cache.SharedIndexInformer + newInformer func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) kcpcache.ScopeableSharedIndexInformer } // Start calls Run on each of the informers and sets started to true. Blocks on the context. diff --git a/pkg/client/client.go b/pkg/client/client.go index 2fb0acb7b3..6a4d6db432 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -89,6 +89,14 @@ type CacheOptions struct { // NewClientFunc allows a user to define how to create a client. type NewClientFunc func(config *rest.Config, options Options) (Client, error) +// NewAPIReaderFunc allows a user to define how to create an API server reader. +type NewAPIReaderFunc func(config *rest.Config, options Options) (Reader, error) + +// NewAPIReader creates a new API server reader. +func NewAPIReader(config *rest.Config, options Options) (Reader, error) { + return New(config, options) +} + // New returns a new Client using the provided config and Options. // The returned client reads *and* writes directly from the server // (it doesn't use object caches). It understands how to work with diff --git a/pkg/client/interfaces.go b/pkg/client/interfaces.go index 3cd745e4c0..050055858a 100644 --- a/pkg/client/interfaces.go +++ b/pkg/client/interfaces.go @@ -18,9 +18,12 @@ package client import ( "context" + "time" + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" @@ -44,6 +47,10 @@ type Patch interface { Data(obj Object) ([]byte, error) } +// NewInformerFunc describes a function that creates SharedIndexInformers. +// Its signature matches cache.NewSharedIndexInformer from client-go. +type NewInformerFunc func(cache.ListerWatcher, runtime.Object, time.Duration, cache.Indexers) kcpcache.ScopeableSharedIndexInformer + // TODO(directxman12): is there a sane way to deal with get/delete options? // Reader knows how to read and list Kubernetes objects. diff --git a/pkg/cluster/cluster.go b/pkg/cluster/cluster.go index 248893ea31..8836658d2b 100644 --- a/pkg/cluster/cluster.go +++ b/pkg/cluster/cluster.go @@ -121,6 +121,11 @@ type Options struct { // By default, the client will use the cache for reads and direct calls for writes. Client client.Options + // NewAPIReaderFunc is the function that creates the APIReader client to be + // used by the manager. If not set this will use the default new APIReader + // function. + NewAPIReader client.NewAPIReaderFunc + // NewClient is the func that creates the client to be used by the manager. // If not set this will create a Client backed by a Cache for read operations // and a direct Client for write operations. @@ -230,7 +235,7 @@ func New(config *rest.Config, opts ...Option) (Cluster, error) { } // Create the API Reader, a client with no cache. - clientReader, err := client.New(config, client.Options{ + clientReader, err := options.NewAPIReader(config, client.Options{ HTTPClient: options.HTTPClient, Scheme: options.Scheme, Mapper: mapper, @@ -280,6 +285,10 @@ func setOptionsDefaults(options Options, config *rest.Config) (Options, error) { options.MapperProvider = apiutil.NewDynamicRESTMapper } + if options.NewAPIReader == nil { + options.NewAPIReader = client.NewAPIReader + } + // Allow users to define how to create a new client if options.NewClient == nil { options.NewClient = client.New diff --git a/pkg/controller/controllertest/util.go b/pkg/controller/controllertest/util.go index 60ec61edec..5eb3d8490a 100644 --- a/pkg/controller/controllertest/util.go +++ b/pkg/controller/controllertest/util.go @@ -19,11 +19,14 @@ package controllertest import ( "time" + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + "github.com/kcp-dev/logicalcluster/v3" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/tools/cache" ) var _ cache.SharedIndexInformer = &FakeInformer{} +var _ kcpcache.ScopeableSharedIndexInformer = &FakeInformer{} // FakeInformer provides fake Informer functionality for testing. type FakeInformer struct { @@ -78,6 +81,11 @@ func (e eventHandlerWrapper) OnDelete(obj interface{}) { e.handler.(legacyResourceEventHandler).OnDelete(obj) } +// Cluster returns the fake Informer. +func (f *FakeInformer) Cluster(clusterName logicalcluster.Name) cache.SharedIndexInformer { + return f +} + // AddIndexers does nothing. TODO(community): Implement this. func (f *FakeInformer) AddIndexers(indexers cache.Indexers) error { return nil diff --git a/pkg/handler/enqueue.go b/pkg/handler/enqueue.go index c72b2e1ebb..d0ee06b859 100644 --- a/pkg/handler/enqueue.go +++ b/pkg/handler/enqueue.go @@ -19,8 +19,11 @@ package handler import ( "context" + "github.com/kcp-dev/logicalcluster/v3" + "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/workqueue" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" logf "sigs.k8s.io/controller-runtime/pkg/internal/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -43,25 +46,16 @@ func (e *EnqueueRequestForObject) Create(ctx context.Context, evt event.CreateEv enqueueLog.Error(nil, "CreateEvent received with no metadata", "event", evt) return } - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}) + q.Add(request(evt.Object)) } // Update implements EventHandler. func (e *EnqueueRequestForObject) Update(ctx context.Context, evt event.UpdateEvent, q workqueue.RateLimitingInterface) { switch { case evt.ObjectNew != nil: - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.ObjectNew.GetName(), - Namespace: evt.ObjectNew.GetNamespace(), - }}) + q.Add(request(evt.ObjectNew)) case evt.ObjectOld != nil: - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.ObjectOld.GetName(), - Namespace: evt.ObjectOld.GetNamespace(), - }}) + q.Add(request(evt.ObjectOld)) default: enqueueLog.Error(nil, "UpdateEvent received with no metadata", "event", evt) } @@ -73,10 +67,7 @@ func (e *EnqueueRequestForObject) Delete(ctx context.Context, evt event.DeleteEv enqueueLog.Error(nil, "DeleteEvent received with no metadata", "event", evt) return } - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}) + q.Add(request(evt.Object)) } // Generic implements EventHandler. @@ -85,8 +76,16 @@ func (e *EnqueueRequestForObject) Generic(ctx context.Context, evt event.Generic enqueueLog.Error(nil, "GenericEvent received with no metadata", "event", evt) return } - q.Add(reconcile.Request{NamespacedName: types.NamespacedName{ - Name: evt.Object.GetName(), - Namespace: evt.Object.GetNamespace(), - }}) + q.Add(request(evt.Object)) +} + +func request(obj client.Object) reconcile.Request { + return reconcile.Request{ + // TODO(kcp) Need to implement a non-kcp-specific way to support this + ClusterName: logicalcluster.From(obj).String(), + NamespacedName: types.NamespacedName{ + Namespace: obj.GetNamespace(), + Name: obj.GetName(), + }, + } } diff --git a/pkg/handler/enqueue_owner.go b/pkg/handler/enqueue_owner.go index 02e7d756f8..71fb307fbf 100644 --- a/pkg/handler/enqueue_owner.go +++ b/pkg/handler/enqueue_owner.go @@ -20,6 +20,8 @@ import ( "context" "fmt" + "github.com/kcp-dev/logicalcluster/v3" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -159,9 +161,12 @@ func (e *enqueueRequestForOwner) getOwnerReconcileRequest(object metav1.Object, // object in the event. if ref.Kind == e.groupKind.Kind && refGV.Group == e.groupKind.Group { // Match found - add a Request for the object referred to in the OwnerReference - request := reconcile.Request{NamespacedName: types.NamespacedName{ - Name: ref.Name, - }} + request := reconcile.Request{ + ClusterName: logicalcluster.From(object).String(), + NamespacedName: types.NamespacedName{ + Name: ref.Name, + }, + } // if owner is not namespaced then we should not set the namespace mapping, err := e.mapper.RESTMapping(e.groupKind, refGV.Version) diff --git a/pkg/kcp/wrappers.go b/pkg/kcp/wrappers.go new file mode 100644 index 0000000000..570947950b --- /dev/null +++ b/pkg/kcp/wrappers.go @@ -0,0 +1,243 @@ +/* +Copyright 2022 The KCP Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kcp + +import ( + "fmt" + "net/http" + "regexp" + "strings" + + "k8s.io/apimachinery/pkg/api/meta" + "k8s.io/client-go/rest" + k8scache "k8s.io/client-go/tools/cache" + + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/cache" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" + "sigs.k8s.io/controller-runtime/pkg/kontext" + "sigs.k8s.io/controller-runtime/pkg/manager" + + kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" + "github.com/kcp-dev/apimachinery/v2/third_party/informers" + "github.com/kcp-dev/logicalcluster/v3" +) + +// NewClusterAwareManager returns a kcp-aware manager with appropriate defaults for cache and +// client creation. +func NewClusterAwareManager(cfg *rest.Config, options ctrl.Options) (manager.Manager, error) { + if options.NewCache == nil { + options.NewCache = NewClusterAwareCache + } + + if options.NewAPIReader == nil { + options.NewAPIReader = NewClusterAwareAPIReader + } + + if options.NewClient == nil { + options.NewClient = NewClusterAwareClient + } + + if options.MapperProvider == nil { + options.MapperProvider = NewClusterAwareMapperProvider + } + + cfg.Wrap(func(rt http.RoundTripper) http.RoundTripper { + return newClusterRoundTripper(rt) + }) + return ctrl.NewManager(cfg, options) +} + +// NewClusterAwareCache returns a cache.Cache that handles multi-cluster watches. +func NewClusterAwareCache(config *rest.Config, opts cache.Options) (cache.Cache, error) { + c := rest.CopyConfig(config) + c.Host += "/clusters/*" + opts.NewInformerFunc = informers.NewSharedIndexInformer + + opts.Indexers = k8scache.Indexers{ + kcpcache.ClusterIndexName: kcpcache.ClusterIndexFunc, + kcpcache.ClusterAndNamespaceIndexName: kcpcache.ClusterAndNamespaceIndexFunc, + } + return cache.New(c, opts) +} + +// NewClusterAwareAPIReader returns a client.Reader that provides read-only access to the API server, +// and is configured to use the context to scope requests to the proper cluster. To scope requests, +// pass the request context with the cluster set. +// Example: +// +// import ( +// "context" +// kcpclient "github.com/kcp-dev/apimachinery/v2/pkg/client" +// ctrl "sigs.k8s.io/controller-runtime" +// ) +// func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +// ctx = kcpclient.WithCluster(ctx, req.ObjectKey.Cluster) +// // from here on pass this context to all client calls +// ... +// } +func NewClusterAwareAPIReader(config *rest.Config, opts client.Options) (client.Reader, error) { + httpClient, err := ClusterAwareHTTPClient(config) + if err != nil { + return nil, err + } + opts.HTTPClient = httpClient + return client.NewAPIReader(config, opts) +} + +// NewClusterAwareClient returns a client.Client that is configured to use the context +// to scope requests to the proper cluster. To scope requests, pass the request context with the cluster set. +// Example: +// +// import ( +// "context" +// kcpclient "github.com/kcp-dev/apimachinery/v2/pkg/client" +// ctrl "sigs.k8s.io/controller-runtime" +// ) +// func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +// ctx = kcpclient.WithCluster(ctx, req.ObjectKey.Cluster) +// // from here on pass this context to all client calls +// ... +// } +func NewClusterAwareClient(config *rest.Config, opts client.Options) (client.Client, error) { + httpClient, err := ClusterAwareHTTPClient(config) + if err != nil { + return nil, err + } + opts.HTTPClient = httpClient + return client.New(config, opts) +} + +// NewClusterAwareClientForConfig returns a client.Client that is configured to use the context to scope +// requests to the proper cluster. To scope requests, pass the request context with the cluster set. +// Example: +// +// import ( +// "context" +// kcpclient "github.com/kcp-dev/apimachinery/v2/pkg/client" +// ctrl "sigs.k8s.io/controller-runtime" +// ) +// func (r *reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { +// ctx = kcpclient.WithCluster(ctx, req.ObjectKey.Cluster) +// // from here on pass this context to all client calls +// ... +// } +func NewClusterAwareClientForConfig(config *rest.Config, httpClient *http.Client) (client.Client, error) { + restMapper, err := NewClusterAwareMapperProvider(config, httpClient) + if err != nil { + return nil, err + } + return client.New(config, client.Options{ + Mapper: restMapper, + HTTPClient: httpClient, + }) +} + +// ClusterAwareHTTPClient returns an http.Client with a cluster aware round tripper. +func ClusterAwareHTTPClient(config *rest.Config) (*http.Client, error) { + httpClient, err := rest.HTTPClientFor(config) + if err != nil { + return nil, err + } + + httpClient.Transport = newClusterRoundTripper(httpClient.Transport) + return httpClient, nil +} + +// NewClusterAwareMapperProvider is a MapperProvider that returns a logical cluster aware meta.RESTMapper. +func NewClusterAwareMapperProvider(c *rest.Config, httpClient *http.Client) (meta.RESTMapper, error) { + mapperCfg := rest.CopyConfig(c) + if !strings.HasSuffix(mapperCfg.Host, "/clusters/*") { + mapperCfg.Host += "/clusters/*" + } + + return apiutil.NewDynamicRESTMapper(mapperCfg, httpClient) +} + +// ClusterAwareBuilderWithOptions returns a cluster aware Cache constructor that will build +// a cache honoring the options argument, this is useful to specify options like +// SelectorsDefaultNamespaces +// WARNING: If SelectorsByObject is specified, filtered out resources are not +// returned. +// WARNING: If UnsafeDisableDeepCopy is enabled, you must DeepCopy any object +// returned from cache get/list before mutating it. +func ClusterAwareBuilderWithOptions(options cache.Options) cache.NewCacheFunc { + return func(config *rest.Config, opts cache.Options) (cache.Cache, error) { + if options.Scheme == nil { + options.Scheme = opts.Scheme + } + if options.Mapper == nil { + options.Mapper = opts.Mapper + } + if options.SyncPeriod == nil { + options.SyncPeriod = opts.SyncPeriod + } + if opts.DefaultNamespaces == nil { + opts.DefaultNamespaces = options.DefaultNamespaces + } + + return NewClusterAwareCache(config, options) + } +} + +// clusterRoundTripper is a cluster aware wrapper around http.RoundTripper. +type clusterRoundTripper struct { + delegate http.RoundTripper +} + +// newClusterRoundTripper creates a new cluster aware round tripper. +func newClusterRoundTripper(delegate http.RoundTripper) *clusterRoundTripper { + return &clusterRoundTripper{ + delegate: delegate, + } +} + +func (c *clusterRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) { + cluster, ok := kontext.ClusterFrom(req.Context()) + if ok { + req = req.Clone(req.Context()) + req.URL.Path = generatePath(req.URL.Path, cluster.Path()) + req.URL.RawPath = generatePath(req.URL.RawPath, cluster.Path()) + } + return c.delegate.RoundTrip(req) +} + +// apiRegex matches any string that has /api/ or /apis/ in it. +var apiRegex = regexp.MustCompile(`(/api/|/apis/)`) + +// generatePath formats the request path to target the specified cluster. +func generatePath(originalPath string, clusterPath logicalcluster.Path) string { + // If the originalPath already has cluster.Path() then the path was already modifed and no change needed + if strings.Contains(originalPath, clusterPath.RequestPath()) { + return originalPath + } + // If the originalPath has /api/ or /apis/ in it, it might be anywhere in the path, so we use a regex to find and + // replaces /api/ or /apis/ with $cluster/api/ or $cluster/apis/ + if apiRegex.MatchString(originalPath) { + return apiRegex.ReplaceAllString(originalPath, fmt.Sprintf("%s$1", clusterPath.RequestPath())) + } + // Otherwise, we're just prepending /clusters/$name + path := clusterPath.RequestPath() + // if the original path is relative, add a / separator + if len(originalPath) > 0 && originalPath[0] != '/' { + path += "/" + } + // finally append the original path + path += originalPath + return path +} diff --git a/pkg/kontext/kontext.go b/pkg/kontext/kontext.go new file mode 100644 index 0000000000..4de151a742 --- /dev/null +++ b/pkg/kontext/kontext.go @@ -0,0 +1,24 @@ +package kontext + +import ( + "context" + + "github.com/kcp-dev/logicalcluster/v3" +) + +type key int + +const ( + keyCluster key = iota +) + +// WithCluster injects a cluster name into a context. +func WithCluster(ctx context.Context, cluster logicalcluster.Name) context.Context { + return context.WithValue(ctx, keyCluster, cluster) +} + +// ClusterFrom extracts a cluster name from the context. +func ClusterFrom(ctx context.Context) (logicalcluster.Name, bool) { + s, ok := ctx.Value(keyCluster).(logicalcluster.Name) + return s, ok +} diff --git a/pkg/manager/manager.go b/pkg/manager/manager.go index 708a9cc16f..0015c58014 100644 --- a/pkg/manager/manager.go +++ b/pkg/manager/manager.go @@ -120,6 +120,11 @@ type Options struct { // Only use a custom NewCache if you know what you are doing. NewCache cache.NewCacheFunc + // NewAPIReaderFunc is the function that creates the APIReader client to be + // used by the manager. If not set this will use the default new APIReader + // function. + NewAPIReader client.NewAPIReaderFunc + // Client is the client.Options that will be used to create the default Client. // By default, the client will use the cache for reads and direct calls for writes. Client client.Options @@ -324,6 +329,7 @@ func New(config *rest.Config, options Options) (Manager, error) { clusterOptions.MapperProvider = options.MapperProvider clusterOptions.Logger = options.Logger clusterOptions.NewCache = options.NewCache + clusterOptions.NewAPIReader = options.NewAPIReader clusterOptions.NewClient = options.NewClient clusterOptions.Cache = options.Cache clusterOptions.Client = options.Client diff --git a/pkg/reconcile/reconcile.go b/pkg/reconcile/reconcile.go index 0f4e7e16bb..66eab890b4 100644 --- a/pkg/reconcile/reconcile.go +++ b/pkg/reconcile/reconcile.go @@ -48,6 +48,10 @@ func (r *Result) IsZero() bool { type Request struct { // NamespacedName is the name and namespace of the object to reconcile. types.NamespacedName + + // ClusterName can be used for reconciling requests across multiple clusters, + // to prevent objects with the same name and namespace from conflicting + ClusterName string } /*