diff --git a/docs/content/en/docs/documentation/reconciler.md b/docs/content/en/docs/documentation/reconciler.md index 26a2a20d61..b9ede8aa95 100644 --- a/docs/content/en/docs/documentation/reconciler.md +++ b/docs/content/en/docs/documentation/reconciler.md @@ -169,3 +169,117 @@ You can specify the name of the finalizer to use for your `Reconciler` using the annotation. If you do not specify a finalizer name, one will be automatically generated for you. From v5, by default, the finalizer is added using Server Side Apply. See also `UpdateControl` in docs. + +### Making sure the primary resource is up to date for the next reconciliation + +It is typical to want to update the status subresource with the information that is available during the reconciliation. +This is sometimes referred to as the last observed state. When the primary resource is updated, though, the framework +does not cache the resource directly, relying instead on the propagation of the update to the underlying informer's +cache. It can, therefore, happen that, if other events trigger other reconciliations before the informer cache gets +updated, your reconciler does not see the latest version of the primary resource. While this might not typically be a +problem in most cases, as caches eventually become consistent, depending on your reconciliation logic, you might still +require the latest status version possible, for example if the status subresource is used as a communication mechanism, +see [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values) +from the Kubernetes docs for more details. + +The framework provides utilities to help with these use cases with +[`PrimaryUpdateAndCacheUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java). +These utility methods come in two flavors: + +#### Using internal cache + +In almost all cases for this purpose, you can use internal caches: + +```java + @Override +public UpdateControl reconcile( + StatusPatchCacheCustomResource resource, Context context) { + + // omitted logic + + // update with SSA requires a fresh copy + var freshCopy = createFreshCopy(primary); + freshCopy.getStatus().setValue(statusWithState()); + + var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context); + + return UpdateControl.noUpdate(); + } +``` + +In the background `PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus` puts the result of the update into an internal +cache and will make sure that the next reconciliation will contain the most recent version of the resource. Note that it +is not necessarily the version of the resource you got as response from the update, it can be newer since other parties +can do additional updates meanwhile, but if not explicitly modified, it will contain the up-to-date status. + +See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal). + +This approach works with the default configuration of the framework and should be good to go in most of the cases. +Without going further into the details, this won't work if `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching` +is set to `false` (more precisely there are some edge cases when it won't work). For that case framework provides the following solution: + +#### Fallback approach: using `PrimaryResourceCache` cache + +As an alternative, for very rare cases when `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching` +needs to be set to `false` you can use an explicit caching approach: + +```java + +// We on purpose don't use the provided predicate to show what a custom one could look like. + private final PrimaryResourceCache cache = + new PrimaryResourceCache<>( + (statusPatchCacheCustomResourcePair, statusPatchCacheCustomResource) -> + statusPatchCacheCustomResource.getStatus().getValue() + >= statusPatchCacheCustomResourcePair.afterUpdate().getStatus().getValue()); + + @Override + public UpdateControl reconcile( + StatusPatchPrimaryCacheCustomResource primary, + Context context) { + + // cache will compare the current and the cached resource and return the more recent. (And evict the old) + primary = cache.getFreshResource(primary); + + // omitted logic + + var freshCopy = createFreshCopy(primary); + + freshCopy.getStatus().setValue(statusWithState()); + + var updated = + PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache); + + return UpdateControl.noUpdate(); + } + + @Override + public DeleteControl cleanup( + StatusPatchPrimaryCacheCustomResource resource, + Context context) + throws Exception { + // cleanup the cache on resource deletion + cache.cleanup(resource); + return DeleteControl.defaultDelete(); + } + +``` + +[`PrimaryResourceCache`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java) +is designed for this purpose. As shown in the example above, it is up to you to provide a predicate to determine if the +resource is more recent than the one available. In other words, when to evict the resource from the cache. Typically, as +shown in +the [integration test](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache) +you can have a counter in status to check on that. + +Since all of this happens explicitly, you cannot use this approach for managed dependent resources and workflows and +will need to use the unmanaged approach instead. This is due to the fact that managed dependent resources always get +their associated primary resource from the underlying informer event source cache. + +#### Additional remarks + +As shown in the integration tests, there is no optimistic locking used when updating the +[resource](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java#L41) +(in other words `metadata.resourceVersion` is set to `null`). This is desired since you don't want the patch to fail on +update. + +In addition, you can configure the [Fabric8 client retry](https://github.com/fabric8io/kubernetes-client?tab=readme-ov-file#configuring-the-client). diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java new file mode 100644 index 0000000000..174f7667f6 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -0,0 +1,225 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.util.function.Supplier; +import java.util.function.UnaryOperator; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.dsl.base.PatchContext; +import io.fabric8.kubernetes.client.dsl.base.PatchType; +import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +/** + * Utility methods to patch the primary resource state and store it to the related cache, to make + * sure that fresh resource is present for the next reconciliation. The main use case for such + * updates is to store state is resource status. Use of optimistic locking is not desired for such + * updates, since we don't want to patch fail and lose information that we want to store. + */ +public class PrimaryUpdateAndCacheUtils { + + private PrimaryUpdateAndCacheUtils() {} + + private static final Logger log = LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class); + + /** + * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * Using update (PUT) method. + * + * @param primary resource + * @param context of reconciliation + * @return updated resource + * @param

