Skip to content

Add Indexer plumbing, add kcp indexers by default #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,10 @@ type Options struct {
// Defaults to cache.MetaNamespaceKeyFunc from client-go
KeyFunction cache.KeyFunc

// Indexers is the indexers that the informers will be configured to use.
// Will always have the standard NamespaceIndex.
Indexers cache.Indexers

// Namespace restricts the cache's ListWatch to the desired namespace
// Default watches all namespaces
Namespace string
Expand Down Expand Up @@ -151,7 +155,7 @@ func New(config *rest.Config, opts Options) (Cache, error) {
if err != nil {
return nil, err
}
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK, opts.KeyFunction)
im := internal.NewInformersMap(config, opts.Scheme, opts.Mapper, *opts.Resync, opts.Namespace, selectorsByGVK, disableDeepCopyByGVK, opts.KeyFunction, opts.Indexers)
return &informerCache{InformersMap: im}, nil
}

Expand Down
20 changes: 17 additions & 3 deletions pkg/cache/internal/cache_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"reflect"

kcpcache "github.com/kcp-dev/apimachinery/pkg/cache"
kcpclient "github.com/kcp-dev/apimachinery/pkg/client"

apierrors "k8s.io/apimachinery/pkg/api/errors"
apimeta "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
Expand Down Expand Up @@ -106,14 +108,24 @@ func (c *CacheReader) Get(_ context.Context, key client.ObjectKey, out client.Ob
}

// List lists items out of the indexer and writes them to out.
func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...client.ListOption) error {
func (c *CacheReader) List(ctx context.Context, out client.ObjectList, opts ...client.ListOption) error {
var objs []interface{}
var err error

listOpts := client.ListOptions{}
listOpts.ApplyOptions(opts)

// TODO(kcp), should we just require people to pass in the cluster list option, or maybe provide
// a wrapper that adds it from the context automatically rather than doing this?
// It may also make more sense to just use the context and not bother provided a ListOption for it
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I maybe think I prefer the ListOption, especially if it's in a kcp repo instead of controller-runtime. I'm not sure that would work with the live client easily, though, as that needs the context?

if listOpts.Cluster.Empty() {
if cluster, ok := kcpclient.ClusterFromContext(ctx); ok {
client.InCluster(cluster).ApplyToList(&listOpts)
}
}

switch {
// TODO(kcp) add cluster to this case
case listOpts.FieldSelector != nil:
// TODO(directxman12): support more complicated field selectors by
// combining multiple indices, GetIndexers, etc
Expand All @@ -125,10 +137,12 @@ func (c *CacheReader) List(_ context.Context, out client.ObjectList, opts ...cli
// namespaced index key. Otherwise, ask for the non-namespaced variant by using the fake "all namespaces"
// namespace.
objs, err = c.indexer.ByIndex(FieldIndexName(field), KeyToNamespacedKey(listOpts.Namespace, val))
case listOpts.Cluster.Empty():
objs = c.indexer.List()
case listOpts.Namespace != "":
objs, err = c.indexer.ByIndex(cache.NamespaceIndex, listOpts.Namespace)
objs, err = c.indexer.ByIndex(kcpcache.ClusterAndNamespaceIndexName, kcpcache.ToClusterAwareKey(listOpts.Cluster.String(), listOpts.Namespace, ""))
default:
objs = c.indexer.List()
objs, err = c.indexer.ByIndex(kcpcache.ClusterIndexName, kcpcache.ToClusterAwareKey(listOpts.Cluster.String(), "", ""))
}
if err != nil {
return err
Expand Down
19 changes: 10 additions & 9 deletions pkg/cache/internal/deleg_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,12 @@ func NewInformersMap(config *rest.Config,
selectors SelectorsByGVK,
disableDeepCopy DisableDeepCopyByGVK,
keyFunc cache.KeyFunc,
indexers cache.Indexers,
) *InformersMap {
return &InformersMap{
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc),
structured: newStructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc, indexers),
unstructured: newUnstructuredInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc, indexers),
metadata: newMetadataInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, keyFunc, indexers),

Scheme: scheme,
}
Expand Down Expand Up @@ -109,18 +110,18 @@ func (m *InformersMap) Get(ctx context.Context, gvk schema.GroupVersionKind, obj

// newStructuredInformersMap creates a new InformersMap for structured objects.
func newStructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch, keyFunc)
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc, indexers cache.Indexers) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createStructuredListWatch, keyFunc, indexers)
}

// newUnstructuredInformersMap creates a new InformersMap for unstructured objects.
func newUnstructuredInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createUnstructuredListWatch, keyFunc)
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc, indexers cache.Indexers) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createUnstructuredListWatch, keyFunc, indexers)
}

