Skip to content

Commit eafe869

Browse files
committed
feat: moving cache sync timeout to controller level (#1576)
1 parent 6f5cf2e commit eafe869

17 files changed

+84
-44
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,12 @@ public List<DependentResourceSpec> getDependentResources() {
252252
return specs;
253253
}
254254

255+
@Override
256+
public Duration cacheSyncTimeout() {
257+
var cacheSyncTimeout = annotation.cacheSyncTimeout();
258+
return Duration.of(cacheSyncTimeout.timeout(), cacheSyncTimeout.timeUnit().toChronoUnit());
259+
}
260+
255261
private String getName(Dependent dependent, Class<? extends DependentResource> dependentType) {
256262
var name = dependent.name();
257263
if (name.isBlank()) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3-
import java.time.Duration;
43
import java.util.Optional;
54
import java.util.Set;
65
import java.util.concurrent.ExecutorService;
@@ -169,15 +168,6 @@ default boolean stopOnInformerErrorDuringStartup() {
169168
return true;
170169
}
171170

172-
/**
173-
* Timeout for cache sync in milliseconds. In other words source start timeout. Note that is
174-
* "stopOnInformerErrorDuringStartup" is true the operator will stop on timeout. Default is 2
175-
* minutes.
176-
*/
177-
default Duration cacheSyncTimeout() {
178-
return Duration.ofMinutes(2);
179-
}
180-
181171
/**
182172
* Handler for an informer stop. Informer stops if there is a non-recoverable error. Like received
183173
* a resource that cannot be deserialized.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3-
import java.time.Duration;
43
import java.util.Optional;
54
import java.util.Set;
65
import java.util.concurrent.ExecutorService;
@@ -29,7 +28,6 @@ public class ConfigurationServiceOverrider {
2928
private LeaderElectionConfiguration leaderElectionConfiguration;
3029
private InformerStoppedHandler informerStoppedHandler;
3130
private Boolean stopOnInformerErrorDuringStartup;
32-
private Duration cacheSyncTimeout;
3331

3432
ConfigurationServiceOverrider(ConfigurationService original) {
3533
this.original = original;
@@ -108,11 +106,6 @@ public ConfigurationServiceOverrider withStopOnInformerErrorDuringStartup(
108106
return this;
109107
}
110108

111-
public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTimeout) {
112-
this.cacheSyncTimeout = cacheSyncTimeout;
113-
return this;
114-
}
115-
116109
public ConfigurationService build() {
117110
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
118111
@Override
@@ -191,11 +184,6 @@ public boolean stopOnInformerErrorDuringStartup() {
191184
return stopOnInformerErrorDuringStartup != null ? stopOnInformerErrorDuringStartup
192185
: super.stopOnInformerErrorDuringStartup();
193186
}
194-
195-
@Override
196-
public Duration cacheSyncTimeout() {
197-
return cacheSyncTimeout != null ? cacheSyncTimeout : super.cacheSyncTimeout();
198-
}
199187
};
200188
}
201189

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import io.javaoperatorsdk.operator.processing.retry.GradualRetry;
1818
import io.javaoperatorsdk.operator.processing.retry.Retry;
1919

20-
public interface ControllerConfiguration<R extends HasMetadata> extends ResourceConfiguration<R> {
20+
public interface ControllerConfiguration<P extends HasMetadata> extends ResourceConfiguration<P> {
2121

2222
@SuppressWarnings("rawtypes")
2323
RateLimiter DEFAULT_RATE_LIMITER = LinearRateLimiter.deactivatedRateLimiter();
@@ -71,7 +71,7 @@ default RateLimiter getRateLimiter() {
7171
*
7272
* @return filter
7373
*/
74-
default ResourceEventFilter<R> getEventFilter() {
74+
default ResourceEventFilter<P> getEventFilter() {
7575
return ResourceEventFilters.passthrough();
7676
}
7777

@@ -91,8 +91,8 @@ default ConfigurationService getConfigurationService() {
9191

9292
@SuppressWarnings("unchecked")
9393
@Override
94-
default Class<R> getResourceClass() {
95-
return (Class<R>) Utils.getFirstTypeArgumentFromSuperClassOrInterface(getClass(),
94+
default Class<P> getResourceClass() {
95+
return (Class<P>) Utils.getFirstTypeArgumentFromSuperClassOrInterface(getClass(),
9696
ControllerConfiguration.class);
9797
}
9898
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class ControllerConfigurationOverrider<R extends HasMetadata> {
3838
private OnUpdateFilter<R> onUpdateFilter;
3939
private GenericFilter<R> genericFilter;
4040
private RateLimiter rateLimiter;
41+
private Duration cacheSyncTimeout;
4142

4243
private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
4344
finalizer = original.getFinalizerName();
@@ -56,6 +57,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
5657
dependentResources.forEach(drs -> namedDependentResourceSpecs.put(drs.getName(), drs));
5758
this.original = original;
5859
this.rateLimiter = original.getRateLimiter();
60+
this.cacheSyncTimeout = original.cacheSyncTimeout();
5961
}
6062

6163
public ControllerConfigurationOverrider<R> withFinalizer(String finalizer) {
@@ -176,6 +178,11 @@ public ControllerConfigurationOverrider<R> replacingNamedDependentResourceConfig
176178
return this;
177179
}
178180

181+
public ControllerConfigurationOverrider<R> withCacheSyncTimeout(Duration cacheSyncTimeout) {
182+
this.cacheSyncTimeout = cacheSyncTimeout;
183+
return this;
184+
}
185+
179186
public ControllerConfiguration<R> build() {
180187
final var hasModifiedNamespaces = !original.getNamespaces().equals(namespaces);
181188
final var newDependentSpecs = namedDependentResourceSpecs.values().stream()
@@ -208,6 +215,7 @@ public ControllerConfiguration<R> build() {
208215
onUpdateFilter,
209216
genericFilter,
210217
rateLimiter,
218+
cacheSyncTimeout,
211219
newDependentSpecs);
212220
}
213221

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public class DefaultControllerConfiguration<R extends HasMetadata>
3131
private final List<DependentResourceSpec> dependents;
3232
private final Duration reconciliationMaxInterval;
3333
private final RateLimiter rateLimiter;
34+
private final Duration cacheSyncTimeout;
3435

3536
// NOSONAR constructor is meant to provide all information
3637
public DefaultControllerConfiguration(
@@ -49,6 +50,7 @@ public DefaultControllerConfiguration(
4950
OnUpdateFilter<R> onUpdateFilter,
5051
GenericFilter<R> genericFilter,
5152
RateLimiter rateLimiter,
53+
Duration cacheSyncTimeout,
5254
List<DependentResourceSpec> dependents) {
5355
super(labelSelector, resourceClass, onAddFilter, onUpdateFilter, genericFilter, namespaces);
5456
this.associatedControllerClassName = associatedControllerClassName;
@@ -65,6 +67,7 @@ public DefaultControllerConfiguration(
6567
this.rateLimiter =
6668
rateLimiter != null ? rateLimiter : LinearRateLimiter.deactivatedRateLimiter();
6769
this.dependents = dependents != null ? dependents : Collections.emptyList();
70+
this.cacheSyncTimeout = cacheSyncTimeout;
6871
}
6972

7073
@Override
@@ -116,4 +119,8 @@ public Optional<Duration> maxReconciliationInterval() {
116119
public RateLimiter getRateLimiter() {
117120
return rateLimiter;
118121
}
122+
123+
public Duration cacheSyncTimeout() {
124+
return cacheSyncTimeout;
125+
}
119126
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import java.time.Duration;
34
import java.util.Collections;
45
import java.util.Optional;
56
import java.util.Set;
67

78
import io.fabric8.kubernetes.api.model.HasMetadata;
89
import io.javaoperatorsdk.operator.OperatorException;
910
import io.javaoperatorsdk.operator.ReconcilerUtils;
11+
import io.javaoperatorsdk.operator.api.reconciler.CacheSyncTimeout;
1012
import io.javaoperatorsdk.operator.api.reconciler.Constants;
1113
import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter;
1214
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
@@ -108,4 +110,13 @@ default Set<String> getEffectiveNamespaces() {
108110
}
109111
return targetNamespaces;
110112
}
113+
114+
/**
115+
* Timeout for cache sync. In other words event source start timeout. Note that is
116+
* "stopOnInformerErrorDuringStartup" is true the operator will stop on timeout. Default is 2
117+
* minutes.
118+
*/
119+
default Duration cacheSyncTimeout() {
120+
return Duration.ofMinutes(CacheSyncTimeout.DEFAULT_TIMEOUT);
121+
}
111122
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package io.javaoperatorsdk.operator.api.reconciler;
2+
3+
import java.lang.annotation.ElementType;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
import java.util.concurrent.TimeUnit;
8+
9+
@Retention(RetentionPolicy.RUNTIME)
10+
@Target({ElementType.TYPE})
11+
public @interface CacheSyncTimeout {
12+
13+
int DEFAULT_TIMEOUT = 2;
14+
15+
int timeout();
16+
17+
TimeUnit timeUnit() default TimeUnit.MINUTES;
18+
19+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,4 +118,7 @@ MaxReconciliationInterval maxReconciliationInterval() default @MaxReconciliation
118118
* accessible no-arg constructor.
119119
*/
120120
Class<? extends RateLimiter> rateLimiter() default LinearRateLimiter.class;
121+
122+
CacheSyncTimeout cacheSyncTimeout() default @CacheSyncTimeout(
123+
timeout = CacheSyncTimeout.DEFAULT_TIMEOUT);
121124
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public EventSourceManager(Controller<P> controller) {
4848

4949
public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
5050
eventSources.controllerResourceEventSource().setEventHandler(controller.getEventProcessor());
51+
5152
eventSources.retryEventSource().setEventHandler(controller.getEventProcessor());
5253
}
5354

@@ -119,6 +120,7 @@ public final void registerEventSource(EventSource eventSource) throws OperatorEx
119120
registerEventSource(null, eventSource);
120121
}
121122

123+
@SuppressWarnings("unchecked")
122124
public final synchronized void registerEventSource(String name, EventSource eventSource)
123125
throws OperatorException {
124126
Objects.requireNonNull(eventSource, "EventSource must not be null");

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,8 @@ public void changeNamespaces(Set<String> namespaces) {
9797
private InformerWrapper<T> createEventSource(
9898
FilterWatchListDeletable<T, KubernetesResourceList<T>, Resource<T>> filteredBySelectorClient,
9999
ResourceEventHandler<T> eventHandler, String key) {
100-
var source = new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0));
100+
var source = new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0),
101+
configuration);
101102
source.addEventHandler(eventHandler);
102103
sources.put(key, source);
103104
return source;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import io.javaoperatorsdk.operator.OperatorException;
2222
import io.javaoperatorsdk.operator.ReconcilerUtils;
2323
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
24+
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
2425
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2526
import io.javaoperatorsdk.operator.processing.event.ResourceID;
2627
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
@@ -32,10 +33,13 @@ class InformerWrapper<T extends HasMetadata>
3233

3334
private final SharedIndexInformer<T> informer;
3435
private final Cache<T> cache;
36+
private final ResourceConfiguration<?> resourceConfiguration;
3537

36-
public InformerWrapper(SharedIndexInformer<T> informer) {
38+
public InformerWrapper(SharedIndexInformer<T> informer,
39+
ResourceConfiguration<?> resourceConfiguration) {
3740
this.informer = informer;
3841
this.cache = (Cache<T>) informer.getStore();
42+
this.resourceConfiguration = resourceConfiguration;
3943
}
4044

4145
@Override
@@ -69,7 +73,7 @@ public void start() throws OperatorException {
6973
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
7074
// false, and there is a rbac issue the get never returns; therefore operator never really
7175
// starts
72-
start.toCompletableFuture().get(configService.cacheSyncTimeout().toMillis(),
76+
start.toCompletableFuture().get(resourceConfiguration.cacheSyncTimeout().toMillis(),
7377
TimeUnit.MILLISECONDS);
7478
} catch (TimeoutException | ExecutionException e) {
7579
if (configService.stopOnInformerErrorDuringStartup()) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ private static class TestControllerConfiguration<R extends HasMetadata>
5858
public TestControllerConfiguration(Reconciler<R> controller, Class<R> crClass) {
5959
super(null, getControllerName(controller),
6060
CustomResource.getCRDName(crClass), null, false, null, null, null, null, crClass,
61-
null, null, null, null, null, null);
61+
null, null, null, null, null, null, null);
6262
this.controller = controller;
6363
}
6464

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,7 @@
2525
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
2626
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
2727
import io.javaoperatorsdk.operator.api.config.MockControllerConfiguration;
28-
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
29-
import io.javaoperatorsdk.operator.api.reconciler.Context;
30-
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
31-
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
32-
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
33-
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
34-
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
35-
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
28+
import io.javaoperatorsdk.operator.api.reconciler.*;
3629
import io.javaoperatorsdk.operator.processing.Controller;
3730
import io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.CustomResourceFacade;
3831
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
@@ -106,7 +99,8 @@ private <R extends HasMetadata> ReconciliationDispatcher<R> init(R customResourc
10699
final Class<R> resourceClass = (Class<R>) customResource.getClass();
107100
configuration = configuration == null ? MockControllerConfiguration.forResource(resourceClass)
108101
: configuration;
109-
102+
when(configuration.cacheSyncTimeout())
103+
.thenReturn(Duration.ofMinutes(CacheSyncTimeout.DEFAULT_TIMEOUT));
110104
when(configuration.getFinalizerName()).thenReturn(DEFAULT_FINALIZER);
111105
when(configuration.getName()).thenReturn("EventDispatcherTestController");
112106
when(configuration.getResourceClass()).thenReturn(resourceClass);

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ public ControllerConfig(String finalizer, boolean generationAware,
145145
eventFilter,
146146
customResourceClass,
147147
null,
148-
null, null, null, null, null);
148+
null, null, null, null, null, null);
149149
}
150150
}
151151

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.processing.event.source.controller;
22

3+
import java.time.Duration;
34
import java.time.LocalDateTime;
45
import java.util.List;
56

@@ -9,6 +10,7 @@
910
import io.javaoperatorsdk.operator.MockKubernetesClient;
1011
import io.javaoperatorsdk.operator.TestUtils;
1112
import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration;
13+
import io.javaoperatorsdk.operator.api.reconciler.CacheSyncTimeout;
1214
import io.javaoperatorsdk.operator.processing.Controller;
1315
import io.javaoperatorsdk.operator.processing.event.EventHandler;
1416
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
@@ -188,7 +190,9 @@ public TestConfiguration(boolean generationAware, OnAddFilter<TestCustomResource
188190
null,
189191
TestCustomResource.class,
190192
null,
191-
onAddFilter, onUpdateFilter, genericFilter, null, null);
193+
onAddFilter, onUpdateFilter, genericFilter, null,
194+
Duration.ofMinutes(CacheSyncTimeout.DEFAULT_TIMEOUT),
195+
null);
192196
}
193197
}
194198
}

operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -205,12 +205,15 @@ Operator startOperator(boolean stopOnInformerErrorDuringStartup, boolean addStop
205205
Operator operator = new Operator(clientUsingServiceAccount(),
206206
co -> {
207207
co.withStopOnInformerErrorDuringStartup(stopOnInformerErrorDuringStartup);
208-
co.withCacheSyncTimeout(Duration.ofMillis(3000));
209208
if (addStopHandler) {
210-
co.withInformerStoppedHandler((informer, ex) -> replacementStopHandlerCalled = true);
209+
co.withInformerStoppedHandler((informer, ex) -> {
210+
replacementStopHandlerCalled = true;
211+
});
211212
}
212213
});
213-
operator.register(reconciler);
214+
operator.register(reconciler, co -> {
215+
co.withCacheSyncTimeout(Duration.ofMillis(3000));
216+
});
214217
operator.installShutdownHook();
215218
operator.start();
216219
return operator;

0 commit comments

Comments
 (0)