From bad30ba3dafbbdb4c34851355a9b5878022426e8 Mon Sep 17 00:00:00 2001 From: Fabian von Feilitzsch Date: Fri, 27 May 2022 12:30:50 -0400 Subject: [PATCH] Add Indexer plumbing, add kcp indexers by default --- pkg/cache/cache.go | 6 +++- pkg/cache/internal/cache_reader.go | 20 +++++++++++-- pkg/cache/internal/deleg_map.go | 19 +++++++------ pkg/cache/internal/informers_map.go | 44 ++++++++++++++++++----------- pkg/client/options.go | 12 ++++++++ pkg/kcp/wrappers.go | 5 ++++ 6 files changed, 76 insertions(+), 30 deletions(-) diff --git a/pkg/cache/cache.go b/pkg/cache/cache.go index 0cdea6c41f..37386d9acd 100644 --- a/pkg/cache/cache.go +++ b/pkg/cache/cache.go @@ -113,6 +113,10 @@ type Options struct { // Defaults to cache.MetaNamespaceKeyFunc from client-go KeyFunction cache.KeyFunc + // Indexers is the indexers that the informers will be configured to use. + // Will always have the standard NamespaceIndex. + Indexers cache.Indexers + // Namespace restricts the cache's ListWatch to the desired namespace // Default watches all namespaces Namespace string @@ -151,7 +155,7 @@ func New(config *rest.Config, opts Options) (Cache, error) { if err != nil { return nil, err } - im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK, opts.KeyFunction) + im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK, opts.KeyFunction, opts.Indexers) return &informerCache{InformersMap: im}, nil } diff --git a/pkg/cache/internal/cache_reader.go b/pkg/cache/internal/cache_reader.go index 95fe95c0a0..28859447cb 100644 --- a/pkg/cache/internal/cache_reader.go +++ b/pkg/cache/internal/cache_reader.go @@ -22,6 +22,8 @@ import ( "reflect" kcpcache "github.com/kcp-dev/apimachinery/pkg/cache" + kcpclient "github.com/kcp-dev/apimachinery/pkg/client" + apierrors "k8s.io/apimachinery/pkg/api/errors" apimeta "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/fields" @@ -106,14 +108,24 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob } // List lists items out of the indexer and writes them to out. -func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error { +func (c *CacheReader) List(ctx context.Context, out client.ObjectList, opts ...client.ListOption) error { var objs []interface{} var err error listOpts := client.ListOptions{} listOpts.ApplyOptions(opts) + // TODO(kcp), should we just require people to pass in the cluster list option, or maybe provide + // a wrapper that adds it from the context automatically rather than doing this? + // It may also make more sense to just use the context and not bother provided a ListOption for it + if listOpts.Cluster.Empty() { + if cluster, ok := kcpclient.ClusterFromContext(ctx); ok { + client.InCluster(cluster).ApplyToList(&listOpts) + } + } + switch { + // TODO(kcp) add cluster to this case case listOpts.FieldSelector != nil: // TODO(directxman12): support more complicated field selectors by // combining multiple indices, GetIndexers, etc @@ -125,10 +137,12 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli // namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces" // namespace. objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val)) + case listOpts.Cluster.Empty(): + objs = c.indexer.List() case listOpts.Namespace != "": - objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace) + objs, err = c.indexer.ByIndex(kcpcache.ClusterAndNamespaceIndexName, kcpcache.ToClusterAwareKey(listOpts.Cluster.String(), listOpts.Namespace, "")) default: - objs = c.indexer.List() + objs, err = c.indexer.ByIndex(kcpcache.ClusterIndexName, kcpcache.ToClusterAwareKey(listOpts.Cluster.String(), "", "")) } if err != nil { return err diff --git a/pkg/cache/internal/deleg_map.go b/pkg/cache/internal/deleg_map.go index 117718109c..f0896d163c 100644 --- a/pkg/cache/internal/deleg_map.go +++ b/pkg/cache/internal/deleg_map.go @@ -53,11 +53,12 @@ func NewInformersMap(config *rest.Config, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc, + indexers cache.Indexers, ) *InformersMap { return &InformersMap{ - structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc), - unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc), - metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc), + structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc, indexers), + unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc, indexers), + metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc, indexers), Scheme: scheme, } @@ -109,18 +110,18 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj // newStructuredInformersMap creates a new InformersMap for structured objects. func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch, keyFunc) + namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc, indexers cache.Indexers) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch, keyFunc, indexers) } // newUnstructuredInformersMap creates a new InformersMap for unstructured objects. func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createUnstructuredListWatch, keyFunc) + namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc, indexers cache.Indexers) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createUnstructuredListWatch, keyFunc, indexers) } // newMetadataInformersMap creates a new InformersMap for metadata-only objects. func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration, - namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc) *specificInformersMap { - return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createMetadataListWatch, keyFunc) + namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc, indexers cache.Indexers) *specificInformersMap { + return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createMetadataListWatch, keyFunc, indexers) } diff --git a/pkg/cache/internal/informers_map.go b/pkg/cache/internal/informers_map.go index 215852edb5..ead27a859e 100644 --- a/pkg/cache/internal/informers_map.go +++ b/pkg/cache/internal/informers_map.go @@ -55,22 +55,24 @@ func newSpecificInformersMap(config *rest.Config, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, createListWatcher createListWatcherFunc, - keyFunction cache.KeyFunc) *specificInformersMap { + keyFunction cache.KeyFunc, + indexers cache.Indexers) *specificInformersMap { ip := &specificInformersMap{ - config: config, - Scheme: scheme, - mapper: mapper, - informersByGVK: make(map[schema.GroupVersionKind]*MapEntry), - codecs: serializer.NewCodecFactory(scheme), - paramCodec: runtime.NewParameterCodec(scheme), - resync: resync, - startWait: make(chan struct{}), - createListWatcher: createListWatcher, - namespace: namespace, - selectors: selectors.forGVK, - disableDeepCopy: disableDeepCopy, - keyFunction: keyFunction, + config: config, + Scheme: scheme, + mapper: mapper, + informersByGVK: make(map[schema.GroupVersionKind]*MapEntry), + codecs: serializer.NewCodecFactory(scheme), + paramCodec: runtime.NewParameterCodec(scheme), + resync: resync, + startWait: make(chan struct{}), + createListWatcher: createListWatcher, + namespace: namespace, + selectors: selectors.forGVK, + disableDeepCopy: disableDeepCopy, + keyFunction: keyFunction, + additionalIndexers: indexers, } return ip } @@ -141,6 +143,10 @@ type specificInformersMap struct { // keyFunction is the cache.KeyFunc informers will be configured to use keyFunction cache.KeyFunc + + // additionalIndexers is the indexers that the informers will be configured to use. + // Will not allow overwriting the standard NamespaceIndex. + additionalIndexers cache.Indexers } // Start calls Run on each of the informers and sets started to true. Blocks on the context. @@ -230,12 +236,16 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob if err != nil { return nil, false, err } + indexers := cache.Indexers{} + for indexName, indexer := range ip.additionalIndexers { + indexers[indexName] = indexer + } + indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc + ni := cache.NewSharedIndexInformerWithOptions(lw, obj, cache.WithResyncPeriod(resyncPeriod(ip.resync)()), cache.WithKeyFunction(ip.keyFunction), - cache.WithIndexers(cache.Indexers{ - cache.NamespaceIndex: cache.MetaNamespaceIndexFunc, - })) + cache.WithIndexers(indexers)) rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version) if err != nil { return nil, false, err diff --git a/pkg/client/options.go b/pkg/client/options.go index aa2299eac0..b301b2a7cc 100644 --- a/pkg/client/options.go +++ b/pkg/client/options.go @@ -17,6 +17,7 @@ limitations under the License. package client import ( + "github.com/kcp-dev/logicalcluster" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" @@ -330,6 +331,9 @@ type ListOptions struct { // non-namespaced objects, or to list across all namespaces. Namespace string + // Cluster represents the cluster to list for, or empty to list across all clusters. + Cluster logicalcluster.Name + // Limit specifies the maximum number of results to return from the server. The server may // not support this field on all resource types, but if it does and more results remain it // will set the continue field on the returned list object. This field is not supported if watch @@ -488,6 +492,14 @@ func (m MatchingFieldsSelector) ApplyToDeleteAllOf(opts *DeleteAllOfOptions) { m.ApplyToList(&opts.ListOptions) } +// InCluster restricts the list/delete operation to the given cluster. +type InCluster logicalcluster.Name + +// ApplyToList applies this configuration to the given list options. +func (n InCluster) ApplyToList(opts *ListOptions) { + opts.Cluster = logicalcluster.Name(n) +} + // InNamespace restricts the list/delete operation to the given namespace. type InNamespace string diff --git a/pkg/kcp/wrappers.go b/pkg/kcp/wrappers.go index e64a41043c..5bde9a33c2 100644 --- a/pkg/kcp/wrappers.go +++ b/pkg/kcp/wrappers.go @@ -20,6 +20,7 @@ import ( "net/http" "k8s.io/client-go/rest" + k8scache "k8s.io/client-go/tools/cache" kcpcache "github.com/kcp-dev/apimachinery/pkg/cache" kcpclient "github.com/kcp-dev/apimachinery/pkg/client" @@ -49,6 +50,10 @@ func NewClusterAwareCache(config *rest.Config, opts cache.Options) (cache.Cache, c := rest.CopyConfig(config) c.Host += "/clusters/*" opts.KeyFunction = kcpcache.ClusterAwareKeyFunc + opts.Indexers = k8scache.Indexers{ + kcpcache.ClusterIndexName: kcpcache.ClusterIndexFunc, + kcpcache.ClusterAndNamespaceIndexName: kcpcache.ClusterAndNamespaceIndexFunc, + } return cache.New(c, opts) }