diff --git a/caffeine-bounded-cache-support/pom.xml b/caffeine-bounded-cache-support/pom.xml new file mode 100644 index 0000000000..23b5334287 --- /dev/null +++ b/caffeine-bounded-cache-support/pom.xml @@ -0,0 +1,84 @@ + + + + java-operator-sdk + io.javaoperatorsdk + 4.3.0-SNAPSHOT + + 4.0.0 + + caffeine-bounded-cache-support + Operator SDK - Caffeine Bounded Cache Support + + + 11 + 11 + + + + + io.javaoperatorsdk + operator-framework-core + + + com.github.ben-manes.caffeine + caffeine + + + io.javaoperatorsdk + operator-framework + test + + + io.javaoperatorsdk + operator-framework-junit-5 + ${project.version} + test + + + io.fabric8 + crd-generator-apt + test + + + org.apache.logging.log4j + log4j-slf4j-impl + test + + + org.apache.logging.log4j + log4j-core + ${log4j.version} + test-jar + test + + + + + + + maven-compiler-plugin + ${maven-compiler-plugin.version} + + + + default-compile + compile + + compile + + + + -proc:none + + + + + + + + + \ No newline at end of file diff --git a/caffeine-bounded-cache-support/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedCache.java b/caffeine-bounded-cache-support/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedCache.java new file mode 100644 index 0000000000..eb70fe04d3 --- /dev/null +++ b/caffeine-bounded-cache-support/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedCache.java @@ -0,0 +1,32 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +import com.github.benmanes.caffeine.cache.Cache; + +/** + * Caffein cache wrapper to be used in a {@link BoundedItemStore} + */ +public class CaffeineBoundedCache implements BoundedCache { + + private Cache cache; + + public CaffeineBoundedCache(Cache cache) { + this.cache = cache; + } + + @Override + public R get(K key) { + return cache.getIfPresent(key); + } + + @Override + public R remove(K key) { + var value = cache.getIfPresent(key); + cache.invalidate(key); + return value; + } + + @Override + public void put(K key, R object) { + cache.put(key, object); + } +} diff --git a/caffeine-bounded-cache-support/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedItemStores.java b/caffeine-bounded-cache-support/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedItemStores.java new file mode 100644 index 0000000000..aa4db53030 --- /dev/null +++ b/caffeine-bounded-cache-support/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedItemStores.java @@ -0,0 +1,47 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +import java.time.Duration; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +/** + * The idea about CaffeinBoundedItemStore-s is that, caffeine will cache the resources which were + * recently used, and will evict resource, which are not used for a while. This is ideal from the + * perspective that on startup controllers reconcile all resources (this is why a maxSize not ideal) + * but after a while it can happen (well depending on the controller and domain) that only some + * resources are actually active, thus related events happen. So in case large amount of custom + * resources only the active once will remain in the cache. Note that if a resource is reconciled + * all the secondary resources are usually reconciled too, in that case all those resources are + * fetched and populated to the cache, and will remain there for some time, for a subsequent + * reconciliations. + */ +public class CaffeineBoundedItemStores { + + private CaffeineBoundedItemStores() {} + + /** + * @param client Kubernetes Client + * @param rClass resource class + * @param accessExpireDuration the duration after resources is evicted from cache if not accessed. + * @return the ItemStore implementation + * @param resource type + */ + public static BoundedItemStore boundedItemStore( + KubernetesClient client, Class rClass, + Duration accessExpireDuration) { + Cache cache = Caffeine.newBuilder() + .expireAfterAccess(accessExpireDuration) + .build(); + return boundedItemStore(client, rClass, cache); + } + + public static BoundedItemStore boundedItemStore( + KubernetesClient client, Class rClass, Cache cache) { + return new BoundedItemStore<>(new CaffeineBoundedCache<>(cache), rClass, client); + } + +} diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedCacheTestBase.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedCacheTestBase.java new file mode 100644 index 0000000000..21adf81cc0 --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedCacheTestBase.java @@ -0,0 +1,95 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +import java.time.Duration; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.client.CustomResource; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestStatus; + +import static io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler.DATA_KEY; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public abstract class BoundedCacheTestBase

> { + + private static final Logger log = LoggerFactory.getLogger(BoundedCacheTestBase.class); + + public static final int NUMBER_OF_RESOURCE_TO_TEST = 3; + public static final String RESOURCE_NAME_PREFIX = "test-"; + public static final String INITIAL_DATA_PREFIX = "data-"; + public static final String UPDATED_PREFIX = "updatedPrefix"; + + @Test + void reconciliationWorksWithLimitedCache() { + createTestResources(); + + assertConfigMapData(INITIAL_DATA_PREFIX); + + updateTestResources(); + + assertConfigMapData(UPDATED_PREFIX); + + deleteTestResources(); + + assertConfigMapsDeleted(); + } + + private void assertConfigMapsDeleted() { + await().atMost(Duration.ofSeconds(30)) + .untilAsserted(() -> IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> { + var cm = extension().get(ConfigMap.class, RESOURCE_NAME_PREFIX + i); + assertThat(cm).isNull(); + })); + } + + private void deleteTestResources() { + IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> { + var cm = extension().get(customResourceClass(), RESOURCE_NAME_PREFIX + i); + var deleted = extension().delete(cm); + if (!deleted) { + log.warn("Custom resource might not be deleted: {}", cm); + } + }); + } + + private void updateTestResources() { + IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> { + var cm = extension().get(ConfigMap.class, RESOURCE_NAME_PREFIX + i); + cm.getData().put(DATA_KEY, UPDATED_PREFIX + i); + extension().replace(cm); + }); + } + + void assertConfigMapData(String dataPrefix) { + await().untilAsserted(() -> IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST) + .forEach(i -> assertConfigMap(i, dataPrefix))); + } + + private void assertConfigMap(int i, String prefix) { + var cm = extension().get(ConfigMap.class, RESOURCE_NAME_PREFIX + i); + assertThat(cm).isNotNull(); + assertThat(cm.getData().get(DATA_KEY)).isEqualTo(prefix + i); + } + + private void createTestResources() { + IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> { + extension().create(createTestResource(i)); + }); + } + + abstract P createTestResource(int index); + + abstract Class

