diff --git a/docs/content/en/docs/documentation/reconciler.md b/docs/content/en/docs/documentation/reconciler.md index b9ede8aa95..91bac16a59 100644 --- a/docs/content/en/docs/documentation/reconciler.md +++ b/docs/content/en/docs/documentation/reconciler.md @@ -175,20 +175,23 @@ From v5, by default, the finalizer is added using Server Side Apply. See also `U It is typical to want to update the status subresource with the information that is available during the reconciliation. This is sometimes referred to as the last observed state. When the primary resource is updated, though, the framework does not cache the resource directly, relying instead on the propagation of the update to the underlying informer's -cache. It can, therefore, happen that, if other events trigger other reconciliations before the informer cache gets +cache. It can, therefore, happen that, if other events trigger other reconciliations, before the informer cache gets updated, your reconciler does not see the latest version of the primary resource. While this might not typically be a problem in most cases, as caches eventually become consistent, depending on your reconciliation logic, you might still -require the latest status version possible, for example if the status subresource is used as a communication mechanism, -see [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values) +require the latest status version possible, for example, if the status subresource is used to store allocated values. +See [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values) from the Kubernetes docs for more details. The framework provides utilities to help with these use cases with [`PrimaryUpdateAndCacheUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java). -These utility methods come in two flavors: +These utility methods come in multiple flavors: #### Using internal cache -In almost all cases for this purpose, you can use internal caches: +In almost all cases for this purpose, you can use internal caches in combination with update methods that use +optimistic locking (end with *WithLock(...)). If the update method fails on optimistic locking, it will retry +using a fresh resource from the server as base for modification. Again, this is the default option and will probably +work for you. ```java @Override @@ -201,27 +204,32 @@ public UpdateControl reconcile( var freshCopy = createFreshCopy(primary); freshCopy.getStatus().setValue(statusWithState()); - var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context); + var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResourceWithLock(resource, freshCopy, context); return UpdateControl.noUpdate(); } ``` -In the background `PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus` puts the result of the update into an internal -cache and will make sure that the next reconciliation will contain the most recent version of the resource. Note that it -is not necessarily the version of the resource you got as response from the update, it can be newer since other parties +After the update `PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResourceWithLock` puts the result of the update into an internal +cache and the framework will make sure that the next reconciliation contains the most recent version of the resource. +Note that it is not necessarily the version of the resource you got as response from the update, it can be newer since other parties can do additional updates meanwhile, but if not explicitly modified, it will contain the up-to-date status. -See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal). +See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock). This approach works with the default configuration of the framework and should be good to go in most of the cases. -Without going further into the details, this won't work if `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching` -is set to `false` (more precisely there are some edge cases when it won't work). For that case framework provides the following solution: + +Without going further into the details, a bit more experimental way we provide overloaded methods without optimistic locking, +to use those you have to set `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching` +to `true`. This in practice would mean that request won't fail on optimistic locking, but requires bending a bit +the rules regarding Kubernetes API contract. This might be needed only if you have multiple resources frequently +writing the resource. #### Fallback approach: using `PrimaryResourceCache` cache -As an alternative, for very rare cases when `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching` -needs to be set to `false` you can use an explicit caching approach: +For the sake of completeness, we also provide a more explicit approach to manage the cache yourself. +This approach has the advantage that you don't have to do neither optimistic locking nor +setting the `parseResourceVersionsForEventFilteringAndCaching` to `true`: ```java @@ -247,7 +255,7 @@ needs to be set to `false` you can use an explicit caching approach: freshCopy.getStatus().setValue(statusWithState()); var updated = - PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache); + PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource(primary, freshCopy, context, cache); return UpdateControl.noUpdate(); } @@ -277,9 +285,7 @@ their associated primary resource from the underlying informer event source cach #### Additional remarks -As shown in the integration tests, there is no optimistic locking used when updating the +As shown in the last two cases, there is no optimistic locking used when updating the [resource](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java#L41) -(in other words `metadata.resourceVersion` is set to `null`). This is desired since you don't want the patch to fail on -update. - -In addition, you can configure the [Fabric8 client retry](https://github.com/fabric8io/kubernetes-client?tab=readme-ov-file#configuring-the-client). +(in other words `metadata.resourceVersion` is set to `null`). This has nice property the request will be successful. +However, it might be desirable to configure retry on [Fabric8 client](https://github.com/fabric8io/kubernetes-client?tab=readme-ov-file#configuring-the-client). diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index 174f7667f6..2d95b296dc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -7,71 +7,163 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.base.PatchContext; import io.fabric8.kubernetes.client.dsl.base.PatchType; +import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache; import io.javaoperatorsdk.operator.processing.event.ResourceID; /** * Utility methods to patch the primary resource state and store it to the related cache, to make * sure that fresh resource is present for the next reconciliation. The main use case for such - * updates is to store state is resource status. Use of optimistic locking is not desired for such - * updates, since we don't want to patch fail and lose information that we want to store. + * updates is to store state is resource status. We aim here for completeness and provide you all + * various options, where all of them have pros and cons. + * + * */ public class PrimaryUpdateAndCacheUtils { + public static final int DEFAULT_MAX_RETRY = 10; + private PrimaryUpdateAndCacheUtils() {} private static final Logger log = LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class); /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using update (PUT) method. + * Updates the status with optimistic locking and caches the result for next reconciliation. For + * details see {@link #updateAndCacheResourceWithLock}. + */ + public static

