From 8438975e699d43d475ce3beca2370d43fadde804 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 14 May 2025 09:21:25 +0200 Subject: [PATCH 1/9] improve: PrimaryUpdateAndCacheUtils naming and javadoc improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../en/docs/documentation/reconciler.md | 4 +- .../PrimaryUpdateAndCacheUtils.java | 70 ++++++++++--------- .../internal/StatusPatchCacheReconciler.java | 3 +- .../StatusPatchPrimaryCacheReconciler.java | 3 +- 4 files changed, 44 insertions(+), 36 deletions(-) diff --git a/docs/content/en/docs/documentation/reconciler.md b/docs/content/en/docs/documentation/reconciler.md index b9ede8aa95..362de38a03 100644 --- a/docs/content/en/docs/documentation/reconciler.md +++ b/docs/content/en/docs/documentation/reconciler.md @@ -201,7 +201,7 @@ public UpdateControl reconcile( var freshCopy = createFreshCopy(primary); freshCopy.getStatus().setValue(statusWithState()); - var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context); + var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource(resource, freshCopy, context); return UpdateControl.noUpdate(); } @@ -247,7 +247,7 @@ needs to be set to `false` you can use an explicit caching approach: freshCopy.getStatus().setValue(statusWithState()); var updated = - PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache); + PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource(primary, freshCopy, context, cache); return UpdateControl.noUpdate(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index 174f7667f6..d1c210dd1a 100644 --- 
a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -25,53 +25,56 @@ private PrimaryUpdateAndCacheUtils() {} private static final Logger log = LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class); /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using update (PUT) method. + * Updates status and makes sure that the up-to-date primary resource will be present during the + * next reconciliation. Using update (PUT) method. * * @param primary resource * @param context of reconciliation * @return updated resource * @param

primary resource type */ - public static

P updateAndCacheStatus(P primary, Context

context) { + public static

P updateStatusAndCacheResource( + P primary, Context

context) { logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + return patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).updateStatus()); } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using JSON Merge patch. + * Patches status with and makes sure that the up-to-date primary resource will be present during + * the next reconciliation. Using JSON Merge patch. * * @param primary resource * @param context of reconciliation * @return updated resource * @param

primary resource type */ - public static

P patchAndCacheStatus(P primary, Context

context) { + public static

P patchStatusAndCacheResource( + P primary, Context

context) { logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + return patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).patchStatus()); } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using JSON Patch. + * Patches status and makes sure that the up-to-date primary resource will be present during the + * next reconciliation. Using JSON Patch. * * @param primary resource * @param context of reconciliation * @return updated resource * @param

primary resource type */ - public static

P editAndCacheStatus( + public static

P editStatusAndCacheResource( P primary, Context

context, UnaryOperator

operation) { logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + return patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).editStatus(operation)); } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. + * Patches the resource with supplied method and makes sure that the up-to-date primary resource + * will be present during the next reconciliation. * * @param primary resource * @param context of reconciliation @@ -79,7 +82,7 @@ public static

P editAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P patchAndCacheStatus( + public static

P patchStatusAndCacheResource( P primary, Context

context, Supplier

patch) { var updatedResource = patch.get(); context @@ -90,8 +93,8 @@ public static

P patchAndCacheStatus( } /** - * Makes sure that the up-to-date primary resource will be present during the next reconciliation. - * Using Server Side Apply. + * Patches status and makes sure that the up-to-date primary resource will be present during the + * next reconciliation. Using Server Side Apply. * * @param primary resource * @param freshResourceWithStatus - fresh resource with target state @@ -99,7 +102,7 @@ public static

P patchAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P ssaPatchAndCacheStatus( + public static

P ssaPatchStatusAndCacheResource( P primary, P freshResourceWithStatus, Context

context) { logWarnIfResourceVersionPresent(freshResourceWithStatus); var res = @@ -122,7 +125,8 @@ public static

P ssaPatchAndCacheStatus( } /** - * Patches the resource and adds it to the {@link PrimaryResourceCache}. + * Patches the resource status and caches the response in provided {@link PrimaryResourceCache}. + * Uses Server Side Apply. * * @param primary resource * @param freshResourceWithStatus - fresh resource with target state @@ -131,10 +135,10 @@ public static

P ssaPatchAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P ssaPatchAndCacheStatus( + public static

P ssaPatchStatusAndCacheResource( P primary, P freshResourceWithStatus, Context

context, PrimaryResourceCache

cache) { logWarnIfResourceVersionPresent(freshResourceWithStatus); - return patchAndCacheStatus( + return patchStatusAndCacheResource( primary, cache, () -> @@ -151,7 +155,8 @@ public static

P ssaPatchAndCacheStatus( } /** - * Patches the resource with JSON Patch and adds it to the {@link PrimaryResourceCache}. + * Patches the resource with JSON Patch and caches the response in provided {@link + * PrimaryResourceCache}. * * @param primary resource * @param context of reconciliation @@ -159,16 +164,16 @@ public static

P ssaPatchAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P editAndCacheStatus( + public static

P editStatusAndCacheResource( P primary, Context

context, PrimaryResourceCache

cache, UnaryOperator

operation) { logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + return patchStatusAndCacheResource( primary, cache, () -> context.getClient().resource(primary).editStatus(operation)); } /** - * Patches the resource with JSON Merge patch and adds it to the {@link PrimaryResourceCache} - * provided. + * Patches the resource status with JSON Merge patch and caches the response in provided {@link + * PrimaryResourceCache} * * @param primary resource * @param context of reconciliation @@ -176,15 +181,15 @@ public static

P editAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P patchAndCacheStatus( + public static

P patchStatusAndCacheResource( P primary, Context

context, PrimaryResourceCache

cache) { logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + return patchStatusAndCacheResource( primary, cache, () -> context.getClient().resource(primary).patchStatus()); } /** - * Updates the resource and adds it to the {@link PrimaryResourceCache}. + * Updates the resource status and caches the response in provided {@link PrimaryResourceCache}. * * @param primary resource * @param context of reconciliation @@ -192,15 +197,16 @@ public static

P patchAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P updateAndCacheStatus( + public static

P updateStatusAndCacheResource( P primary, Context

context, PrimaryResourceCache

cache) { logWarnIfResourceVersionPresent(primary); - return patchAndCacheStatus( + return patchStatusAndCacheResource( primary, cache, () -> context.getClient().resource(primary).updateStatus()); } /** - * Updates the resource using the user provided implementation anc caches the result. + * Updates the resource using the user provided implementation and caches the response in provided + * {@link PrimaryResourceCache}. * * @param primary resource * @param cache resource cache managed by user @@ -208,7 +214,7 @@ public static

P updateAndCacheStatus( * @return the updated resource. * @param

primary resource type */ - public static

P patchAndCacheStatus( + public static

P patchStatusAndCacheResource( P primary, PrimaryResourceCache

cache, Supplier

patch) { var updatedResource = patch.get(); cache.cacheResource(primary, updatedResource); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java index 8a3a72a901..a62d0c5b18 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java @@ -37,7 +37,8 @@ public UpdateControl reconcile( .getStatus() .setValue(resource.getStatus() == null ? 1 : resource.getStatus().getValue() + 1); - var updated = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context); + var updated = + PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource(resource, freshCopy, context); latestValue = updated.getStatus().getValue(); return UpdateControl.noUpdate(); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java index c25fcddfec..b03424441f 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java @@ -52,7 +52,8 @@ public UpdateControl reconcile( .setValue(primary.getStatus() == null ? 
1 : primary.getStatus().getValue() + 1); var updated = - PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache); + PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource( + primary, freshCopy, context, cache); latestValue = updated.getStatus().getValue(); return UpdateControl.noUpdate(); From 6e5d46e54416943e306c1619c56526277feb10bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 14 May 2025 15:05:51 +0200 Subject: [PATCH 2/9] improve: checks on inputs and lock method version of PrimaryUpdateAndCacheUtils MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../PrimaryUpdateAndCacheUtils.java | 114 ++++++++++++++++++ 1 file changed, 114 insertions(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index d1c210dd1a..0e09776e11 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -7,8 +7,10 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.base.PatchContext; import io.fabric8.kubernetes.client.dsl.base.PatchType; +import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -20,6 +22,8 @@ */ public class PrimaryUpdateAndCacheUtils { + public static final int DEFAULT_MAX_RETRY = 3; + private PrimaryUpdateAndCacheUtils() {} private static final Logger log = 
LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class); @@ -40,6 +44,15 @@ public static

P updateStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).updateStatus()); } + public static

P updateStatusAndCacheResourceWithLock( + P primary, Context

context, UnaryOperator

modificationFunction) { + return updateAndCacheResourceWithLock( + primary, + context, + modificationFunction, + r -> context.getClient().resource(r).updateStatus()); + } + /** * Patches status with and makes sure that the up-to-date primary resource will be present during * the next reconciliation. Using JSON Merge patch. @@ -56,6 +69,12 @@ public static

P patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).patchStatus()); } + public static

P patchStatusAndCacheResourceWithLock( + P primary, Context

context, UnaryOperator

modificationFunction) { + return updateAndCacheResourceWithLock( + primary, context, modificationFunction, r -> context.getClient().resource(r).patchStatus()); + } + /** * Patches status and makes sure that the up-to-date primary resource will be present during the * next reconciliation. Using JSON Patch. @@ -72,6 +91,15 @@ public static

P editStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).editStatus(operation)); } + public static

P editStatusAndCacheResourceWithLock( + P primary, Context

context, UnaryOperator

modificationFunction) { + return updateAndCacheResourceWithLock( + primary, + context, + UnaryOperator.identity(), + r -> context.getClient().resource(r).editStatus(modificationFunction)); + } + /** * Patches the resource with supplied method and makes sure that the up-to-date primary resource * will be present during the next reconciliation. @@ -124,6 +152,25 @@ public static

P ssaPatchStatusAndCacheResource( return res; } + public static

P ssaPatchStatusAndCacheResourceWithLock( + P primary, P freshResourceWithStatus, Context

context) { + return updateAndCacheResourceWithLock( + primary, + context, + r -> freshResourceWithStatus, + r -> + context + .getClient() + .resource(r) + .subresource("status") + .patch( + new PatchContext.Builder() + .withForce(true) + .withFieldManager(context.getControllerConfiguration().fieldManager()) + .withPatchType(PatchType.SERVER_SIDE_APPLY) + .build())); + } + /** * Patches the resource status and caches the response in provided {@link PrimaryResourceCache}. * Uses Server Side Apply. @@ -228,4 +275,71 @@ private static

void logWarnIfResourceVersionPresent(P pr + "using optimistic locking is discouraged for this purpose. "); } } + + public static

P updateAndCacheResourceWithLock( + P primary, + Context

context, + UnaryOperator

modificationFunction, + UnaryOperator

updateMethod) { + return updateAndCacheResourceWithLock( + primary, context, modificationFunction, updateMethod, DEFAULT_MAX_RETRY); + } + + @SuppressWarnings("unchecked") + public static

P updateAndCacheResourceWithLock( + P primary, + Context

context, + UnaryOperator

modificationFunction, + UnaryOperator

updateMethod, + int maxRetry) { + + if (log.isDebugEnabled()) { + log.debug("Conflict retrying update for: {}", ResourceID.fromResource(primary)); + } + int retryIndex = 0; + while (true) { + try { + var modified = modificationFunction.apply(primary); + modified.getMetadata().setResourceVersion(primary.getMetadata().getResourceVersion()); + var updated = updateMethod.apply(modified); + context + .eventSourceRetriever() + .getControllerEventSource() + .handleRecentResourceUpdate(ResourceID.fromResource(primary), updated, primary); + return updated; + } catch (KubernetesClientException e) { + log.trace("Exception during patch for resource: {}", primary); + retryIndex++; + // only retry on conflict (409) and unprocessable content (422) which + // can happen if JSON Patch is not a valid request since there was + // a concurrent request which already removed another finalizer: + // List element removal from a list is by index in JSON Patch + // so if addressing a second finalizer but first is meanwhile removed + // it is a wrong request. 
+ if (e.getCode() != 409 && e.getCode() != 422) { + throw e; + } + if (retryIndex >= maxRetry) { + throw new OperatorException( + "Exceeded maximum (" + + maxRetry + + ") retry attempts to patch resource: " + + ResourceID.fromResource(primary)); + } + log.debug( + "Retrying patch for resource name: {}, namespace: {}; HTTP code: {}", + primary.getMetadata().getName(), + primary.getMetadata().getNamespace(), + e.getCode()); + primary = + (P) + context + .getClient() + .resources(primary.getClass()) + .inNamespace(primary.getMetadata().getNamespace()) + .withName(primary.getMetadata().getName()) + .get(); + } + } + } } From 1992acd3b014c2a2d89dc073e6e43ea60cbc7fab Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 14 May 2025 15:26:05 +0200 Subject: [PATCH 3/9] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../PrimaryUpdateAndCacheUtils.java | 74 +++++++++++-------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index 0e09776e11..e04fca67c2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -17,8 +17,8 @@ /** * Utility methods to patch the primary resource state and store it to the related cache, to make * sure that fresh resource is present for the next reconciliation. The main use case for such - * updates is to store state is resource status. Use of optimistic locking is not desired for such - * updates, since we don't want to patch fail and lose information that we want to store. 
+ * updates is to store state in resource status. We aim here for completeness and provide all the + * various options, where all of them have pros and cons. */ public class PrimaryUpdateAndCacheUtils { @@ -39,7 +39,7 @@ private PrimaryUpdateAndCacheUtils() {} */ public static

P updateStatusAndCacheResource( P primary, Context

context) { - logWarnIfResourceVersionPresent(primary); + checkResourceVersionNotPresentAndParseConfiguration(primary, context); return patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).updateStatus()); } @@ -64,7 +64,7 @@ public static

P updateStatusAndCacheResourceWithLock( */ public static

P patchStatusAndCacheResource( P primary, Context

context) { - logWarnIfResourceVersionPresent(primary); + checkResourceVersionNotPresentAndParseConfiguration(primary, context); return patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).patchStatus()); } @@ -86,7 +86,7 @@ public static