primary resource type + */ + public static

P updateAndCacheStatus(P primary, Context

context) { + logWarnIfResourceVersionPresent(primary); + return patchAndCacheStatus( + primary, context, () -> context.getClient().resource(primary).updateStatus()); + } + + /** + * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * Using JSON Merge patch. + * + * @param primary resource + * @param context of reconciliation + * @return updated resource + * @param

primary resource type + */ + public static

P patchAndCacheStatus(P primary, Context

context) { + logWarnIfResourceVersionPresent(primary); + return patchAndCacheStatus( + primary, context, () -> context.getClient().resource(primary).patchStatus()); + } + + /** + * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * Using JSON Patch. + * + * @param primary resource + * @param context of reconciliation + * @return updated resource + * @param

primary resource type + */ + public static

P editAndCacheStatus( + P primary, Context

context, UnaryOperator

operation) { + logWarnIfResourceVersionPresent(primary); + return patchAndCacheStatus( + primary, context, () -> context.getClient().resource(primary).editStatus(operation)); + } + + /** + * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * + * @param primary resource + * @param context of reconciliation + * @param patch free implementation of cache + * @return the updated resource. + * @param

primary resource type + */ + public static

P patchAndCacheStatus( + P primary, Context

context, Supplier

patch) { + var updatedResource = patch.get(); + context + .eventSourceRetriever() + .getControllerEventSource() + .handleRecentResourceUpdate(ResourceID.fromResource(primary), updatedResource, primary); + return updatedResource; + } + + /** + * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * Using Server Side Apply. + * + * @param primary resource + * @param freshResourceWithStatus - fresh resource with target state + * @param context of reconciliation + * @return the updated resource. + * @param

primary resource type + */ + public static

P ssaPatchAndCacheStatus( + P primary, P freshResourceWithStatus, Context

context) { + logWarnIfResourceVersionPresent(freshResourceWithStatus); + var res = + context + .getClient() + .resource(freshResourceWithStatus) + .subresource("status") + .patch( + new PatchContext.Builder() + .withForce(true) + .withFieldManager(context.getControllerConfiguration().fieldManager()) + .withPatchType(PatchType.SERVER_SIDE_APPLY) + .build()); + + context + .eventSourceRetriever() + .getControllerEventSource() + .handleRecentResourceUpdate(ResourceID.fromResource(primary), res, primary); + return res; + } + + /** + * Patches the resource and adds it to the {@link PrimaryResourceCache}. + * + * @param primary resource + * @param freshResourceWithStatus - fresh resource with target state + * @param context of reconciliation + * @param cache - resource cache managed by user + * @return the updated resource. + * @param

primary resource type + */ + public static

P ssaPatchAndCacheStatus( + P primary, P freshResourceWithStatus, Context

context, PrimaryResourceCache

cache) { + logWarnIfResourceVersionPresent(freshResourceWithStatus); + return patchAndCacheStatus( + primary, + cache, + () -> + context + .getClient() + .resource(freshResourceWithStatus) + .subresource("status") + .patch( + new PatchContext.Builder() + .withForce(true) + .withFieldManager(context.getControllerConfiguration().fieldManager()) + .withPatchType(PatchType.SERVER_SIDE_APPLY) + .build())); + } + + /** + * Patches the resource with JSON Patch and adds it to the {@link PrimaryResourceCache}. + * + * @param primary resource + * @param context of reconciliation + * @param cache - resource cache managed by user + * @return the updated resource. + * @param

primary resource type + */ + public static

