-
Notifications
You must be signed in to change notification settings - Fork 219
feat: primary resource caching for followup reconciliation(s) #2761
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 25 commits
3aa6d17
0125d66
8870c14
bff907c
32823e0
b016bf0
00fd9e6
3b99f78
1812851
e09472a
608fb09
21b2ef5
870db57
9c58fd4
e481342
51f1ca0
3409053
68ca625
84eec7b
42b9ead
d51f0e3
14c63bb
e9bcfbe
e8ede1a
a71eafe
eff1ccb
217629f
a8e7efc
a1d303d
7b58dca
a656160
1f1e1d0
95a9f2e
720a4c5
a510cc7
0a79dd7
f12955b
717933b
966aee7
68471f7
fb090a3
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -169,3 +169,121 @@ You can specify the name of the finalizer to use for your `Reconciler` using the | |||||
annotation. If you do not specify a finalizer name, one will be automatically generated for you. | ||||||
|
||||||
From v5, by default, the finalizer is added using Server Side Apply. See also `UpdateControl` in docs. | ||||||
|
||||||
### Making sure primary is up to date for the next reconciliation | ||||||
|
||||||
When you implement a reconciler as a final step (but maybe also multiple times during one reconciliation), you | ||||||
usually update the status subresource with the information that was available during the reconciliation. | ||||||
Sometimes this is referred to as the last observed state. | ||||||
When the resource is updated, the framework does not cache the resource directly from the response of the update. | ||||||
Instead, the underlying informer eventually receives an event with the updated resource and caches the resource. | ||||||
Therefore, it can happen that on next reconciliation the primary resource is not up-to-date regarding your updated (note that other event sources | ||||||
can trigger the reconciliation meanwhile). This is not usually a problem, since the status is not used as an input, | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
I guess this depends on the reconciler implementation. In our case for instance, we check the status of the resource and react based up on it. |
||||||
the reconciliation runs again, and the status us updated again. The caches are eventually consistent. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
||||||
However, there are cases when you would like to store some state in the status, typically generated | ||||||
IDs of external resources. | ||||||
See related topic in Kubernetes docs: [Representing Allocated Values](https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#representing-allocated-values). | ||||||
In this case, it is reasonable to expect to have the state always available for the next reconciliation, | ||||||
to avoid generating the resource again and other race conditions. | ||||||
|
||||||
Therefore, | ||||||
the framework provides facilities | ||||||
to cover these use cases withing [`PrimaryUpdateAndCacheUtils`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/PrimaryUpdateAndCacheUtils.java#L16). | ||||||
csviri marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
These utility methods come in two flavors: | ||||||
|
||||||
#### Using internal cache | ||||||
|
||||||
In almost all cases for this purpose, you can use internal caches: | ||||||
|
||||||
```java | ||||||
@Override | ||||||
public UpdateControl<StatusPatchCacheCustomResource> reconcile( | ||||||
StatusPatchCacheCustomResource resource, Context<StatusPatchCacheCustomResource> context) { | ||||||
|
||||||
// omitted logic | ||||||
|
||||||
// update with SSA requires a fresh copy | ||||||
var freshCopy = createFreshCopy(primary); | ||||||
|
||||||
freshCopy.getStatus().setValue(statusWithState()); | ||||||
|
||||||
var updatedResource = PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(resource, freshCopy, context); | ||||||
|
||||||
return UpdateControl.noUpdate(); | ||||||
} | ||||||
``` | ||||||
|
||||||
In the background `PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatusWith` puts the result of the update into an internal | ||||||
csviri marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
cache of the event source of primary resource, and will make sure that the next reconciliation will contain the most | ||||||
recent version of the resource. Note that it is not necessarily the version from the update response, it can be newer | ||||||
since other parties can do additional updates meanwhile, but if not explicitly modified, it will contain the up-to-date | ||||||
status. | ||||||
csviri marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
See related integration test [here](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal). | ||||||
|
||||||
This approach works with the default configuration of the framework and should be good to go in most of the cases. | ||||||
Without going further into the details, this won't work if `ConfigurtionService.parseResourceVersionsForEventFilteringAndCaching` | ||||||
is set to `false` (more precisely there are some edge cases when it won't work). For that case framework provides the following solution: | ||||||
|
||||||
#### Using `PrimaryResourceCache` cache | ||||||
|
||||||
As an alternative, you can use an explicit caching approach that the framework supports: | ||||||
|
||||||
```java | ||||||
|
||||||
// We on purpose don't use the provided predicate to show what a custom one could look like. | ||||||
private final PrimaryResourceCache<StatusPatchPrimaryCacheCustomResource> cache = | ||||||
new PrimaryResourceCache<>( | ||||||
(statusPatchCacheCustomResourcePair, statusPatchCacheCustomResource) -> | ||||||
statusPatchCacheCustomResource.getStatus().getValue() | ||||||
>= statusPatchCacheCustomResourcePair.afterUpdate().getStatus().getValue()); | ||||||
|
||||||
@Override | ||||||
public UpdateControl<StatusPatchPrimaryCacheCustomResource> reconcile( | ||||||
StatusPatchPrimaryCacheCustomResource primary, | ||||||
Context<StatusPatchPrimaryCacheCustomResource> context) { | ||||||
|
||||||
// | ||||||
primary = cache.getFreshResource(primary); | ||||||
|
||||||
// omitted logic | ||||||
|
||||||
var freshCopy = createFreshCopy(primary); | ||||||
|
||||||
freshCopy.getStatus().setValue(statusWithState()); | ||||||
|
||||||
var updated = | ||||||
PrimaryUpdateAndCacheUtils.ssaPatchAndCacheStatus(primary, freshCopy, context, cache); | ||||||
|
||||||
return UpdateControl.noUpdate(); | ||||||
} | ||||||
|
||||||
@Override | ||||||
public DeleteControl cleanup( | ||||||
StatusPatchPrimaryCacheCustomResource resource, | ||||||
Context<StatusPatchPrimaryCacheCustomResource> context) | ||||||
throws Exception { | ||||||
// cleanup the cache on resource deletion | ||||||
cache.cleanup(resource); | ||||||
return DeleteControl.defaultDelete(); | ||||||
} | ||||||
|
||||||
``` | ||||||
|
||||||
[`PrimaryResourceCache`](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/support/PrimaryResourceCache.java) | ||||||
is designed for this purpose. | ||||||
As shown in the example above, it is up to you to provide a predicate to determine if the resource is more recent than the one available. | ||||||
In other words, when to evict the resource from the cache. Typically, as show in the [integration test](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/primarycache) | ||||||
you can have a counter in status to check on that. | ||||||
|
||||||
Since all of this happens explicitly, you cannot use it for now with managed dependent resources and workflows. | ||||||
csviri marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
#### Additional remarks | ||||||
|
||||||
As shown in the integration tests, there is no optimistic locking used when updating the | ||||||
[resource](https://github.com/operator-framework/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/statuscache/internal/StatusPatchCacheReconciler.java#L41) | ||||||
csviri marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
(in other works `metadata.resourceVersion` is set to `null`). | ||||||
This is desired since you don't want the patch to fail on update. | ||||||
|
||||||
In addition, you can configure retry for in fabric8 client. |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,222 @@ | ||
package io.javaoperatorsdk.operator.api.reconciler; | ||
|
||
import java.util.function.BiFunction; | ||
import java.util.function.UnaryOperator; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import io.fabric8.kubernetes.api.model.HasMetadata; | ||
import io.fabric8.kubernetes.client.KubernetesClient; | ||
import io.fabric8.kubernetes.client.dsl.base.PatchContext; | ||
import io.fabric8.kubernetes.client.dsl.base.PatchType; | ||
import io.javaoperatorsdk.operator.api.reconciler.support.PrimaryResourceCache; | ||
import io.javaoperatorsdk.operator.processing.event.ResourceID; | ||
|
||
public class PrimaryUpdateAndCacheUtils { | ||
|
||
private PrimaryUpdateAndCacheUtils() {} | ||
|
||
private static final Logger log = LoggerFactory.getLogger(PrimaryUpdateAndCacheUtils.class); | ||
|
||
/** | ||
* Makes sure that the up-to-date primary resource will be present during the next reconciliation. | ||
csviri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* Using update (PUT) method. | ||
* | ||
* @param primary resource | ||
* @param context of reconciliation | ||
* @return updated resource | ||
* @param <P> primary resource type | ||
*/ | ||
public static <P extends HasMetadata> P updateAndCacheStatus(P primary, Context<P> context) { | ||
return patchAndCacheStatus(primary, context, (p, c) -> c.resource(primary).updateStatus()); | ||
} | ||
|
||
/** | ||
* Makes sure that the up-to-date primary resource will be present during the next reconciliation. | ||
* Using JSON Merge patch. | ||
* | ||
* @param primary resource | ||
* @param context of reconciliation | ||
* @return updated resource | ||
* @param <P> primary resource type | ||
*/ | ||
public static <P extends HasMetadata> P patchAndCacheStatus(P primary, Context<P> context) { | ||
return patchAndCacheStatus(primary, context, (p, c) -> c.resource(primary).patchStatus()); | ||
} | ||
|
||
/** | ||
* Makes sure that the up-to-date primary resource will be present during the next reconciliation. | ||
* Using JSON Patch. | ||
* | ||
* @param primary resource | ||
* @param context of reconciliation | ||
* @return updated resource | ||
* @param <P> primary resource type | ||
*/ | ||
public static <P extends HasMetadata> P editAndCacheStatus( | ||
P primary, Context<P> context, UnaryOperator<P> operation) { | ||
return patchAndCacheStatus( | ||
primary, context, (p, c) -> c.resource(primary).editStatus(operation)); | ||
} | ||
|
||
/** | ||
* Makes sure that the up-to-date primary resource will be present during the next reconciliation. | ||
* | ||
* @param primary resource | ||
* @param context of reconciliation | ||
* @param patch free implementation of cache - make sure you use optimistic locking during the | ||
* update | ||
* @return the updated resource. | ||
* @param <P> primary resource type | ||
*/ | ||
public static <P extends HasMetadata> P patchAndCacheStatus( | ||
P primary, Context<P> context, BiFunction<P, KubernetesClient, P> patch) { | ||
var updatedResource = patch.apply(primary, context.getClient()); | ||
context | ||
.eventSourceRetriever() | ||
.getControllerEventSource() | ||
.handleRecentResourceUpdate(ResourceID.fromResource(primary), updatedResource, primary); | ||
return updatedResource; | ||
} | ||
|
||
/** | ||
* Makes sure that the up-to-date primary resource will be present during the next reconciliation. | ||
* Using Server Side Apply. | ||
* | ||
* @param primary resource | ||
* @param freshResourceWithStatus - fresh resource with target state | ||
* @param context of reconciliation | ||
* @return the updated resource. | ||
* @param <P> primary resource type | ||
*/ | ||
public static <P extends HasMetadata> P ssaPatchAndCacheStatus( | ||
P primary, P freshResourceWithStatus, Context<P> context) { | ||
var res = | ||
context | ||
.getClient() | ||
.resource(freshResourceWithStatus) | ||
.subresource("status") | ||
.patch( | ||
new PatchContext.Builder() | ||
.withForce(true) | ||
.withFieldManager(context.getControllerConfiguration().fieldManager()) | ||
.withPatchType(PatchType.SERVER_SIDE_APPLY) | ||
.build()); | ||
|
||
context | ||
.eventSourceRetriever() | ||
.getControllerEventSource() | ||
.handleRecentResourceUpdate(ResourceID.fromResource(primary), res, primary); | ||
return res; | ||
} | ||
|
||
/** | ||
* Patches the resource and adds it to the {@link PrimaryResourceCache} provided. Optimistic | ||
* locking is not required. | ||
* | ||
* @param primary resource | ||
* @param freshResourceWithStatus - fresh resource with target state | ||
* @param context of reconciliation | ||
* @param cache - resource cache managed by user | ||
* @return the updated resource. | ||
* @param <P> primary resource type | ||
*/ | ||
public static <P extends HasMetadata> P ssaPatchAndCacheStatus( | ||
P primary, P freshResourceWithStatus, Context<P> context, PrimaryResourceCache<P> cache) { | ||
logWarnIfResourceVersionPresent(freshResourceWithStatus); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If optimistic locking is not required, why do we need to log a warning? |
||
return patchAndCacheStatus( | ||
primary, | ||
context.getClient(), | ||
cache, | ||
(P p, KubernetesClient c) -> | ||
c.resource(freshResourceWithStatus) | ||
.subresource("status") | ||
.patch( | ||
new PatchContext.Builder() | ||
.withForce(true) | ||
.withFieldManager(context.getControllerConfiguration().fieldManager()) | ||
.withPatchType(PatchType.SERVER_SIDE_APPLY) | ||
.build())); | ||
} | ||
|
||
/** | ||
* Patches the resource with JSON Patch and adds it to the {@link PrimaryResourceCache} provided. | ||
* Optimistic locking is not required. | ||
* | ||
* @param primary resource* | ||
csviri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* @param context of reconciliation | ||
* @param cache - resource cache managed by user | ||
* @return the updated resource. | ||
* @param <P> primary resource type | ||
*/ | ||
public static <P extends HasMetadata> P editAndCacheStatus( | ||
P primary, Context<P> context, PrimaryResourceCache<P> cache, UnaryOperator<P> operation) { | ||
logWarnIfResourceVersionPresent(primary); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If optimistic locking is not required, why do we need to log a warning? |
||
return patchAndCacheStatus( | ||
primary, | ||
context.getClient(), | ||
cache, | ||
(P p, KubernetesClient c) -> c.resource(primary).editStatus(operation)); | ||
} | ||
|
||
/** | ||
* Patches the resource with JSON Merge patch and adds it to the {@link PrimaryResourceCache} | ||
* provided. Optimistic locking is not required. | ||
* | ||
* @param primary resource* | ||
csviri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* @param context of reconciliation | ||
* @param cache - resource cache managed by user | ||
* @return the updated resource. | ||
* @param <P> primary resource type | ||
*/ | ||
public static <P extends HasMetadata> P patchAndCacheStatus( | ||
P primary, Context<P> context, PrimaryResourceCache<P> cache) { | ||
logWarnIfResourceVersionPresent(primary); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If optimistic locking is not required, why do we need to log a warning? |
||
return patchAndCacheStatus( | ||
primary, | ||
context.getClient(), | ||
cache, | ||
(P p, KubernetesClient c) -> c.resource(primary).patchStatus()); | ||
} | ||
|
||
/** | ||
* Updates the resource and adds it to the {@link PrimaryResourceCache} provided. Optimistic | ||
* locking is not required. | ||
* | ||
* @param primary resource* | ||
csviri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
* @param context of reconciliation | ||
* @param cache - resource cache managed by user | ||
* @return the updated resource. | ||
* @param <P> primary resource type | ||
*/ | ||
public static <P extends HasMetadata> P updateAndCacheStatus( | ||
P primary, Context<P> context, PrimaryResourceCache<P> cache) { | ||
logWarnIfResourceVersionPresent(primary); | ||
return patchAndCacheStatus( | ||
primary, | ||
context.getClient(), | ||
cache, | ||
(P p, KubernetesClient c) -> c.resource(primary).updateStatus()); | ||
} | ||
|
||
public static <P extends HasMetadata> P patchAndCacheStatus( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we add small JavaDoc for this exposed method as well? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Still would be nice to have a JavaDoc for this public method. |
||
P primary, | ||
KubernetesClient client, | ||
PrimaryResourceCache<P> cache, | ||
BiFunction<P, KubernetesClient, P> patch) { | ||
var updatedResource = patch.apply(primary, client); | ||
cache.cacheResource(primary, updatedResource); | ||
return updatedResource; | ||
} | ||
|
||
private static <P extends HasMetadata> void logWarnIfResourceVersionPresent(P primary) { | ||
if (primary.getMetadata().getResourceVersion() != null) { | ||
log.warn( | ||
"Primary resource version is NOT null, for caching with optimistic locking use" | ||
+ " alternative methods. Name: {} namespace: {}", | ||
primary.getMetadata().getName(), | ||
primary.getMetadata().getNamespace()); | ||
} | ||
} | ||
} |
Uh oh!
There was an error while loading. Please reload this page.