P patchStatusAndCacheResourceWithLock( */ public static

P editStatusAndCacheResource( P primary, Context

context, UnaryOperator

operation) { - logWarnIfResourceVersionPresent(primary); + checkResourceVersionNotPresentAndParseConfiguration(primary, context); return patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).editStatus(operation)); } @@ -132,24 +132,21 @@ public static

P patchStatusAndCacheResource( */ public static

P ssaPatchStatusAndCacheResource( P primary, P freshResourceWithStatus, Context

context) { - logWarnIfResourceVersionPresent(freshResourceWithStatus); - var res = - context - .getClient() - .resource(freshResourceWithStatus) - .subresource("status") - .patch( - new PatchContext.Builder() - .withForce(true) - .withFieldManager(context.getControllerConfiguration().fieldManager()) - .withPatchType(PatchType.SERVER_SIDE_APPLY) - .build()); - - context - .eventSourceRetriever() - .getControllerEventSource() - .handleRecentResourceUpdate(ResourceID.fromResource(primary), res, primary); - return res; + checkResourceVersionNotPresentAndParseConfiguration(freshResourceWithStatus, context); + return patchStatusAndCacheResource( + primary, + context, + () -> + context + .getClient() + .resource(freshResourceWithStatus) + .subresource("status") + .patch( + new PatchContext.Builder() + .withForce(true) + .withFieldManager(context.getControllerConfiguration().fieldManager()) + .withPatchType(PatchType.SERVER_SIDE_APPLY) + .build())); } public static

P ssaPatchStatusAndCacheResourceWithLock( @@ -184,7 +181,7 @@ public static

P ssaPatchStatusAndCacheResourceWithLock( */ public static

P ssaPatchStatusAndCacheResource( P primary, P freshResourceWithStatus, Context

context, PrimaryResourceCache

cache) { - logWarnIfResourceVersionPresent(freshResourceWithStatus); + checkResourceVersionIsNotPresent(freshResourceWithStatus); return patchStatusAndCacheResource( primary, cache, @@ -213,7 +210,7 @@ public static

P ssaPatchStatusAndCacheResource( */ public static

P editStatusAndCacheResource( P primary, Context

context, PrimaryResourceCache

cache, UnaryOperator

operation) { - logWarnIfResourceVersionPresent(primary); + checkResourceVersionIsNotPresent(primary); return patchStatusAndCacheResource( primary, cache, () -> context.getClient().resource(primary).editStatus(operation)); } @@ -230,7 +227,7 @@ public static

P editStatusAndCacheResource( */ public static

P patchStatusAndCacheResource( P primary, Context

context, PrimaryResourceCache

cache) { - logWarnIfResourceVersionPresent(primary); + checkResourceVersionIsNotPresent(primary); return patchStatusAndCacheResource( primary, cache, () -> context.getClient().resource(primary).patchStatus()); } @@ -246,7 +243,7 @@ public static

P patchStatusAndCacheResource( */ public static

P updateStatusAndCacheResource( P primary, Context

context, PrimaryResourceCache

cache) { - logWarnIfResourceVersionPresent(primary); + checkResourceVersionIsNotPresent(primary); return patchStatusAndCacheResource( primary, cache, () -> context.getClient().resource(primary).updateStatus()); } @@ -268,11 +265,22 @@ public static

P patchStatusAndCacheResource( return updatedResource; } - private static

void logWarnIfResourceVersionPresent(P primary) { + private static

void checkResourceVersionIsNotPresent(P primary) { if (primary.getMetadata().getResourceVersion() != null) { - log.warn( - "The metadata.resourceVersion of primary resource is NOT null, " - + "using optimistic locking is discouraged for this purpose. "); + throw new IllegalArgumentException("Resource version is present"); + } + } + + private static

void checkResourceVersionNotPresentAndParseConfiguration( + P primary, Context

context) { + checkResourceVersionIsNotPresent(primary); + if (!context + .getControllerConfiguration() + .getConfigurationService() + .parseResourceVersionsForEventFilteringAndCaching()) { + throw new OperatorException( + "For internal primary resource caching 'parseResourceVersionsForEventFilteringAndCaching'" + + " must be allowed."); } } @@ -296,10 +304,11 @@ public static

P updateAndCacheResourceWithLock( if (log.isDebugEnabled()) { log.debug("Conflict retrying update for: {}", ResourceID.fromResource(primary)); } + P modified = null; int retryIndex = 0; while (true) { try { - var modified = modificationFunction.apply(primary); + modified = modificationFunction.apply(primary); modified.getMetadata().setResourceVersion(primary.getMetadata().getResourceVersion()); var updated = updateMethod.apply(modified); context @@ -320,6 +329,7 @@ public static

P updateAndCacheResourceWithLock( throw e; } if (retryIndex >= maxRetry) { + log.warn("Retry exhausted, last desired resource: {}", modified); throw new OperatorException( "Exceeded maximum (" + maxRetry From 01a9826cf2861dce0a3e54e0f88f0e75e10701bf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 14 May 2025 15:34:06 +0200 Subject: [PATCH 4/9] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../operator/api/reconciler/PrimaryUpdateAndCacheUtils.java | 1 + 1 file changed, 1 insertion(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index e04fca67c2..74e826fce2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -75,6 +75,7 @@ public static

P patchStatusAndCacheResourceWithLock( primary, context, modificationFunction, r -> context.getClient().resource(r).patchStatus()); } + // TODO document caveat with JSON PATCH /** * Patches status and makes sure that the up-to-date primary resource will be present during the * next reconciliation. Using JSON Patch. From 562416ada2aae1252bdd5cde37c2af85a755a64a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 14 May 2025 15:54:16 +0200 Subject: [PATCH 5/9] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../PrimaryUpdateAndCacheUtils.java | 26 ++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index 74e826fce2..39b929107b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -11,6 +11,7 @@ import io.fabric8.kubernetes.client.dsl.base.PatchContext; import io.fabric8.kubernetes.client.dsl.base.PatchType; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -19,6 +20,25 @@ * sure that fresh resource is present for the next reconciliation. The main use case for such * updates is to store state is resource status. We aim here for completeness and provide you all * various options, where all of them have pros and cons. + * + *

    + *
  • Retryable updates with optimistic locking (*withLock) - you can use this approach out of + * the box, it updates the resource using optimistic locking and caches the resource. If the + * update fails it reads the primary resource and applies the modifications again and retries + * the update. After successful update it caches the resource for next reconciliation. The + * disadvantage of this method is that theoretically it could fail the max attempt retry. Note + * that optimistic locking is essential to have the caching work in general. + *
  • Caching without optimistic locking but with parsing the resource version - to use this you + * have to set {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()} + * to true. The update won't fail on optimistic locking so there is much higher chance to + * succeed. However this bends the rules of Kubernetes API contract by parsing the resource + * version. Using this for this purpose is actually a gray area, it should be fine in most of + * the setups. + *
  • Using {@link PrimaryResourceCache} - in this way you can explicitly ensure freshness of the + * resource (see related docs). You don't have to use optimistic locking or parse the resource + * version. But it requires code from your side and, for now (might change in future), is not + * supported in managed dependent resources. + *
*/ public class PrimaryUpdateAndCacheUtils { @@ -75,11 +95,15 @@ public static

P patchStatusAndCacheResourceWithLock( primary, context, modificationFunction, r -> context.getClient().resource(r).patchStatus()); } - // TODO document caveat with JSON PATCH /** * Patches status and makes sure that the up-to-date primary resource will be present during the * next reconciliation. Using JSON Patch. * + *

Note that since optimistic locking is not used, there is a risk that JSON Patch will have + * concurrency issues when removing an element from a list. Since, the list element in JSON Patch + * are addressed by index, so if a concurrent request removes an element with lower index, the + * request might be not valid anymore (HTTP 422) or might remove an unmeant element. + * * @param primary resource * @param context of reconciliation * @return updated resource From cfd0ad713edb9ca2b91b25dfdffd7ac51c942224 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 14 May 2025 16:03:03 +0200 Subject: [PATCH 6/9] Integration test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../internal/StatusPatchCacheIT.java | 1 + ...tatusPatchCacheWithLockCustomResource.java | 14 ++++ .../StatusPatchCacheWithLockIT.java | 48 +++++++++++++ .../StatusPatchCacheWithLockReconciler.java | 69 +++++++++++++++++++ .../StatusPatchCacheWithLockSpec.java | 14 ++++ .../StatusPatchCacheWithLockStatus.java | 15 ++++ 6 files changed, 161 insertions(+) create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockCustomResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockIT.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockSpec.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockStatus.java diff --git 
a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java index f78511f250..98948e9848 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheIT.java @@ -18,6 +18,7 @@ public class StatusPatchCacheIT { @RegisterExtension LocallyRunOperatorExtension extension = LocallyRunOperatorExtension.builder() + .withConfigurationService(o -> o.withParseResourceVersions(true)) .withReconciler(StatusPatchCacheReconciler.class) .build(); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockCustomResource.java new file mode 100644 index 0000000000..2f02381a68 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockCustomResource.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internalwithlock; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("spwl") +public class StatusPatchCacheWithLockCustomResource + extends CustomResource + implements Namespaced {} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockIT.java 
b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockIT.java new file mode 100644 index 0000000000..0f6a581660 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockIT.java @@ -0,0 +1,48 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internalwithlock; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class StatusPatchCacheWithLockIT { + + public static final String TEST_1 = "test1"; + + @RegisterExtension + LocallyRunOperatorExtension extension = + LocallyRunOperatorExtension.builder() + .withReconciler(StatusPatchCacheWithLockReconciler.class) + .build(); + + @Test + void testStatusAlwaysUpToDate() { + var reconciler = extension.getReconcilerOfType(StatusPatchCacheWithLockReconciler.class); + + extension.create(testResource()); + + // the reconciliation is periodically triggered, the status values should be increasing + // monotonically + await() + .pollDelay(Duration.ofSeconds(1)) + .pollInterval(Duration.ofMillis(30)) + .untilAsserted( + () -> { + assertThat(reconciler.errorPresent).isFalse(); + assertThat(reconciler.latestValue).isGreaterThan(10); + }); + } + + StatusPatchCacheWithLockCustomResource testResource() { + var res = new StatusPatchCacheWithLockCustomResource(); + res.setMetadata(new ObjectMetaBuilder().withName(TEST_1).build()); + res.setSpec(new StatusPatchCacheWithLockSpec()); + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java 
b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java new file mode 100644 index 0000000000..08ede99848 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java @@ -0,0 +1,69 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internalwithlock; + +import java.util.List; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.api.reconciler.PrimaryUpdateAndCacheUtils; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.baseapi.statuscache.PeriodicTriggerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +@ControllerConfiguration +public class StatusPatchCacheWithLockReconciler + implements Reconciler { + + public volatile int latestValue = 0; + public volatile boolean errorPresent = false; + + @Override + public UpdateControl reconcile( + StatusPatchCacheWithLockCustomResource resource, + Context context) { + + if (resource.getStatus() != null && resource.getStatus().getValue() != latestValue) { + errorPresent = true; + throw new IllegalStateException( + "status is not up to date. Latest value: " + + latestValue + + " status values: " + + resource.getStatus().getValue()); + } + + var freshCopy = createFreshCopy(resource); + + freshCopy + .getStatus() + .setValue(resource.getStatus() == null ? 
1 : resource.getStatus().getValue() + 1); + + var updated = + PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResourceWithLock( + resource, freshCopy, context); + latestValue = updated.getStatus().getValue(); + + return UpdateControl.noUpdate(); + } + + @Override + public List> prepareEventSources( + EventSourceContext context) { + // periodic event triggering for testing purposes + return List.of(new PeriodicTriggerEventSource<>(context.getPrimaryCache())); + } + + private StatusPatchCacheWithLockCustomResource createFreshCopy( + StatusPatchCacheWithLockCustomResource resource) { + var res = new StatusPatchCacheWithLockCustomResource(); + res.setMetadata( + new ObjectMetaBuilder() + .withName(resource.getMetadata().getName()) + .withNamespace(resource.getMetadata().getNamespace()) + .build()); + res.setStatus(new StatusPatchCacheWithLockStatus()); + + return res; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockSpec.java new file mode 100644 index 0000000000..495db097e8 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockSpec.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internalwithlock; + +public class StatusPatchCacheWithLockSpec { + + private int counter = 0; + + public int getCounter() { + return counter; + } + + public void setCounter(int counter) { + this.counter = counter; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockStatus.java new file mode 100644 index 0000000000..586c88b1f8 --- 
/dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockStatus.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.baseapi.statuscache.internalwithlock; + +public class StatusPatchCacheWithLockStatus { + + private Integer value = 0; + + public Integer getValue() { + return value; + } + + public StatusPatchCacheWithLockStatus setValue(Integer value) { + this.value = value; + return this; + } +} From e966d575098d985032c0b92f477790a60de14b21 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 14 May 2025 16:04:57 +0200 Subject: [PATCH 7/9] increase default MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../operator/api/reconciler/PrimaryUpdateAndCacheUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index 39b929107b..5a60ad2ae6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -42,7 +42,7 @@ */ public class PrimaryUpdateAndCacheUtils { - public static final int DEFAULT_MAX_RETRY = 3; + public static final int DEFAULT_MAX_RETRY = 10; private PrimaryUpdateAndCacheUtils() {} From 4caf4d094414a907ef7f3416cb8cc87c342c869d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 15 May 2025 10:34:30 +0200 Subject: [PATCH 8/9] docs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../en/docs/documentation/reconciler.md | 44 ++--- 
.../PrimaryUpdateAndCacheUtils.java | 150 ++++++++++++------ .../internal/StatusPatchCacheReconciler.java | 4 + .../StatusPatchCacheWithLockReconciler.java | 4 + .../StatusPatchPrimaryCacheReconciler.java | 4 + .../StatusPatchPrimaryCacheSpec.java | 11 +- 6 files changed, 143 insertions(+), 74 deletions(-) diff --git a/docs/content/en/docs/documentation/reconciler.md b/docs/content/en/docs/documentation/reconciler.md index 362de38a03..40e7fa2c35 100644 --- a/docs/content/en/docs/documentation/reconciler.md +++ b/docs/content/en/docs/documentation/reconciler.md @@ -175,20 +175,23 @@ From v5, by default, the finalizer is added using Server Side Apply. See also `U It is typical to want to update the status subresource with the information that is available during the reconciliation. This is sometimes referred to as the last observed state. When the primary resource is updated, though, the framework does not cache the resource directly, relying instead on the propagation of the update to the underlying informer's -cache. It can, therefore, happen that, if other events trigger other reconciliations before the informer cache gets +cache. It can, therefore, happen that, if other events trigger other reconciliations, before the informer cache gets updated, your reconciler does not see the latest version of the primary resource. While this might not typically be a problem in most cases, as caches eventually become consistent, depending on your reconciliation logic, you might still -require the latest status version possible, for example if the status subresource is used as a communication mechanism, -see [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values) +require the latest status version possible, for example, if the status subresource is used to store allocated values. 
+See [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values) from the Kubernetes docs for more details. The framework provides utilities to help with these use cases with [`PrimaryUpdateAndCacheUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java). -These utility methods come in two flavors: +These utility methods come in multiple flavors: #### Using internal cache -In almost all cases for this purpose, you can use internal caches: +In almost all cases for this purpose, you can use internal caches in combination with update methods that use +optimistic locking (end with *WithLock(...)). If the update method fails on optimistic locking, it will retry +using a fresh resource from the server as base for modification. Again, this is the default option and will probably +work for you. ```java @Override @@ -201,27 +204,32 @@ public UpdateControl reconcile( var freshCopy = createFreshCopy(primary); freshCopy.getStatus().setValue(statusWithState()); - var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResource(resource, freshCopy, context); + var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResourceWithLock(resource, freshCopy, context); return UpdateControl.noUpdate(); } ``` -In the background `PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus` puts the result of the update into an internal -cache and will make sure that the next reconciliation will contain the most recent version of the resource. 
Note that it -is not necessarily the version of the resource you got as response from the update, it can be newer since other parties +After the update, `PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResourceWithLock` puts the result of the update into an internal +cache, and the framework will make sure that the next reconciliation will contain the most recent version of the resource. +Note that it is not necessarily the version of the resource you got as response from the update, it can be newer since other parties can do additional updates meanwhile, but if not explicitly modified, it will contain the up-to-date status. -See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal). +See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock). This approach works with the default configuration of the framework and should be good to go in most of the cases. -Without going further into the details, this won't work if `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching` -is set to `false` (more precisely there are some edge cases when it won't work). For that case framework provides the following solution: + +Without going further into the details, as a somewhat more experimental alternative we provide overloaded methods without optimistic locking; +to use those you have to set `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching` +to `true`. In practice this means that the request won't fail on optimistic locking, but it requires bending +the rules of the Kubernetes API contract a bit. This might be needed only if you have multiple writers frequently +updating the resource.
#### Fallback approach: using `PrimaryResourceCache` cache -As an alternative, for very rare cases when `ConfigurationService.parseResourceVersionsForEventFilteringAndCaching` -needs to be set to `false` you can use an explicit caching approach: +For the sake of completeness, we also provide a more explicit approach to manage the cache yourself. +This approach has the advantage that you neither have to use optimistic locking nor +set `parseResourceVersionsForEventFilteringAndCaching` to `true`: ```java @@ -277,9 +285,7 @@ their associated primary resource from the underlying informer event source cach #### Additional remarks -As shown in the integration tests, there is no optimistic locking used when updating the +As shown in the last two cases, there is no optimistic locking used when updating the [resource](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java#L41) -(in other words `metadata.resourceVersion` is set to `null`). This is desired since you don't want the patch to fail on -update. - -In addition, you can configure the [Fabric8 client retry](https://github.com/fabric8io/kubernetes-client?tab=readme-ov-file#configuring-the-client). +(in other words `metadata.resourceVersion` is set to `null`). This has the nice property that the request won't fail on optimistic locking. +However, it might be desirable to configure retry on the [Fabric8 client](https://github.com/fabric8io/kubernetes-client?tab=readme-ov-file#configuring-the-client).
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java index 5a60ad2ae6..2d95b296dc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java @@ -22,12 +22,13 @@ * various options, where all of them have pros and cons. * *

    - *
  • Retryable updates with optimistic locking (*withLock) - you can use this approach out of - * the box, it updates the resource using optimistic locking and caches the resource. If the - * update fails it reads the primary resource and applies the modifications again and retries - * the update. After successful update it caches the resource for next reconciliation. The - * disadvantage of this method is that theoretically it could fail the max attempt retry. Note - * that optimistic locking is essential to have the caching work in general. + *
  • (Preferred) Retryable updates with optimistic locking (*WithLock) - you can use this + * approach out of the box, it updates the resource using optimistic locking and caches the + * resource. If the update fails it reads the primary resource and applies the modifications + * again and retries the update. After successful update it caches the resource for next + * reconciliation. The disadvantage of this method is that theoretically it could exhaust the + * maximum number of retries. Note that optimistic locking is essential to have the caching + * work in general. *
  • Caching without optimistic locking but with parsing the resource version - to use this you * have to set {@link ConfigurationService#parseResourceVersionsForEventFilteringAndCaching()} * to true. The update won't fail on optimistic locking so there is much higher chance to @@ -48,6 +49,65 @@ private PrimaryUpdateAndCacheUtils() {} private static final Logger log = LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class); + /** + * Updates the status with optimistic locking and caches the result for next reconciliation. For + * details see {@link #updateAndCacheResourceWithLock}. + */ + public static

    P updateStatusAndCacheResourceWithLock( + P primary, Context

    context, UnaryOperator

    modificationFunction) { + return updateAndCacheResourceWithLock( + primary, + context, + modificationFunction, + r -> context.getClient().resource(r).updateStatus()); + } + + /** + * Patches the status using JSON Merge Patch with optimistic locking and caches the result for + * next reconciliation. For details see {@link #updateAndCacheResourceWithLock}. + */ + public static

    P patchStatusAndCacheResourceWithLock( + P primary, Context

    context, UnaryOperator

    modificationFunction) { + return updateAndCacheResourceWithLock( + primary, context, modificationFunction, r -> context.getClient().resource(r).patchStatus()); + } + + /** + * Patches the status using JSON Patch with optimistic locking and caches the result for next + * reconciliation. For details see {@link #updateAndCacheResourceWithLock}. + */ + public static

    P editStatusAndCacheResourceWithLock( + P primary, Context

    context, UnaryOperator

    modificationFunction) { + return updateAndCacheResourceWithLock( + primary, + context, + UnaryOperator.identity(), + r -> context.getClient().resource(r).editStatus(modificationFunction)); + } + + /** + * Patches the status using Server Side Apply with optimistic locking and caches the result for + * next reconciliation. For details see {@link #updateAndCacheResourceWithLock}. + */ + public static

    P ssaPatchStatusAndCacheResourceWithLock( + P primary, P freshResourceWithStatus, Context

    context) { + return updateAndCacheResourceWithLock( + primary, + context, + r -> freshResourceWithStatus, + r -> + context + .getClient() + .resource(r) + .subresource("status") + .patch( + new PatchContext.Builder() + .withForce(true) + .withFieldManager(context.getControllerConfiguration().fieldManager()) + .withPatchType(PatchType.SERVER_SIDE_APPLY) + .build())); + } + /** * Updates status and makes sure that the up-to-date primary resource will be present during the * next reconciliation. Using update (PUT) method. @@ -64,15 +124,6 @@ public static

    P updateStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).updateStatus()); } - public static

    P updateStatusAndCacheResourceWithLock( - P primary, Context

    context, UnaryOperator

    modificationFunction) { - return updateAndCacheResourceWithLock( - primary, - context, - modificationFunction, - r -> context.getClient().resource(r).updateStatus()); - } - /** * Patches status with and makes sure that the up-to-date primary resource will be present during * the next reconciliation. Using JSON Merge patch. @@ -89,12 +140,6 @@ public static

    P patchStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).patchStatus()); } - public static

    P patchStatusAndCacheResourceWithLock( - P primary, Context

    context, UnaryOperator

    modificationFunction) { - return updateAndCacheResourceWithLock( - primary, context, modificationFunction, r -> context.getClient().resource(r).patchStatus()); - } - /** * Patches status and makes sure that the up-to-date primary resource will be present during the * next reconciliation. Using JSON Patch. @@ -116,15 +161,6 @@ public static

    P editStatusAndCacheResource( primary, context, () -> context.getClient().resource(primary).editStatus(operation)); } - public static

    P editStatusAndCacheResourceWithLock( - P primary, Context

    context, UnaryOperator

    modificationFunction) { - return updateAndCacheResourceWithLock( - primary, - context, - UnaryOperator.identity(), - r -> context.getClient().resource(r).editStatus(modificationFunction)); - } - /** * Patches the resource with supplied method and makes sure that the up-to-date primary resource * will be present during the next reconciliation. @@ -174,25 +210,6 @@ public static

    P ssaPatchStatusAndCacheResource( .build())); } - public static

    P ssaPatchStatusAndCacheResourceWithLock( - P primary, P freshResourceWithStatus, Context

    context) { - return updateAndCacheResourceWithLock( - primary, - context, - r -> freshResourceWithStatus, - r -> - context - .getClient() - .resource(r) - .subresource("status") - .patch( - new PatchContext.Builder() - .withForce(true) - .withFieldManager(context.getControllerConfiguration().fieldManager()) - .withPatchType(PatchType.SERVER_SIDE_APPLY) - .build())); - } - /** * Patches the resource status and caches the response in provided {@link PrimaryResourceCache}. * Uses Server Side Apply. @@ -309,6 +326,23 @@ private static

    void checkResourceVersionNotPresentAndPar } } + /** + * Modifies the primary using modificationFunction, then uses the modified resource for the + * request to update with provided update method. But before the update operation sets the + * resourceVersion to the modified resource from the primary resource, so there is always + * optimistic locking happening. If the request fails on optimistic update, we read the resource + * again from the K8S API server and retry the whole process. In short, we make sure we always + * update the resource with optimistic locking, after we cache the resource in internal cache. + * Without further going into the details, the optimistic locking is needed so we can reliably + * handle the caching. + * + * @param primary original resource to update + * @param context of reconciliation + * @param modificationFunction modifications to make on primary + * @param updateMethod the update method implementation + * @return updated resource + * @param

    primary type + */ public static

    P updateAndCacheResourceWithLock( P primary, Context

    context, @@ -318,6 +352,24 @@ public static

    P updateAndCacheResourceWithLock( primary, context, modificationFunction, updateMethod, DEFAULT_MAX_RETRY); } + /** + * Modifies the primary using modificationFunction, then sends the modified resource in the + * update request made by the provided update method. Before the update, the resourceVersion of + * the primary resource is set on the modified resource, so optimistic locking always takes + * place. If the request fails with an optimistic locking conflict, the resource is read again + * from the K8S API server and the whole process is retried. In short, the resource is always + * updated with optimistic locking, and the resource is then cached in the internal cache. + * Without going further into the details, optimistic locking is needed so that the caching can + * be handled reliably. + * + * @param primary original resource to update + * @param context of reconciliation + * @param modificationFunction modifications to make on primary + * @param updateMethod the update method implementation + * @param maxRetry maximum number of retries on conflicts + * @return updated resource + * @param

    primary type + */ @SuppressWarnings("unchecked") public static

    P updateAndCacheResourceWithLock( P primary, diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java index a62d0c5b18..99a94796a5 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java @@ -31,6 +31,10 @@ public UpdateControl reconcile( + resource.getStatus().getValue()); } + // also test a resource update happening during reconciliation + resource.getSpec().setCounter(resource.getSpec().getCounter() + 1); + context.getClient().resource(resource).update(); + var freshCopy = createFreshCopy(resource); freshCopy diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java index 08ede99848..e6377a58dc 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internalwithlock/StatusPatchCacheWithLockReconciler.java @@ -33,6 +33,10 @@ public UpdateControl reconcile( + resource.getStatus().getValue()); } + // also test a resource update happening during reconciliation + resource.getSpec().setCounter(resource.getSpec().getCounter() + 1); + context.getClient().resource(resource).update(); + var freshCopy = createFreshCopy(resource); freshCopy diff --git 
a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java index b03424441f..b36ef2e8d9 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheReconciler.java @@ -46,6 +46,10 @@ public UpdateControl reconcile( + primary.getStatus().getValue()); } + // also test a resource update happening during reconciliation + primary.getSpec().setCounter(primary.getSpec().getCounter() + 1); + context.getClient().resource(primary).update(); + var freshCopy = createFreshCopy(primary); freshCopy .getStatus() diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java index 90630c1ae8..da52a48478 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache/StatusPatchPrimaryCacheSpec.java @@ -2,14 +2,13 @@ public class StatusPatchPrimaryCacheSpec { - private boolean messageInStatus = true; + private int counter = 0; - public boolean isMessageInStatus() { - return messageInStatus; + public int getCounter() { + return counter; } - public StatusPatchPrimaryCacheSpec setMessageInStatus(boolean messageInStatus) { - this.messageInStatus = messageInStatus; - return this; + public void setCounter(int counter) { + this.counter = counter; } } From 828f7a3522ca69a5420fc6aed5bc33d41878be9e Mon 
Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 15 May 2025 19:14:00 +0200 Subject: [PATCH 9/9] fix: typo and wording --- docs/content/en/docs/documentation/reconciler.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/content/en/docs/documentation/reconciler.md b/docs/content/en/docs/documentation/reconciler.md index 40e7fa2c35..91bac16a59 100644 --- a/docs/content/en/docs/documentation/reconciler.md +++ b/docs/content/en/docs/documentation/reconciler.md @@ -211,7 +211,7 @@ public UpdateControl reconcile( ``` After the update `PrimaryUpdateAndCacheUtils.ssaPatchStatusAndCacheResourceWithLock` puts the result of the update into an internal -cache and will the framework will make sure that the next reconciliation will contain the most recent version of the resource. +cache and the framework will make sure that the next reconciliation contains the most recent version of the resource. Note that it is not necessarily the version of the resource you got as response from the update, it can be newer since other parties can do additional updates meanwhile, but if not explicitly modified, it will contain the up-to-date status.