P editAndCacheStatus( + P primary, Context

context, PrimaryResourceCache

cache, UnaryOperator

operation) { + logWarnIfResourceVersionPresent(primary); + return patchAndCacheStatus( + primary, cache, () -> context.getClient().resource(primary).editStatus(operation)); + } + + /** + * Patches the resource with JSON Merge patch and adds it to the {@link PrimaryResourceCache} + * provided. + * + * @param primary resource + * @param context of reconciliation + * @param cache - resource cache managed by user + * @return the updated resource. + * @param

primary resource type + */ + public static

P patchAndCacheStatus( + P primary, Context

context, PrimaryResourceCache

cache) { + logWarnIfResourceVersionPresent(primary); + return patchAndCacheStatus( + primary, cache, () -> context.getClient().resource(primary).patchStatus()); + } + + /** + * Updates the resource and adds it to the {@link PrimaryResourceCache}. + * + * @param primary resource + * @param context of reconciliation + * @param cache - resource cache managed by user + * @return the updated resource. + * @param

primary resource type + */ + public static

P updateAndCacheStatus( + P primary, Context

context, PrimaryResourceCache

cache) { + logWarnIfResourceVersionPresent(primary); + return patchAndCacheStatus( + primary, cache, () -> context.getClient().resource(primary).updateStatus()); + } + + /** + * Updates the resource using the user provided implementation anc caches the result. + * + * @param primary resource + * @param cache resource cache managed by user + * @param patch implementation of resource update* + * @return the updated resource. + * @param

primary resource type + */ + public static

P patchAndCacheStatus( + P primary, PrimaryResourceCache

cache, Supplier

patch) { + var updatedResource = patch.get(); + cache.cacheResource(primary, updatedResource); + return updatedResource; + } + + private static