customResourceClass(); + + abstract LocallyRunOperatorExtension extension(); + + + +} diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedCacheClusterScopeIT.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedCacheClusterScopeIT.java new file mode 100644 index 0000000000..252b20f4a4 --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedCacheClusterScopeIT.java @@ -0,0 +1,52 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +import java.time.Duration; + +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.clusterscope.BoundedCacheClusterScopeTestCustomResource; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.clusterscope.BoundedCacheClusterScopeTestReconciler; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec; + +import static io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler.boundedItemStore; + +public class CaffeineBoundedCacheClusterScopeIT + extends BoundedCacheTestBase { + + @RegisterExtension + LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler(new BoundedCacheClusterScopeTestReconciler(), o -> { + o.withItemStore(boundedItemStore( + new KubernetesClientBuilder().build(), + BoundedCacheClusterScopeTestCustomResource.class, + Duration.ofMinutes(1), + 1)); + }) + .build(); + + @Override + BoundedCacheClusterScopeTestCustomResource createTestResource(int index) { + var res = new BoundedCacheClusterScopeTestCustomResource(); + res.setMetadata(new ObjectMetaBuilder() + .withName(RESOURCE_NAME_PREFIX + index) + .build()); + res.setSpec(new BoundedCacheTestSpec()); + res.getSpec().setData(INITIAL_DATA_PREFIX + index); + res.getSpec().setTargetNamespace(extension.getNamespace()); + return res; + } + + @Override + Class customResourceClass() { + return BoundedCacheClusterScopeTestCustomResource.class; + } + + @Override + LocallyRunOperatorExtension extension() { + return extension; + } +} diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedCacheNamespacedIT.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedCacheNamespacedIT.java new file mode 100644 index 0000000000..ae7f8f5873 --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeineBoundedCacheNamespacedIT.java @@ -0,0 +1,50 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +import java.time.Duration; + +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestCustomResource; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestReconciler; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec; + +import static io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler.boundedItemStore; + +class CaffeineBoundedCacheNamespacedIT + extends BoundedCacheTestBase { + + @RegisterExtension + LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder().withReconciler(new BoundedCacheTestReconciler(), o -> { + o.withItemStore(boundedItemStore( + new KubernetesClientBuilder().build(), BoundedCacheTestCustomResource.class, + Duration.ofMinutes(1), + 1)); + }) + .build(); + + BoundedCacheTestCustomResource createTestResource(int index) { + var res = new BoundedCacheTestCustomResource(); + res.setMetadata(new ObjectMetaBuilder() + .withName(RESOURCE_NAME_PREFIX + index) + .build()); + res.setSpec(new BoundedCacheTestSpec()); + res.getSpec().setData(INITIAL_DATA_PREFIX + index); + res.getSpec().setTargetNamespace(extension.getNamespace()); + return res; + } + + @Override + Class customResourceClass() { + return BoundedCacheTestCustomResource.class; + } + + @Override + LocallyRunOperatorExtension extension() { + return extension; + } + +} diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/AbstractTestReconciler.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/AbstractTestReconciler.java new file mode 100644 index 0000000000..835fcef91a --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/AbstractTestReconciler.java @@ -0,0 +1,117 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache.sample; + +import java.time.Duration; +import java.util.Map; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.*; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.junit.KubernetesClientAware; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.cache.BoundedItemStore; +import io.javaoperatorsdk.operator.processing.event.source.cache.CaffeineBoundedItemStores; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.clusterscope.BoundedCacheClusterScopeTestReconciler; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestStatus; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; + +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; + +public abstract class AbstractTestReconciler

> + implements KubernetesClientAware, Reconciler

, + EventSourceInitializer