P updateStatusAndCacheResourceWithLock( + P primary, Context

context, UnaryOperator

modificationFunction) { + return updateAndCacheResourceWithLock( + primary, + context, + modificationFunction, + r -> context.getClient().resource(r).updateStatus()); + } + + /** + * Patches the status using JSON Merge Patch with optimistic locking and caches the result for + * next reconciliation. For details see {@link #updateAndCacheResourceWithLock}. + */ + public static

P patchStatusAndCacheResourceWithLock( + P primary, Context

context, UnaryOperator

modificationFunction) { + return updateAndCacheResourceWithLock( + primary, context, modificationFunction, r -> context.getClient().resource(r).patchStatus()); + } + + /** + * Patches the status using JSON Patch with optimistic locking and caches the result for next + * reconciliation. For details see {@link #updateAndCacheResourceWithLock}. + */ + public static

P editStatusAndCacheResourceWithLock( + P primary, Context

context, UnaryOperator

modificationFunction) { + return updateAndCacheResourceWithLock( + primary, + context, + UnaryOperator.identity(), + r -> context.getClient().resource(r).editStatus(modificationFunction)); + } + + /** + * Patches the status using Server Side Apply with optimistic locking and caches the result for + * next reconciliation. For details see {@link #updateAndCacheResourceWithLock}. + */ + public static

P ssaPatchStatusAndCacheResourceWithLock( + P primary, P freshResourceWithStatus, Context

context) { + return updateAndCacheResourceWithLock( + primary, + context, + r -> freshResourceWithStatus, + r -> + context + .getClient() + .resource(r) + .subresource("status") + .patch( + new PatchContext.Builder() + .withForce(true) + .withFieldManager(context.getControllerConfiguration().fieldManager()) + .withPatchType(PatchType.SERVER_SIDE_APPLY) + .build())); + } + + /** + * Updates status and makes sure that the up-to-date primary resource will be present during the + * next reconciliation. Using update (PUT) method. * * @param primary resource * @param context of reconciliation * @return updated resource * @param

primary resource type */ - public static

P updateAndCacheStatus(P primary, Context

context) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + public static

P updateStatusAndCacheResource( + P primary, Context

context) { + checkResourceVersionNotPresentAndParseConfiguration(primary, context); + return patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).updateStatus()); } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using JSON Merge patch. + * Patches status with and makes sure that the up-to-date primary resource will be present during + * the next reconciliation. Using JSON Merge patch. * * @param primary resource * @param context of reconciliation * @return updated resource * @param

primary resource type */ - public static

P patchAndCacheStatus(P primary, Context

context) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + public static