void logWarnIfResourceVersionPresent(P primary) { + if (primary.getMetadata().getResourceVersion() != null) { + log.warn( + "The metadata.resourceVersion of primary resource is NOT null, " + + "using optimistic locking is discouraged for this purpose. "); + } + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java new file mode 100644 index 0000000000..4da73ab8b1 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java @@ -0,0 +1,65 @@ +package io.javaoperatorsdk.operator.api.reconciler.support; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.BiPredicate; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class PrimaryResourceCache

{ + + private final BiPredicate, P> evictionPredicate; + private final ConcurrentHashMap> cache = new ConcurrentHashMap<>(); + + public PrimaryResourceCache(BiPredicate, P> evictionPredicate) { + this.evictionPredicate = evictionPredicate; + } + + public PrimaryResourceCache() { + this(new ResourceVersionParsingEvictionPredicate<>()); + } + + public void cacheResource(P afterUpdate) { + var resourceId = ResourceID.fromResource(afterUpdate); + cache.put(resourceId, new Pair<>(null, afterUpdate)); + } + + public void cacheResource(P beforeUpdate, P afterUpdate) { + var resourceId = ResourceID.fromResource(beforeUpdate); + cache.put(resourceId, new Pair<>(beforeUpdate, afterUpdate)); + } + + public P getFreshResource(P newVersion) { + var resourceId = ResourceID.fromResource(newVersion); + var pair = cache.get(resourceId); + if (pair == null) { + return newVersion; + } + if (!newVersion.getMetadata().getUid().equals(pair.afterUpdate().getMetadata().getUid())) { + cache.remove(resourceId); + return newVersion; + } + if (evictionPredicate.test(pair, newVersion)) { + cache.remove(resourceId); + return newVersion; + } else { + return pair.afterUpdate(); + } + } + + public void cleanup(P resource) { + cache.remove(ResourceID.fromResource(resource)); + } + + public record Pair(T beforeUpdate, T afterUpdate) {} + + /** This works in general, but it does not strictly follow the contract with k8s API */ + public static class ResourceVersionParsingEvictionPredicate + implements BiPredicate, T> { + @Override + public boolean test(Pair updatePair, T newVersion) { + return Long.parseLong(updatePair.afterUpdate().getMetadata().getResourceVersion()) + <= Long.parseLong(newVersion.getMetadata().getResourceVersion()); + } + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 02b91f6dd0..8b07bf110b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -208,6 +208,7 @@ public Stream> getEventSourcesStream() { return eventSources.flatMappedSources(); } + @Override public ControllerEventSource

getControllerEventSource() { return eventSources.controllerEventSource(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java index 066a7f5808..c5a219a026 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceRetriever.java @@ -6,6 +6,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerEventSource; public interface EventSourceRetriever

{ @@ -17,6 +18,8 @@ default EventSource getEventSourceFor(Class dependentType) { List> getEventSourcesFor(Class dependentType); + ControllerEventSource

getControllerEventSource(); + /** * Registers (and starts) the specified {@link EventSource} dynamically during the reconciliation. * If an EventSource is already registered with the specified name, the registration will be diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 688a88ae22..b52dc278f2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -13,6 +13,7 @@ import io.fabric8.kubernetes.client.dsl.MixedOperation; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; @@ -20,50 +21,45 @@ import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; /** - * Wraps informer(s) so it is connected to the eventing system of the framework. Note that since - * it's it is built on top of Informers, it also support caching resources using caching from - * fabric8 client Informer caches and additional caches described below. + * Wraps informer(s) so they are connected to the eventing system of the framework. Note that since + * this is built on top of Fabric8 client Informers, it also supports caching resources using + * caching from informer caches as well as additional caches described below. * *

InformerEventSource also supports two features to better handle events and caching of - * resources on top of Informers from fabric8 Kubernetes client. These two features implementation - * wise are related to each other:
+ * resources on top of Informers from the Fabric8 Kubernetes client. These two features are related + * to each other as follows: * - *

1. API that allows to make sure the cache contains the fresh resource after an update. This is - * important for {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} and - * mainly for {@link - * io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource} so after - * reconcile if getResource() called always return the fresh resource. To achieve this - * handleRecentResourceUpdate() and handleRecentResourceCreate() needs to be called explicitly after - * resource created/updated using the kubernetes client. (These calls are done automatically by - * KubernetesDependentResource implementation.). In the background this will store the new resource - * in a temporary cache {@link TemporaryResourceCache} which do additional checks. After a new event - * is received the cachec object is removed from this cache, since in general then it is already in - * the cache of informer.
+ *

    + *
  1. Ensuring the cache contains the fresh resource after an update. This is important for + * {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} and mainly + * for {@link + * io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource} so + * that {@link + * io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource#getSecondaryResource(HasMetadata, + * Context)} always returns the latest version of the resource after a reconciliation. To + * achieve this {@link #handleRecentResourceUpdate(ResourceID, HasMetadata, HasMetadata)} and + * {@link #handleRecentResourceCreate(ResourceID, HasMetadata)} need to be called explicitly + * after a resource is created or updated using the kubernetes client. These calls are done + * automatically by the KubernetesDependentResource implementation. In the background this + * will store the new resource in a temporary cache {@link TemporaryResourceCache} which does + * additional checks. After a new event is received the cached object is removed from this + * cache, since it is then usually already in the informer cache. + *
  2. Avoiding unneeded reconciliations after resources are created or updated. This filters out + * events that are the results of updates and creates made by the controller itself because we + * typically don't want the associated informer to trigger an event causing a useless + * reconciliation (as the change originates from the reconciler itself). For the details see + * {@link #canSkipEvent(HasMetadata, HasMetadata, ResourceID)} and related usage. + *
* - *

2. Additional API is provided that is meant to be used with the combination of the previous - * one, and the goal is to filter out events that are the results of updates and creates made by the - * controller itself. For example if in reconciler a ConfigMaps is created, there should be an - * Informer in place to handle change events of that ConfigMap, but since it has bean created (or - * updated) by the reconciler this should not trigger an additional reconciliation by default. In - * order to achieve this prepareForCreateOrUpdateEventFiltering(..) method needs to be called before - * the operation of the k8s client. And the operation from point 1. after the k8s client call. See - * it's usage in CreateUpdateEventFilterTestReconciler integration test for the usage. (Again this - * is managed for the developer if using dependent resources.)
- * Roughly it works in a way that before the K8S API call is made, we set mark the resource ID, and - * from that point informer won't propagate events further just will start record them. After the - * client operation is done, it's checked and analysed what events were received and based on that - * it will propagate event or not and/or put the new resource into the temporal cache - so if the - * event not arrived yet about the update will be able to filter it in the future. - * - * @param resource type watching - * @param

type of the primary resource + * @param resource type being watched + * @param

type of the associated primary resource */ public class InformerEventSource extends ManagedInformerEventSource> implements ResourceEventHandler { - private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); public static final String PREVIOUS_ANNOTATION_KEY = "javaoperatorsdk.io/previous"; + private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); // we need direct control for the indexer to propagate the just update resource also to the index private final PrimaryToSecondaryIndex primaryToSecondaryIndex; private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 247cdb9aa5..9ec5b3694c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -9,7 +9,7 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -167,9 +167,9 @@ public synchronized boolean isKnownResourceVersion(T resource) { } /** - * @return true if {@link InformerEventSourceConfiguration#parseResourceVersions()} is enabled and - * the resourceVersion of newResource is numerically greater than cachedResource, otherwise - * false + * @return true if {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()} + * is enabled and the resourceVersion of newResource is numerically greater than + * cachedResource, otherwise false */ private boolean isLaterResourceVersion(ResourceID resourceId, T newResource, T cachedResource) { try { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCacheTest.java new file mode 100644 index 0000000000..58e3ce8a0a --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCacheTest.java @@ -0,0 +1,87 @@ +package io.javaoperatorsdk.operator.api.reconciler.support; + +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResourceSpec; + +import static org.assertj.core.api.Assertions.assertThat; + +class PrimaryResourceCacheTest { + + PrimaryResourceCache versionParsingCache = + new PrimaryResourceCache<>( + new PrimaryResourceCache.ResourceVersionParsingEvictionPredicate<>()); + + @Test + void returnsThePassedValueIfCacheIsEmpty() { + var cr = customResource("1"); + + var res = versionParsingCache.getFreshResource(cr); + + assertThat(cr).isSameAs(res); + } + + @Test + void returnsTheCachedIfNotEvictedAccordingToPredicate() { + var cr = customResource("2"); + + versionParsingCache.cacheResource(cr); + + var res = versionParsingCache.getFreshResource(customResource("1")); + assertThat(cr).isSameAs(res); + } + + @Test + void ifMoreFreshPassedCachedIsEvicted() { + var cr = customResource("2"); + versionParsingCache.cacheResource(cr); + var newCR = customResource("3"); + + var res = versionParsingCache.getFreshResource(newCR); + var resOnOlder = versionParsingCache.getFreshResource(cr); + + assertThat(newCR).isSameAs(res); + assertThat(resOnOlder).isSameAs(cr); + assertThat(newCR).isNotSameAs(cr); + } + + @Test + void cleanupRemovesCachedResources() { + var cr = customResource("2"); + versionParsingCache.cacheResource(cr); + + versionParsingCache.cleanup(customResource("3")); + + var olderCR = customResource("1"); + var res = versionParsingCache.getFreshResource(olderCR); + assertThat(olderCR).isSameAs(res); + } + + @Test + void removesIfNewResourceWithDifferentUid() { + var cr = customResource("2"); + versionParsingCache.cacheResource(cr); + var crWithDifferentUid = customResource("1"); + cr.getMetadata().setUid("otheruid"); + + var res = versionParsingCache.getFreshResource(crWithDifferentUid); + + assertThat(res).isSameAs(crWithDifferentUid); + } + + private TestCustomResource customResource(String resourceVersion) { + var cr = new TestCustomResource(); + cr.setMetadata( + new ObjectMetaBuilder() + .withName("test1") + .withNamespace("default") + .withUid("uid") + .withResourceVersion(resourceVersion) + .build()); + cr.setSpec(new TestCustomResourceSpec()); + cr.getSpec().setKey("key"); + return cr; + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java similarity index 99% rename from operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java rename to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java index d31408beb6..e62888832f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCacheTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryPrimaryResourceCacheTest.java @@ -19,7 +19,7 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -class TemporaryResourceCacheTest { +class TemporaryPrimaryResourceCacheTest { public static final String RESOURCE_VERSION = "2"; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/PeriodicTriggerEventSource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/PeriodicTriggerEventSource.java new file mode 100644 index 0000000000..366777409a --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/PeriodicTriggerEventSource.java @@ -0,0 +1,52 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache; + +import java.util.Set; +import java.util.Timer; +import java.util.TimerTask; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.processing.event.Event; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSource; +import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; + +public class PeriodicTriggerEventSource

+ extends AbstractEventSource { + + public static final int DEFAULT_PERIOD = 30; + private final Timer timer = new Timer(); + private final IndexerResourceCache

primaryCache; + private final int period; + + public PeriodicTriggerEventSource(IndexerResourceCache

primaryCache) { + this(primaryCache, DEFAULT_PERIOD); + } + + public PeriodicTriggerEventSource(IndexerResourceCache

primaryCache, int period) { + super(Void.class); + this.primaryCache = primaryCache; + this.period = period; + } + + @Override + public Set getSecondaryResources(P primary) { + return Set.of(); + } + + @Override + public void start() throws OperatorException { + super.start(); + timer.schedule( + new TimerTask() { + @Override + public void run() { + primaryCache + .list() + .forEach(r -> getEventHandler().handleEvent(new Event(ResourceID.fromResource(r)))); + } + }, + 0, + period); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheCustomResource.java new file mode 100644 index 0000000000..2a2d8b83fd --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheCustomResource.java @@ -0,0 +1,13 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internal; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("spcl") +public class StatusPatchCacheCustomResource + extends CustomResource implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java new file mode 100644 index 0000000000..f78511f250 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java @@ -0,0 +1,48 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internal; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class StatusPatchCacheIT { + + public static final String TEST_1 = "test1"; + + @RegisterExtension + LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler(StatusPatchCacheReconciler.class) + .build(); + + @Test + void testStatusAlwaysUpToDate() { + var reconciler = extension.getReconcilerOfType(StatusPatchCacheReconciler.class); + + extension.create(testResource()); + + // the reconciliation is periodically triggered, the status values should be increasing + // monotonically + await() + .pollDelay(Duration.ofSeconds(1)) + .pollInterval(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.errorPresent).isFalse(); + assertThat(reconciler.latestValue).isGreaterThan(10); + }); + } + + StatusPatchCacheCustomResource testResource() { + var res = new StatusPatchCacheCustomResource(); + res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build()); + res.setSpec(new StatusPatchCacheSpec()); + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java new file mode 100644 index 0000000000..8a3a72a901 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java @@ -0,0 +1,64 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internal; + +import java.util.List; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.baseapi.statuscache.PeriodicTriggerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +@ControllerConfiguration +public class StatusPatchCacheReconciler implements Reconciler { + + public volatile int latestValue = 0; + public volatile boolean errorPresent = false; + + @Override + public UpdateControl reconcile( + StatusPatchCacheCustomResource resource, Context context) { + + if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) { + errorPresent = true; + throw new IllegalStateException( + "status is not up to date. Latest value: " + + latestValue + + " status values: " + + resource.getStatus().getValue()); + } + + var freshCopy = createFreshCopy(resource); + + freshCopy + .getStatus() + .setValue(resource.getStatus() == null ? 1 : resource.getStatus().getValue() + 1); + + var updated = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context); + latestValue = updated.getStatus().getValue(); + + return UpdateControl.noUpdate(); + } + + @Override + public List> prepareEventSources( + EventSourceContext context) { + // periodic event triggering for testing purposes + return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache())); + } + + private StatusPatchCacheCustomResource createFreshCopy(StatusPatchCacheCustomResource resource) { + var res = new StatusPatchCacheCustomResource(); + res.setMetadata( + new ObjectMetaBuilder() + .withName(resource.getMetadata().getName()) + .withNamespace(resource.getMetadata().getNamespace()) + .build()); + res.setStatus(new StatusPatchCacheStatus()); + + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheSpec.java new file mode 100644 index 0000000000..d1426fd943 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheSpec.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internal; + +public class StatusPatchCacheSpec { + + private int counter = 0; + + public int getCounter() { + return counter; + } + + public void setCounter(int counter) { + this.counter = counter; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheStatus.java new file mode 100644 index 0000000000..00bc4b6f04 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheStatus.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internal; + +public class StatusPatchCacheStatus { + + private Integer value = 0; + + public Integer getValue() { + return value; + } + + public StatusPatchCacheStatus setValue(Integer value) { + this.value = value; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheCustomResource.java new file mode 100644 index 0000000000..84b145cac3 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheCustomResource.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("spc") +public class StatusPatchPrimaryCacheCustomResource + extends CustomResource + implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheIT.java new file mode 100644 index 0000000000..a884ec0758 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheIT.java @@ -0,0 +1,48 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class StatusPatchPrimaryCacheIT { + + public static final String TEST_1 = "test1"; + + @RegisterExtension + LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler(StatusPatchPrimaryCacheReconciler.class) + .build(); + + @Test + void testStatusAlwaysUpToDate() { + var reconciler = extension.getReconcilerOfType(StatusPatchPrimaryCacheReconciler.class); + + extension.create(testResource()); + + // the reconciliation is periodically triggered, the status values should be increasing + // monotonically + await() + .pollDelay(Duration.ofSeconds(1)) + .pollInterval(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.errorPresent).isFalse(); + assertThat(reconciler.latestValue).isGreaterThan(10); + }); + } + + StatusPatchPrimaryCacheCustomResource testResource() { + var res = new StatusPatchPrimaryCacheCustomResource(); + res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build()); + res.setSpec(new StatusPatchPrimaryCacheSpec()); + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java new file mode 100644 index 0000000000..c25fcddfec --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java @@ -0,0 +1,89 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; + +import java.util.List; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Cleaner; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache; +import io.javaoperatorsdk.operator.baseapi.statuscache.PeriodicTriggerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +@ControllerConfiguration +public class StatusPatchPrimaryCacheReconciler + implements Reconciler, + Cleaner { + + public volatile int latestValue = 0; + public volatile boolean errorPresent = false; + + // We on purpose don't use the provided predicate to show what a custom one could look like. + private final PrimaryResourceCache cache = + new PrimaryResourceCache<>( + (statusPatchCacheCustomResourcePair, statusPatchCacheCustomResource) -> + statusPatchCacheCustomResource.getStatus().getValue() + >= statusPatchCacheCustomResourcePair.afterUpdate().getStatus().getValue()); + + @Override + public UpdateControl reconcile( + StatusPatchPrimaryCacheCustomResource primary, + Context context) { + + primary = cache.getFreshResource(primary); + + if (primary.getStatus() != null && primary.getStatus().getValue() != latestValue) { + errorPresent = true; + throw new IllegalStateException( + "status is not up to date. Latest value: " + + latestValue + + " status values: " + + primary.getStatus().getValue()); + } + + var freshCopy = createFreshCopy(primary); + freshCopy + .getStatus() + .setValue(primary.getStatus() == null ? 1 : primary.getStatus().getValue() + 1); + + var updated = + PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache); + latestValue = updated.getStatus().getValue(); + + return UpdateControl.noUpdate(); + } + + @Override + public List> prepareEventSources( + EventSourceContext context) { + // periodic event triggering for testing purposes + return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache())); + } + + private StatusPatchPrimaryCacheCustomResource createFreshCopy( + StatusPatchPrimaryCacheCustomResource resource) { + var res = new StatusPatchPrimaryCacheCustomResource(); + res.setMetadata( + new ObjectMetaBuilder() + .withName(resource.getMetadata().getName()) + .withNamespace(resource.getMetadata().getNamespace()) + .build()); + res.setStatus(new StatusPatchPrimaryCacheStatus()); + + return res; + } + + @Override + public DeleteControl cleanup( + StatusPatchPrimaryCacheCustomResource resource, + Context context) + throws Exception { + cache.cleanup(resource); + return DeleteControl.defaultDelete(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java new file mode 100644 index 0000000000..90630c1ae8 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; + +public class StatusPatchPrimaryCacheSpec { + + private boolean messageInStatus = true; + + public boolean isMessageInStatus() { + return messageInStatus; + } + + public StatusPatchPrimaryCacheSpec setMessageInStatus(boolean messageInStatus) { + this.messageInStatus = messageInStatus; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheStatus.java new file mode 100644 index 0000000000..0687d5576a --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheStatus.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.primarycache; + +public class StatusPatchPrimaryCacheStatus { + + private Integer value = 0; + + public Integer getValue() { + return value; + } + + public StatusPatchPrimaryCacheStatus setValue(Integer value) { + this.value = value; + return this; + } +}