{ + + private static final Logger log = + LoggerFactory.getLogger(BoundedCacheClusterScopeTestReconciler.class); + + public static final String DATA_KEY = "dataKey"; + + protected KubernetesClient client; + + @Override + public UpdateControl

reconcile( + P resource, + Context

context) { + var maybeConfigMap = context.getSecondaryResource(ConfigMap.class); + maybeConfigMap.ifPresentOrElse( + cm -> updateConfigMapIfNeeded(cm, resource), + () -> createConfigMap(resource)); + ensureStatus(resource); + log.info("Reconciled: {}", resource.getMetadata().getName()); + return UpdateControl.patchStatus(resource); + } + + protected void updateConfigMapIfNeeded(ConfigMap cm, P resource) { + var data = cm.getData().get(DATA_KEY); + if (data == null || data.equals(resource.getSpec().getData())) { + cm.setData(Map.of(DATA_KEY, resource.getSpec().getData())); + client.configMaps().resource(cm).replace(); + } + } + + protected void createConfigMap(P resource) { + var cm = new ConfigMapBuilder() + .withMetadata(new ObjectMetaBuilder() + .withName(resource.getMetadata().getName()) + .withNamespace(resource.getSpec().getTargetNamespace()) + .build()) + .withData(Map.of(DATA_KEY, resource.getSpec().getData())) + .build(); + cm.addOwnerReference(resource); + client.configMaps().resource(cm).create(); + } + + @Override + public KubernetesClient getKubernetesClient() { + return client; + } + + @Override + public void setKubernetesClient(KubernetesClient kubernetesClient) { + this.client = kubernetesClient; + } + + @Override + public Map prepareEventSources( + EventSourceContext

context) { + + var boundedItemStore = + boundedItemStore(new KubernetesClientBuilder().build(), + ConfigMap.class, Duration.ofMinutes(1), 1); // setting max size for testing purposes + + var es = new InformerEventSource<>(InformerConfiguration.from(ConfigMap.class, context) + .withItemStore(boundedItemStore) + .withSecondaryToPrimaryMapper( + Mappers.fromOwnerReference(this instanceof BoundedCacheClusterScopeTestReconciler)) + .build(), context); + + return EventSourceInitializer.nameEventSources(es); + } + + private void ensureStatus(P resource) { + if (resource.getStatus() == null) { + resource.setStatus(new BoundedCacheTestStatus()); + } + } + + public static BoundedItemStore boundedItemStore( + KubernetesClient client, Class rClass, + Duration accessExpireDuration, + // max size is only for testing purposes + long cacheMaxSize) { + Cache cache = Caffeine.newBuilder() + .expireAfterAccess(accessExpireDuration) + .maximumSize(cacheMaxSize) + .build(); + return CaffeineBoundedItemStores.boundedItemStore(client, rClass, cache); + } +} diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/clusterscope/BoundedCacheClusterScopeTestCustomResource.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/clusterscope/BoundedCacheClusterScopeTestCustomResource.java new file mode 100644 index 0000000000..a77416715e --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/clusterscope/BoundedCacheClusterScopeTestCustomResource.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache.sample.clusterscope; + +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestStatus; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("bccs") +public class BoundedCacheClusterScopeTestCustomResource + extends CustomResource { +} diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/clusterscope/BoundedCacheClusterScopeTestReconciler.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/clusterscope/BoundedCacheClusterScopeTestReconciler.java new file mode 100644 index 0000000000..a154659164 --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/clusterscope/BoundedCacheClusterScopeTestReconciler.java @@ -0,0 +1,10 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache.sample.clusterscope; + +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler; + +@ControllerConfiguration +public class BoundedCacheClusterScopeTestReconciler extends + AbstractTestReconciler { + +} diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestCustomResource.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestCustomResource.java new file mode 100644 index 0000000000..a5e37917ba --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestCustomResource.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("bct") +public class BoundedCacheTestCustomResource + extends CustomResource implements Namespaced { +} diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestReconciler.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestReconciler.java new file mode 100644 index 0000000000..211877b361 --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestReconciler.java @@ -0,0 +1,10 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope; + +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler; + +@ControllerConfiguration +public class BoundedCacheTestReconciler + extends AbstractTestReconciler { + +} diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestSpec.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestSpec.java new file mode 100644 index 0000000000..63e5876267 --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestSpec.java @@ -0,0 +1,25 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope; + +public class BoundedCacheTestSpec { + + private String data; + private String targetNamespace; + + public String getData() { + return data; + } + + public BoundedCacheTestSpec setData(String data) { + this.data = data; + return this; + } + + public String getTargetNamespace() { + return targetNamespace; + } + + public BoundedCacheTestSpec setTargetNamespace(String targetNamespace) { + this.targetNamespace = targetNamespace; + return this; + } +} diff --git a/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestStatus.java b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestStatus.java new file mode 100644 index 0000000000..03a311529e --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/namespacescope/BoundedCacheTestStatus.java @@ -0,0 +1,6 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope; + +import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus; + +public class BoundedCacheTestStatus extends ObservedGenerationAwareStatus { +} diff --git a/caffeine-bounded-cache-support/src/test/resources/log4j2.xml b/caffeine-bounded-cache-support/src/test/resources/log4j2.xml new file mode 100644 index 0000000000..f23cf772dd --- /dev/null +++ b/caffeine-bounded-cache-support/src/test/resources/log4j2.xml @@ -0,0 +1,13 @@ + + + + + + + + + + + + + diff --git a/docs/documentation/features.md b/docs/documentation/features.md index afc010ec06..f0c20e5d67 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -739,3 +739,29 @@ with a `mycrs` plural form will result in 2 files: **NOTE:** > Quarkus users using the `quarkus-operator-sdk` extension do not need to add any extra dependency > to get their CRD generated as this is handled by the extension itself. + +## Optimizing Caches + +One of the ideas around operator pattern, is that all the relevant resources are cached, thus reconciliation is usually +very fast (especially if it does not need to update resources) since it's mostly working with in memory state. +However or large clusters, caching huge amount of primary and secondary resources might consume lots of memory. +There are some semi-experimental (experimental in terms that it works, but we need feedback from real production usage) +features to optimize memory usage of controllers. + +### Bounded Caches for Informers + +Limiting caches for informers - thus for Kubernetes resources, both controllers primary resource - is supported for now. +The idea with the implementation that is provided, is that resources are in the cache for a limited time. +So for use cases, when a resource is only frequently reconciled when it is created, and later no or +occasionally reconciled, will be evicted from the cache, since the resources are not accessed. +If a resource accessed in the future but not in the cache, the bounded cache implementation will fetch it from +the service if needed. +Note that on start of a controller all the resources are reconciled, for this reason explicitly setting a maximal +size of a cache might not be ideal. In other words it is desired to have all the resources in the cache at startup, +but not later if not accessed. + +See usage of the related implementation using Caffein cache in integration tests for [primary resource](https://github.com/java-operator-sdk/java-operator-sdk/blob/10e11e587447667ef0da1ddb29e0ba15fcd24ada/caffein-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeinBoundedCacheNamespacedIT.java#L19-L19) and for an [informer](https://github.com/java-operator-sdk/java-operator-sdk/blob/10e11e587447667ef0da1ddb29e0ba15fcd24ada/caffein-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/AbstractTestReconciler.java#L84-L93). + +See also [CaffeinBoundedItemStores](https://github.com/java-operator-sdk/java-operator-sdk/blob/10e11e587447667ef0da1ddb29e0ba15fcd24ada/caffein-bounded-cache-support/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeinBoundedItemStores.java#L22-L22) + + diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java index 2e8895ad87..7d0653a825 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/BaseConfigurationService.java @@ -13,6 +13,7 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.Utils.Configurator; @@ -116,14 +117,14 @@ protected

ControllerConfiguration

configFor(Reconcile final var associatedReconcilerClass = ResolvedControllerConfiguration.getAssociatedReconcilerClassName(reconciler.getClass()); + final var context = Utils.contextFor(name); final Class retryClass = annotation.retry(); final var retry = Utils.instantiateAndConfigureIfNeeded(retryClass, Retry.class, - Utils.contextFor(name, null, null), configuratorFor(Retry.class, reconciler)); + context, configuratorFor(Retry.class, reconciler)); final Class rateLimiterClass = annotation.rateLimiter(); final var rateLimiter = Utils.instantiateAndConfigureIfNeeded(rateLimiterClass, - RateLimiter.class, - Utils.contextFor(name, null, null), configuratorFor(RateLimiter.class, reconciler)); + RateLimiter.class, context, configuratorFor(RateLimiter.class, reconciler)); final var reconciliationInterval = annotation.maxReconciliationInterval(); long interval = -1; @@ -137,12 +138,9 @@ protected

ControllerConfiguration

configFor(Reconcile resourceClass, name, generationAware, associatedReconcilerClass, retry, rateLimiter, ResolvedControllerConfiguration.getMaxReconciliationInterval(interval, timeUnit), - Utils.instantiate(annotation.onAddFilter(), OnAddFilter.class, - Utils.contextFor(name, null, null)), - Utils.instantiate(annotation.onUpdateFilter(), OnUpdateFilter.class, - Utils.contextFor(name, null, null)), - Utils.instantiate(annotation.genericFilter(), GenericFilter.class, - Utils.contextFor(name, null, null)), + Utils.instantiate(annotation.onAddFilter(), OnAddFilter.class, context), + Utils.instantiate(annotation.onUpdateFilter(), OnUpdateFilter.class, context), + Utils.instantiate(annotation.genericFilter(), GenericFilter.class, context), Set.of(valueOrDefault(annotation, io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration::namespaces, DEFAULT_NAMESPACES_SET.toArray(String[]::new))), @@ -152,7 +150,8 @@ protected

ControllerConfiguration

configFor(Reconcile valueOrDefault(annotation, io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration::labelSelector, Constants.NO_VALUE_SET), - null); + null, + Utils.instantiate(annotation.itemStore(), ItemStore.class, context)); ResourceEventFilter

answer = deprecatedEventFilter(annotation); config.setEventFilter(answer != null ? answer : ResourceEventFilters.passthrough()); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java index 4a3efbc4a6..ab6b1d8e46 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java @@ -8,6 +8,7 @@ import java.util.Set; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; @@ -36,6 +37,7 @@ public class ControllerConfigurationOverrider { private GenericFilter genericFilter; private RateLimiter rateLimiter; private Map configurations; + private ItemStore itemStore; private ControllerConfigurationOverrider(ControllerConfiguration original) { finalizer = original.getFinalizerName(); @@ -152,6 +154,11 @@ public ControllerConfigurationOverrider withGenericFilter(GenericFilter ge return this; } + public ControllerConfigurationOverrider withItemStore(ItemStore itemStore) { + this.itemStore = itemStore; + return this; + } + public ControllerConfigurationOverrider replacingNamedDependentResourceConfig(String name, Object dependentResourceConfig) { @@ -175,7 +182,7 @@ public ControllerConfiguration build() { generationAware, original.getAssociatedReconcilerClassName(), retry, rateLimiter, reconciliationMaxInterval, onAddFilter, onUpdateFilter, genericFilter, original.getDependentResources(), - namespaces, finalizer, labelSelector, configurations); + namespaces, finalizer, labelSelector, configurations, itemStore); overridden.setEventFilter(customResourcePredicate); return overridden; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java index b85bd76e9a..a612f2d136 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java @@ -4,6 +4,7 @@ import java.util.Set; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter; @@ -19,10 +20,11 @@ public class DefaultResourceConfiguration private final GenericFilter genericFilter; private final String labelSelector; private final Set namespaces; + private final ItemStore itemStore; protected DefaultResourceConfiguration(Class resourceClass, Set namespaces, String labelSelector, OnAddFilter onAddFilter, - OnUpdateFilter onUpdateFilter, GenericFilter genericFilter) { + OnUpdateFilter onUpdateFilter, GenericFilter genericFilter, ItemStore itemStore) { this.resourceClass = resourceClass; this.resourceTypeName = ReconcilerUtils.getResourceTypeName(resourceClass); this.onAddFilter = onAddFilter; @@ -31,6 +33,7 @@ protected DefaultResourceConfiguration(Class resourceClass, this.namespaces = ResourceConfiguration.ensureValidNamespaces(namespaces); this.labelSelector = ResourceConfiguration.ensureValidLabelSelector(labelSelector); + this.itemStore = itemStore; } @Override @@ -66,4 +69,9 @@ public Optional> onUpdateFilter() { public Optional> genericFilter() { return Optional.ofNullable(genericFilter); } + + @Override + public Optional> getItemStore() { + return Optional.ofNullable(itemStore); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java index 5513b43060..844724fa48 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResolvedControllerConfiguration.java @@ -5,6 +5,7 @@ import java.util.concurrent.TimeUnit; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceConfigurationProvider; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; @@ -29,6 +30,7 @@ public class ResolvedControllerConfiguration

private final Duration maxReconciliationInterval; private final String finalizer; private final Map configurations; + private final ItemStore

itemStore; private ResourceEventFilter

eventFilter; private List dependentResources; @@ -40,7 +42,8 @@ public ResolvedControllerConfiguration(Class

resourceClass, ControllerConfigu other.onAddFilter().orElse(null), other.onUpdateFilter().orElse(null), other.genericFilter().orElse(null), other.getDependentResources(), other.getNamespaces(), - other.getFinalizerName(), other.getLabelSelector(), Collections.emptyMap()); + other.getFinalizerName(), other.getLabelSelector(), Collections.emptyMap(), + other.getItemStore().orElse(null)); } public static Duration getMaxReconciliationInterval(long interval, TimeUnit timeUnit) { @@ -67,10 +70,10 @@ public ResolvedControllerConfiguration(Class

resourceClass, String name, GenericFilter

genericFilter, List dependentResources, Set namespaces, String finalizer, String labelSelector, - Map configurations) { + Map configurations, ItemStore

itemStore) { this(resourceClass, name, generationAware, associatedReconcilerClassName, retry, rateLimiter, maxReconciliationInterval, onAddFilter, onUpdateFilter, genericFilter, - namespaces, finalizer, labelSelector, configurations); + namespaces, finalizer, labelSelector, configurations, itemStore); setDependentResources(dependentResources); } @@ -79,8 +82,9 @@ protected ResolvedControllerConfiguration(Class

resourceClass, String name, RateLimiter rateLimiter, Duration maxReconciliationInterval, OnAddFilter

onAddFilter, OnUpdateFilter

onUpdateFilter, GenericFilter

genericFilter, Set namespaces, String finalizer, String labelSelector, - Map configurations) { - super(resourceClass, namespaces, labelSelector, onAddFilter, onUpdateFilter, genericFilter); + Map configurations, ItemStore

itemStore) { + super(resourceClass, namespaces, labelSelector, onAddFilter, onUpdateFilter, genericFilter, + itemStore); this.name = ControllerConfiguration.ensureValidName(name, associatedReconcilerClassName); this.generationAware = generationAware; this.associatedReconcilerClassName = associatedReconcilerClassName; @@ -88,7 +92,7 @@ protected ResolvedControllerConfiguration(Class

resourceClass, String name, this.rateLimiter = ensureRateLimiter(rateLimiter); this.maxReconciliationInterval = maxReconciliationInterval; this.configurations = configurations != null ? configurations : Collections.emptyMap(); - + this.itemStore = itemStore; this.finalizer = ControllerConfiguration.ensureValidFinalizerName(finalizer, getResourceTypeName()); } @@ -162,4 +166,9 @@ protected void setEventFilter(ResourceEventFilter

eventFilter) { public Object getConfigurationFor(DependentResourceSpec spec) { return configurations.get(spec); } + + @Override + public Optional> getItemStore() { + return Optional.ofNullable(itemStore); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java index 81b7fdf4a7..d3a8379d46 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java @@ -6,6 +6,7 @@ import java.util.Set; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.reconciler.Constants; @@ -122,4 +123,23 @@ default Set getEffectiveNamespaces() { } return targetNamespaces; } + + /** + * Replaces the item store in informer. See underling + * method in fabric8 client informer implementation. + * + * The main goal, is to be able to use limited caches. + * + * See {@link io.javaoperatorsdk.operator.processing.event.source.cache.BoundedItemStore} and + * + * CaffeinBoundedCache + * + * @return Optional ItemStore implementation. If present this item store will be used inside the + * informers. + */ + default Optional> getItemStore() { + return Optional.empty(); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java index bee5b96cbe..ec244f224c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/Utils.java @@ -203,9 +203,7 @@ public static T instantiateAndConfigureIfNeeded(Class targetCla } try { - final Constructor constructor = targetClass.getDeclaredConstructor(); - constructor.setAccessible(true); - final var instance = constructor.newInstance(); + final var instance = getConstructor(targetClass).newInstance(); if (configurator != null) { configurator.configure(instance); @@ -213,13 +211,25 @@ public static T instantiateAndConfigureIfNeeded(Class targetCla return instance; } catch (InstantiationException | IllegalAccessException | InvocationTargetException - | NoSuchMethodException e) { + | IllegalStateException e) { throw new OperatorException("Couldn't instantiate " + expectedType.getSimpleName() + " '" - + targetClass.getName() + "': you need to provide an accessible no-arg constructor." + + targetClass.getName() + "'." + (context != null ? " Context: " + context : ""), e); } } + public static Constructor getConstructor(Class targetClass) { + final Constructor constructor; + try { + constructor = targetClass.getDeclaredConstructor(); + } catch (NoSuchMethodException e) { + throw new IllegalStateException( + "Couldn't find a no-arg constructor for " + targetClass.getName(), e); + } + constructor.setAccessible(true); + return constructor; + } + public static T instantiate(Class toInstantiate, Class expectedType, String context) { return instantiateAndConfigureIfNeeded(toInstantiate, expectedType, context, null); @@ -237,6 +247,10 @@ public static String contextFor(ControllerConfiguration controllerConfigurati return contextFor(controllerConfiguration.getName(), dependentType, configurationAnnotation); } + public static String contextFor(String reconcilerName) { + return contextFor(reconcilerName, null, null); + } + @SuppressWarnings("rawtypes") public static String contextFor(String reconcilerName, Class dependentType, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java index c6ee847cb0..cafce36213 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java @@ -5,6 +5,7 @@ import java.util.Set; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; import io.javaoperatorsdk.operator.api.config.DefaultResourceConfiguration; import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.api.config.Utils; @@ -38,8 +39,10 @@ protected DefaultInformerConfiguration(String labelSelector, OnAddFilter onAddFilter, OnUpdateFilter onUpdateFilter, OnDeleteFilter onDeleteFilter, - GenericFilter genericFilter) { - super(resourceClass, namespaces, labelSelector, onAddFilter, onUpdateFilter, genericFilter); + GenericFilter genericFilter, + ItemStore itemStore) { + super(resourceClass, namespaces, labelSelector, onAddFilter, onUpdateFilter, genericFilter, + itemStore); this.followControllerNamespaceChanges = followControllerNamespaceChanges; this.primaryToSecondaryMapper = primaryToSecondaryMapper; @@ -103,6 +106,7 @@ class InformerConfigurationBuilder { private OnDeleteFilter onDeleteFilter; private GenericFilter genericFilter; private boolean inheritControllerNamespacesOnChange = false; + private ItemStore itemStore; private InformerConfigurationBuilder(Class resourceClass) { this.resourceClass = resourceClass; @@ -203,12 +207,17 @@ public InformerConfigurationBuilder withGenericFilter(GenericFilter generi return this; } + public InformerConfigurationBuilder withItemStore(ItemStore itemStore) { + this.itemStore = itemStore; + return this; + } + public InformerConfiguration build() { return new DefaultInformerConfiguration<>(labelSelector, resourceClass, primaryToSecondaryMapper, secondaryToPrimaryMapper, namespaces, inheritControllerNamespacesOnChange, onAddFilter, onUpdateFilter, - onDeleteFilter, genericFilter); + onDeleteFilter, genericFilter, itemStore); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java index ec76adf89d..a4a4a78797 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java @@ -6,6 +6,7 @@ import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent; import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; @@ -118,4 +119,6 @@ MaxReconciliationInterval maxReconciliationInterval() default @MaxReconciliation * accessible no-arg constructor. */ Class rateLimiter() default LinearRateLimiter.class; + + Class itemStore() default ItemStore.class; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedCache.java new file mode 100644 index 0000000000..1651d44dc7 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedCache.java @@ -0,0 +1,11 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +public interface BoundedCache { + + R get(K key); + + R remove(K key); + + void put(K key, R object); + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedItemStore.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedItemStore.java new file mode 100644 index 0000000000..4f0fcad280 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedItemStore.java @@ -0,0 +1,148 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import java.util.stream.Stream; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.fabric8.kubernetes.client.informers.cache.ItemStore; +import io.javaoperatorsdk.operator.api.config.Utils; + +public class BoundedItemStore + implements ItemStore { + + private static final Logger log = LoggerFactory.getLogger(BoundedItemStore.class); + + private final ResourceFetcher resourceFetcher; + private final BoundedCache cache; + private final Function keyFunction; + private final Map existingMinimalResources = new ConcurrentHashMap<>(); + private final Constructor resourceConstructor; + + public BoundedItemStore(BoundedCache cache, Class resourceClass, + KubernetesClient client) { + this(cache, resourceClass, namespaceKeyFunc(), + new KubernetesResourceFetcher<>(resourceClass, client)); + } + + public BoundedItemStore(BoundedCache cache, + Class resourceClass, + Function keyFunction, + ResourceFetcher resourceFetcher) { + this.resourceFetcher = resourceFetcher; + this.cache = cache; + this.keyFunction = keyFunction; + this.resourceConstructor = Utils.getConstructor(resourceClass); + } + + @Override + public String getKey(R obj) { + return keyFunction.apply(obj); + } + + @Override + public synchronized R put(String key, R obj) { + var result = existingMinimalResources.get(key); + cache.put(key, obj); + existingMinimalResources.put(key, createMinimalResource(obj)); + return result; + } + + private R createMinimalResource(R obj) { + try { + R minimal = resourceConstructor.newInstance(); + final var metadata = obj.getMetadata(); + minimal.setMetadata(new ObjectMetaBuilder() + .withName(metadata.getName()) + .withNamespace(metadata.getNamespace()) + .withResourceVersion(metadata.getResourceVersion()) + .build()); + return minimal; + } catch (InstantiationException | IllegalAccessException | InvocationTargetException e) { + throw new IllegalStateException(e); + } + } + + @Override + public synchronized R remove(String key) { + var fullValue = cache.remove(key); + var minimalValue = existingMinimalResources.remove(key); + return fullValue != null ? fullValue : minimalValue; + } + + @Override + public Stream keySet() { + return existingMinimalResources.keySet().stream(); + } + + @Override + public Stream values() { + return existingMinimalResources.values().stream(); + } + + @Override + public int size() { + return existingMinimalResources.size(); + } + + @Override + public R get(String key) { + var res = cache.get(key); + if (res != null) { + return res; + } + if (!existingMinimalResources.containsKey(key)) { + return null; + } else { + return refreshMissingStateFromServer(key); + } + } + + @Override + public boolean isFullState() { + return false; + } + + public static Function namespaceKeyFunc() { + return r -> Cache.namespaceKeyFunc(r.getMetadata().getNamespace(), r.getMetadata().getName()); + } + + protected R refreshMissingStateFromServer(String key) { + log.debug("Fetching resource from server for key: {}", key); + var newRes = resourceFetcher.fetchResource(key); + synchronized (this) { + log.debug("Fetched resource: {}", newRes); + var actual = cache.get(key); + if (newRes == null) { + // double-checking if actual, not received since. + // If received we just return. Since the resource from informer should be always leading, + // even if the fetched resource is null, this will be eventually received as an event. + if (actual == null) { + existingMinimalResources.remove(key); + return null; + } else { + return actual; + } + } + // Just want to put the fetched resource if there is still no resource published from + // different source. In case of informers actually multiple events might arrive, therefore non + // fetched resource should take always precedence. + if (actual == null) { + cache.put(key, newRes); + existingMinimalResources.put(key, createMinimalResource(newRes)); + return newRes; + } else { + return actual; + } + } + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/KubernetesResourceFetcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/KubernetesResourceFetcher.java new file mode 100644 index 0000000000..ad996afc36 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/KubernetesResourceFetcher.java @@ -0,0 +1,47 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +import java.util.function.Function; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class KubernetesResourceFetcher + implements ResourceFetcher { + + private final Class rClass; + private final KubernetesClient client; + private final Function resourceIDFunction; + + public KubernetesResourceFetcher(Class rClass, KubernetesClient client) { + this(rClass, client, inverseNamespaceKeyFunction()); + } + + public KubernetesResourceFetcher(Class rClass, + KubernetesClient client, + Function resourceIDFunction) { + this.rClass = rClass; + this.client = client; + this.resourceIDFunction = resourceIDFunction; + } + + @Override + public R fetchResource(String key) { + var resourceId = resourceIDFunction.apply(key); + return resourceId.getNamespace().map(ns -> client.resources(rClass).inNamespace(ns) + .withName(resourceId.getName()).get()) + .orElse(client.resources(rClass).withName(resourceId.getName()).get()); + } + + public static Function inverseNamespaceKeyFunction() { + return s -> { + int delimiterIndex = s.indexOf("/"); + if (delimiterIndex == -1) { + return new ResourceID(s); + } else { + return new ResourceID(s.substring(delimiterIndex + 1), s.substring(0, delimiterIndex)); + } + }; + } + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/ResourceFetcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/ResourceFetcher.java new file mode 100644 index 0000000000..9cc4fe7a35 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/ResourceFetcher.java @@ -0,0 +1,7 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +public interface ResourceFetcher { + + R fetchResource(K key); + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 0a6b8327c5..db4ce49076 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -127,6 +127,7 @@ private InformerWrapper createEventSource( FilterWatchListDeletable, Resource> filteredBySelectorClient, ResourceEventHandler eventHandler, String namespaceIdentifier) { var informer = filteredBySelectorClient.runnableInformer(0); + configuration.getItemStore().ifPresent(informer::itemStore); var source = new InformerWrapper<>(informer, namespaceIdentifier); source.addEventHandler(eventHandler); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedItemStoreTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedItemStoreTest.java new file mode 100644 index 0000000000..9381aedd0d --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedItemStoreTest.java @@ -0,0 +1,111 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.informers.cache.Cache; +import io.javaoperatorsdk.operator.TestUtils; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; + +import static io.javaoperatorsdk.operator.processing.event.source.cache.BoundedItemStore.namespaceKeyFunc; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +class BoundedItemStoreTest { + + private BoundedItemStore boundedItemStore; + @SuppressWarnings("unchecked") + private final BoundedCache boundedCache = mock(BoundedCache.class); + @SuppressWarnings("unchecked") + private final ResourceFetcher resourceFetcher = + mock(ResourceFetcher.class); + + @BeforeEach + void setup() { + boundedItemStore = new BoundedItemStore<>(boundedCache, + TestCustomResource.class, + namespaceKeyFunc(), + resourceFetcher); + } + + @Test + void shouldNotFetchResourcesFromServerIfNotKnown() { + var res = boundedItemStore.get(testRes1Key()); + + assertThat(res).isNull(); + verify(resourceFetcher, never()).fetchResource(any()); + } + + @Test + void getsResourceFromServerIfNotInCache() { + boundedItemStore.put(testRes1Key(), + TestUtils.testCustomResource1()); + when(resourceFetcher.fetchResource(testRes1Key())) + .thenReturn(TestUtils.testCustomResource1()); + + var res = boundedItemStore.get(testRes1Key()); + + assertThat(res).isNotNull(); + verify(resourceFetcher, times(1)).fetchResource(any()); + } + + @Test + void removesResourcesNotFoundOnServerFromStore() { + boundedItemStore.put(testRes1Key(), + TestUtils.testCustomResource1()); + when(resourceFetcher.fetchResource(testRes1Key())) + .thenReturn(null); + + var res = boundedItemStore.get(testRes1Key()); + + assertThat(res).isNull(); + assertThat(boundedItemStore.keySet()).isEmpty(); + } + + @Test + void removesResourceFromCache() { + boundedItemStore.put(testRes1Key(), + TestUtils.testCustomResource1()); + + boundedItemStore.remove(testRes1Key()); + + var res = boundedItemStore.get(testRes1Key()); + verify(resourceFetcher, never()).fetchResource(any()); + assertThat(res).isNull(); + assertThat(boundedItemStore.keySet()).isEmpty(); + } + + @Test + void readingKeySetDoesNotReadFromBoundedCache() { + boundedItemStore.put(testRes1Key(), + TestUtils.testCustomResource1()); + + boundedItemStore.keySet(); + + verify(boundedCache, never()).get(any()); + } + + @Test + void readingValuesDoesNotReadFromBoundedCache() { + boundedItemStore.put(testRes1Key(), + TestUtils.testCustomResource1()); + + boundedItemStore.values(); + + verify(boundedCache, never()).get(any()); + } + + String key(HasMetadata r) { + return Cache.namespaceKeyFunc(r.getMetadata().getNamespace(), r.getMetadata().getName()); + } + + String testRes1Key() { + return key(TestUtils.testCustomResource1()); + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/KubernetesResourceFetcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/KubernetesResourceFetcherTest.java new file mode 100644 index 0000000000..1158e00295 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/KubernetesResourceFetcherTest.java @@ -0,0 +1,48 @@ +package io.javaoperatorsdk.operator.processing.event.source.cache; + +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinition; + +import static org.assertj.core.api.Assertions.assertThat; + +class KubernetesResourceFetcherTest { + + public static final String DEFAULT_NAMESPACE = "default"; + public static final String TEST_RESOURCE_NAME = "test1"; + + @Test + void inverseKeyFunction() { + String key = BoundedItemStore.namespaceKeyFunc().apply(namespacedResource()); + var resourceID = KubernetesResourceFetcher.inverseNamespaceKeyFunction().apply(key); + + assertThat(resourceID.getNamespace()).isPresent().get().isEqualTo(DEFAULT_NAMESPACE); + assertThat(resourceID.getName()).isEqualTo(TEST_RESOURCE_NAME); + + key = BoundedItemStore.namespaceKeyFunc().apply(clusterScopedResource()); + resourceID = KubernetesResourceFetcher.inverseNamespaceKeyFunction().apply(key); + + assertThat(resourceID.getNamespace()).isEmpty(); + assertThat(resourceID.getName()).isEqualTo(TEST_RESOURCE_NAME); + } + + private HasMetadata namespacedResource() { + var cm = new ConfigMap(); + cm.setMetadata(new ObjectMetaBuilder() + .withName(TEST_RESOURCE_NAME) + .withNamespace(DEFAULT_NAMESPACE) + .build()); + return cm; + } + + private HasMetadata clusterScopedResource() { + var cm = new CustomResourceDefinition(); + cm.setMetadata(new ObjectMetaBuilder() + .withName(TEST_RESOURCE_NAME) + .build()); + return cm; + } +} diff --git a/pom.xml b/pom.xml index 4c088ed87d..bcc55dc646 100644 --- a/pom.xml +++ b/pom.xml @@ -72,6 +72,7 @@ 1.0 1.8.0 4.10.0 + 3.1.3 @@ -81,6 +82,7 @@ operator-framework micrometer-support sample-operators + caffeine-bounded-cache-support @@ -193,6 +195,11 @@ mockwebserver ${okhttp.version} + + com.github.ben-manes.caffeine + caffeine + ${caffein.version} +