// newMetadataInformersMap creates a new InformersMap for metadata-only objects.
func newMetadataInformersMap(config *rest.Config, scheme *runtime.Scheme, mapper meta.RESTMapper, resync time.Duration,
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createMetadataListWatch, keyFunc)
namespace string, selectors SelectorsByGVK, disableDeepCopy DisableDeepCopyByGVK, keyFunc cache.KeyFunc, indexers cache.Indexers) *specificInformersMap {
return newSpecificInformersMap(config, scheme, mapper, resync, namespace, selectors, disableDeepCopy, createMetadataListWatch, keyFunc, indexers)
}
44 changes: 27 additions & 17 deletions pkg/cache/internal/informers_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,22 +55,24 @@ func newSpecificInformersMap(config *rest.Config,
selectors SelectorsByGVK,
disableDeepCopy DisableDeepCopyByGVK,
createListWatcher createListWatcherFunc,
keyFunction cache.KeyFunc) *specificInformersMap {
keyFunction cache.KeyFunc,
indexers cache.Indexers) *specificInformersMap {

ip := &specificInformersMap{
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
selectors: selectors.forGVK,
disableDeepCopy: disableDeepCopy,
keyFunction: keyFunction,
config: config,
Scheme: scheme,
mapper: mapper,
informersByGVK: make(map[schema.GroupVersionKind]*MapEntry),
codecs: serializer.NewCodecFactory(scheme),
paramCodec: runtime.NewParameterCodec(scheme),
resync: resync,
startWait: make(chan struct{}),
createListWatcher: createListWatcher,
namespace: namespace,
selectors: selectors.forGVK,
disableDeepCopy: disableDeepCopy,
keyFunction: keyFunction,
additionalIndexers: indexers,
}
return ip
}
Expand Down Expand Up @@ -141,6 +143,10 @@ type specificInformersMap struct {

// keyFunction is the cache.KeyFunc informers will be configured to use
keyFunction cache.KeyFunc

// additionalIndexers is the indexers that the informers will be configured to use.
// Will not allow overwriting the standard NamespaceIndex.
additionalIndexers cache.Indexers
}

// Start calls Run on each of the informers and sets started to true. Blocks on the context.
Expand Down Expand Up @@ -230,12 +236,16 @@ func (ip *specificInformersMap) addInformerToMap(gvk schema.GroupVersionKind, ob
if err != nil {
return nil, false, err
}
indexers := cache.Indexers{}
for indexName, indexer := range ip.additionalIndexers {
indexers[indexName] = indexer
}
indexers[cache.NamespaceIndex] = cache.MetaNamespaceIndexFunc

ni := cache.NewSharedIndexInformerWithOptions(lw, obj,
cache.WithResyncPeriod(resyncPeriod(ip.resync)()),
cache.WithKeyFunction(ip.keyFunction),
cache.WithIndexers(cache.Indexers{
cache.NamespaceIndex: cache.MetaNamespaceIndexFunc,
}))
cache.WithIndexers(indexers))
rm, err := ip.mapper.RESTMapping(gvk.GroupKind(), gvk.Version)
if err != nil {
return nil, false, err
Expand Down
12 changes: 12 additions & 0 deletions pkg/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package client

import (
"github.com/kcp-dev/logicalcluster"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
Expand Down Expand Up @@ -330,6 +331,9 @@ type ListOptions struct {
// non-namespaced objects, or to list across all namespaces.
Namespace string

// Cluster represents the cluster to list for, or empty to list across all clusters.
Cluster logicalcluster.Name

// Limit specifies the maximum number of results to return from the server. The server may
// not support this field on all resource types, but if it does and more results remain it
// will set the continue field on the returned list object. This field is not supported if watch
Expand Down Expand Up @@ -488,6 +492,14 @@ func (m MatchingFieldsSelector) ApplyToDeleteAllOf(opts *DeleteAllOfOptions) {
m.ApplyToList(&opts.ListOptions)
}

// InCluster restricts the list/delete operation to the given cluster.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this ultimately go into a kcp repo instead of controller-runtime?

type InCluster logicalcluster.Name

// ApplyToList applies this configuration to the given list options.
func (n InCluster) ApplyToList(opts *ListOptions) {
opts.Cluster = logicalcluster.Name(n)
}

// InNamespace restricts the list/delete operation to the given namespace.
type InNamespace string

Expand Down
5 changes: 5 additions & 0 deletions pkg/kcp/wrappers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"net/http"

"k8s.io/client-go/rest"
k8scache "k8s.io/client-go/tools/cache"

kcpcache "github.com/kcp-dev/apimachinery/pkg/cache"
kcpclient "github.com/kcp-dev/apimachinery/pkg/client"
Expand Down Expand Up @@ -49,6 +50,10 @@ func NewClusterAwareCache(config *rest.Config, opts cache.Options) (cache.Cache,
c := rest.CopyConfig(config)
c.Host += "/clusters/*"
opts.KeyFunction = kcpcache.ClusterAwareKeyFunc
opts.Indexers = k8scache.Indexers{
kcpcache.ClusterIndexName: kcpcache.ClusterIndexFunc,
kcpcache.ClusterAndNamespaceIndexName: kcpcache.ClusterAndNamespaceIndexFunc,
}
return cache.New(c, opts)
}

Expand Down