P patchStatusAndCacheResource( + P primary, Context

context) { + checkResourceVersionNotPresentAndParseConfiguration(primary, context); + return patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).patchStatus()); } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using JSON Patch. + * Patches status and makes sure that the up-to-date primary resource will be present during the + * next reconciliation. Using JSON Patch. + * + *

Note that since optimistic locking is not used, there is a risk that JSON Patch will have + * concurrency issues when removing an element from a list. Since, the list element in JSON Patch + * are addressed by index, so if a concurrent request removes an element with lower index, the + * request might be not valid anymore (HTTP 422) or might remove an unmeant element. * * @param primary resource * @param context of reconciliation * @return updated resource * @param

primary resource type */ - public static

P editAndCacheStatus( + public static

P editStatusAndCacheResource( P primary, Context

context, UnaryOperator

operation) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + checkResourceVersionNotPresentAndParseConfiguration(primary, context); + return patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).editStatus(operation)); } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * Patches the resource with supplied method and makes sure that the up-to-date primary resource + * will be present during the next reconciliation. * * @param primary resource * @param context of reconciliation @@ -79,7 +171,7 @@ public static

P editAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P patchAndCacheStatus( + public static

P patchStatusAndCacheResource( P primary, Context

context, Supplier

patch) { var updatedResource = patch.get(); context @@ -90,8 +182,8 @@ public static

P patchAndCacheStatus( } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using Server Side Apply. + * Patches status and makes sure that the up-to-date primary resource will be present during the + * next reconciliation. Using Server Side Apply. * * @param primary resource * @param freshResourceWithStatus - fresh resource with target state @@ -99,30 +191,28 @@ public static

P patchAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P ssaPatchAndCacheStatus( + public static

P ssaPatchStatusAndCacheResource( P primary, P freshResourceWithStatus, Context

context) { - logWarnIfResourceVersionPresent(freshResourceWithStatus); - var res = - context - .getClient() - .resource(freshResourceWithStatus) - .subresource("status") - .patch( - new PatchContext.Builder() - .withForce(true) - .withFieldManager(context.getControllerConfiguration().fieldManager()) - .withPatchType(PatchType.SERVER_SIDE_APPLY) - .build()); - - context - .eventSourceRetriever() - .getControllerEventSource() - .handleRecentResourceUpdate(ResourceID.fromResource(primary), res, primary); - return res; + checkResourceVersionNotPresentAndParseConfiguration(freshResourceWithStatus, context); + return patchStatusAndCacheResource( + primary, + context, + () -> + context + .getClient() + .resource(freshResourceWithStatus) + .subresource("status") + .patch( + new PatchContext.Builder() + .withForce(true) + .withFieldManager(context.getControllerConfiguration().fieldManager()) + .withPatchType(PatchType.SERVER_SIDE_APPLY) + .build())); } /** - * Patches the resource and adds it to the {@link PrimaryResourceCache}. + * Patches the resource status and caches the response in provided {@link PrimaryResourceCache}. + * Uses Server Side Apply. * * @param primary resource * @param freshResourceWithStatus - fresh resource with target state @@ -131,10 +221,10 @@ public static

P ssaPatchAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P ssaPatchAndCacheStatus( + public static

P ssaPatchStatusAndCacheResource( P primary, P freshResourceWithStatus, Context

context, PrimaryResourceCache

cache) { - logWarnIfResourceVersionPresent(freshResourceWithStatus); - return patchAndCacheStatus( + checkResourceVersionIsNotPresent(freshResourceWithStatus); + return patchStatusAndCacheResource( primary, cache, () -> @@ -151,7 +241,8 @@ public static

P ssaPatchAndCacheStatus( } /** - * Patches the resource with JSON Patch and adds it to the {@link PrimaryResourceCache}. + * Patches the resource with JSON Patch and caches the response in provided {@link + * PrimaryResourceCache}. * * @param primary resource * @param context of reconciliation @@ -159,16 +250,16 @@ public static

P ssaPatchAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P editAndCacheStatus( + public static

P editStatusAndCacheResource( P primary, Context

context, PrimaryResourceCache

cache, UnaryOperator

operation) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + checkResourceVersionIsNotPresent(primary); + return patchStatusAndCacheResource( primary, cache, () -> context.getClient().resource(primary).editStatus(operation)); } /** - * Patches the resource with JSON Merge patch and adds it to the {@link PrimaryResourceCache} - * provided. + * Patches the resource status with JSON Merge patch and caches the response in provided {@link + * PrimaryResourceCache} * * @param primary resource * @param context of reconciliation @@ -176,15 +267,15 @@ public static

P editAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P patchAndCacheStatus( + public static

P patchStatusAndCacheResource( P primary, Context

context, PrimaryResourceCache

cache) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + checkResourceVersionIsNotPresent(primary); + return patchStatusAndCacheResource( primary, cache, () -> context.getClient().resource(primary).patchStatus()); } /** - * Updates the resource and adds it to the {@link PrimaryResourceCache}. + * Updates the resource status and caches the response in provided {@link PrimaryResourceCache}. * * @param primary resource * @param context of reconciliation @@ -192,15 +283,16 @@ public static

P patchAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P updateAndCacheStatus( + public static

P updateStatusAndCacheResource( P primary, Context

context, PrimaryResourceCache

cache) { - logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + checkResourceVersionIsNotPresent(primary); + return patchStatusAndCacheResource( primary, cache, () -> context.getClient().resource(primary).updateStatus()); } /** - * Updates the resource using the user provided implementation anc caches the result. + * Updates the resource using the user provided implementation and caches the response in provided + * {@link PrimaryResourceCache}. * * @param primary resource * @param cache resource cache managed by user @@ -208,18 +300,133 @@ public static

P updateAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P patchAndCacheStatus( + public static

P patchStatusAndCacheResource( P primary, PrimaryResourceCache

cache, Supplier

patch) { var updatedResource = patch.get(); cache.cacheResource(primary, updatedResource); return updatedResource; } - private static

void logWarnIfResourceVersionPresent(P primary) { + private static

void checkResourceVersionIsNotPresent(P primary) { if (primary.getMetadata().getResourceVersion() != null) { - log.warn( - "The metadata.resourceVersion of primary resource is NOT null, " - + "using optimistic locking is discouraged for this purpose. "); + throw new IllegalArgumentException("Resource version is present"); + } + } + + private static

void checkResourceVersionNotPresentAndParseConfiguration( + P primary, Context

context) { + checkResourceVersionIsNotPresent(primary); + if (!context + .getControllerConfiguration() + .getConfigurationService() + .parseResourceVersionsForEventFilteringAndCaching()) { + throw new OperatorException( + "For internal primary resource caching 'parseResourceVersionsForEventFilteringAndCaching'" + + " must be allowed."); + } + } + + /** + * Modifies the primary using modificationFunction, then uses the modified resource for the + * request to update with provided update method. But before the update operation sets the + * resourceVersion to the modified resource from the primary resource, so there is always + * optimistic locking happening. If the request fails on optimistic update, we read the resource + * again from the K8S API server and retry the whole process. In short, we make sure we always + * update the resource with optimistic locking, after we cache the resource in internal cache. + * Without further going into the details, the optimistic locking is needed so we can reliably + * handle the caching. + * + * @param primary original resource to update + * @param context of reconciliation + * @param modificationFunction modifications to make on primary + * @param updateMethod the update method implementation + * @return updated resource + * @param

primary type + */ + public static

P updateAndCacheResourceWithLock( + P primary, + Context

context, + UnaryOperator

modificationFunction, + UnaryOperator

updateMethod) { + return updateAndCacheResourceWithLock( + primary, context, modificationFunction, updateMethod, DEFAULT_MAX_RETRY); + } + + /** + * Modifies the primary using modificationFunction, then uses the modified resource for the + * request to update with provided update method. But before the update operation sets the + * resourceVersion to the modified resource from the primary resource, so there is always + * optimistic locking happening. If the request fails on optimistic update, we read the resource + * again from the K8S API server and retry the whole process. In short, we make sure we always + * update the resource with optimistic locking, after we cache the resource in internal cache. + * Without further going into the details, the optimistic locking is needed so we can reliably + * handle the caching. + * + * @param primary original resource to update + * @param context of reconciliation + * @param modificationFunction modifications to make on primary + * @param updateMethod the update method implementation + * @param maxRetry - maximum number of retries of conflicts + * @return updated resource + * @param

primary type + */ + @SuppressWarnings("unchecked") + public static

P updateAndCacheResourceWithLock( + P primary, + Context

context, + UnaryOperator

modificationFunction, + UnaryOperator

updateMethod, + int maxRetry) { + + if (log.isDebugEnabled()) { + log.debug("Conflict retrying update for: {}", ResourceID.fromResource(primary)); + } + P modified = null; + int retryIndex = 0; + while (true) { + try { + modified = modificationFunction.apply(primary); + modified.getMetadata().setResourceVersion(primary.getMetadata().getResourceVersion()); + var updated = updateMethod.apply(modified); + context + .eventSourceRetriever() + .getControllerEventSource() + .handleRecentResourceUpdate(ResourceID.fromResource(primary), updated, primary); + return updated; + } catch (KubernetesClientException e) { + log.trace("Exception during patch for resource: {}", primary); + retryIndex++; + // only retry on conflict (409) and unprocessable content (422) which + // can happen if JSON Patch is not a valid request since there was + // a concurrent request which already removed another finalizer: + // List element removal from a list is by index in JSON Patch + // so if addressing a second finalizer but first is meanwhile removed + // it is a wrong request. + if (e.getCode() != 409 && e.getCode() != 422) { + throw e; + } + if (retryIndex >= maxRetry) { + log.warn("Retry exhausted, last desired resource: {}", modified); + throw new OperatorException( + "Exceeded maximum (" + + maxRetry + + ") retry attempts to patch resource: " + + ResourceID.fromResource(primary)); + } + log.debug( + "Retrying patch for resource name: {}, namespace: {}; HTTP code: {}", + primary.getMetadata().getName(), + primary.getMetadata().getNamespace(), + e.getCode()); + primary = + (P) + context + .getClient() + .resources(primary.getClass()) + .inNamespace(primary.getMetadata().getNamespace()) + .withName(primary.getMetadata().getName()) + .get(); + } } } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java index f78511f250..98948e9848 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java @@ -18,6 +18,7 @@ public class StatusPatchCacheIT { @RegisterExtension LocallyRunOperatorExtension extension = LocallyRunOperatorExtension.builder() + .withConfigurationService(o -> o.withParseResourceVersions(true)) .withReconciler(StatusPatchCacheReconciler.class) .build(); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java index 8a3a72a901..99a94796a5 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java @@ -31,13 +31,18 @@ public UpdateControl reconcile( + resource.getStatus().getValue()); } + // test also resource update happening meanwhile reconciliation + resource.getSpec().setCounter(resource.getSpec().getCounter() + 1); + context.getClient().resource(resource).update(); + var freshCopy = createFreshCopy(resource); freshCopy .getStatus() .setValue(resource.getStatus() == null ? 1 : resource.getStatus().getValue() + 1); - var updated = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context); + var updated = + PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource(resource, freshCopy, context); latestValue = updated.getStatus().getValue(); return UpdateControl.noUpdate(); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockCustomResource.java new file mode 100644 index 0000000000..2f02381a68 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockCustomResource.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internalwithlock; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("spwl") +public class StatusPatchCacheWithLockCustomResource + extends CustomResource + implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockIT.java new file mode 100644 index 0000000000..0f6a581660 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockIT.java @@ -0,0 +1,48 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internalwithlock; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class StatusPatchCacheWithLockIT { + + public static final String TEST_1 = "test1"; + + @RegisterExtension + LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler(StatusPatchCacheWithLockReconciler.class) + .build(); + + @Test + void testStatusAlwaysUpToDate() { + var reconciler = extension.getReconcilerOfType(StatusPatchCacheWithLockReconciler.class); + + extension.create(testResource()); + + // the reconciliation is periodically triggered, the status values should be increasing + // monotonically + await() + .pollDelay(Duration.ofSeconds(1)) + .pollInterval(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.errorPresent).isFalse(); + assertThat(reconciler.latestValue).isGreaterThan(10); + }); + } + + StatusPatchCacheWithLockCustomResource testResource() { + var res = new StatusPatchCacheWithLockCustomResource(); + res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build()); + res.setSpec(new StatusPatchCacheWithLockSpec()); + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java new file mode 100644 index 0000000000..e6377a58dc --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java @@ -0,0 +1,73 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internalwithlock; + +import java.util.List; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.baseapi.statuscache.PeriodicTriggerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +@ControllerConfiguration +public class StatusPatchCacheWithLockReconciler + implements Reconciler { + + public volatile int latestValue = 0; + public volatile boolean errorPresent = false; + + @Override + public UpdateControl reconcile( + StatusPatchCacheWithLockCustomResource resource, + Context context) { + + if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) { + errorPresent = true; + throw new IllegalStateException( + "status is not up to date. Latest value: " + + latestValue + + " status values: " + + resource.getStatus().getValue()); + } + + // test also resource update happening meanwhile reconciliation + resource.getSpec().setCounter(resource.getSpec().getCounter() + 1); + context.getClient().resource(resource).update(); + + var freshCopy = createFreshCopy(resource); + + freshCopy + .getStatus() + .setValue(resource.getStatus() == null ? 1 : resource.getStatus().getValue() + 1); + + var updated = + PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResourceWithLock( + resource, freshCopy, context); + latestValue = updated.getStatus().getValue(); + + return UpdateControl.noUpdate(); + } + + @Override + public List> prepareEventSources( + EventSourceContext context) { + // periodic event triggering for testing purposes + return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache())); + } + + private StatusPatchCacheWithLockCustomResource createFreshCopy( + StatusPatchCacheWithLockCustomResource resource) { + var res = new StatusPatchCacheWithLockCustomResource(); + res.setMetadata( + new ObjectMetaBuilder() + .withName(resource.getMetadata().getName()) + .withNamespace(resource.getMetadata().getNamespace()) + .build()); + res.setStatus(new StatusPatchCacheWithLockStatus()); + + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockSpec.java new file mode 100644 index 0000000000..495db097e8 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockSpec.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internalwithlock; + +public class StatusPatchCacheWithLockSpec { + + private int counter = 0; + + public int getCounter() { + return counter; + } + + public void setCounter(int counter) { + this.counter = counter; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockStatus.java new file mode 100644 index 0000000000..586c88b1f8 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockStatus.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internalwithlock; + +public class StatusPatchCacheWithLockStatus { + + private Integer value = 0; + + public Integer getValue() { + return value; + } + + public StatusPatchCacheWithLockStatus setValue(Integer value) { + this.value = value; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java index c25fcddfec..b36ef2e8d9 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java @@ -46,13 +46,18 @@ public UpdateControl reconcile( + primary.getStatus().getValue()); } + // test also resource update happening meanwhile reconciliation + primary.getSpec().setCounter(primary.getSpec().getCounter() + 1); + context.getClient().resource(primary).update(); + var freshCopy = createFreshCopy(primary); freshCopy .getStatus() .setValue(primary.getStatus() == null ? 1 : primary.getStatus().getValue() + 1); var updated = - PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache); + PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource( + primary, freshCopy, context, cache); latestValue = updated.getStatus().getValue(); return UpdateControl.noUpdate(); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java index 90630c1ae8..da52a48478 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java @@ -2,14 +2,13 @@ public class StatusPatchPrimaryCacheSpec { - private boolean messageInStatus = true; + private int counter = 0; - public boolean isMessageInStatus() { - return messageInStatus; + public int getCounter() { + return counter; } - public StatusPatchPrimaryCacheSpec setMessageInStatus(boolean messageInStatus) { - this.messageInStatus = messageInStatus; - return this; + public void setCounter(int counter) { + this.counter = counter; } }