diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java index 43c61319ac..749f3cce22 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java @@ -252,6 +252,12 @@ public List getDependentResources() { return specs; } + @Override + public Duration cacheSyncTimeout() { + var cacheSyncTimeout = annotation.cacheSyncTimeout(); + return Duration.of(cacheSyncTimeout.timeout(), cacheSyncTimeout.timeUnit().toChronoUnit()); + } + private String getName(Dependent dependent, Class dependentType) { var name = dependent.name(); if (name.isBlank()) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 9f4bde5beb..2bce3541b3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -1,6 +1,5 @@ package io.javaoperatorsdk.operator.api.config; -import java.time.Duration; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -169,15 +168,6 @@ default boolean stopOnInformerErrorDuringStartup() { return true; } - /** - * Timeout for cache sync in milliseconds. In other words source start timeout. Note that is - * "stopOnInformerErrorDuringStartup" is true the operator will stop on timeout. Default is 2 - * minutes. - */ - default Duration cacheSyncTimeout() { - return Duration.ofMinutes(2); - } - /** * Handler for an informer stop. Informer stops if there is a non-recoverable error. Like received * a resource that cannot be deserialized. diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 2b8aee9708..d1da844732 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -1,6 +1,5 @@ package io.javaoperatorsdk.operator.api.config; -import java.time.Duration; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -29,7 +28,6 @@ public class ConfigurationServiceOverrider { private LeaderElectionConfiguration leaderElectionConfiguration; private InformerStoppedHandler informerStoppedHandler; private Boolean stopOnInformerErrorDuringStartup; - private Duration cacheSyncTimeout; ConfigurationServiceOverrider(ConfigurationService original) { this.original = original; @@ -108,11 +106,6 @@ public ConfigurationServiceOverrider withStopOnInformerErrorDuringStartup( return this; } - public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTimeout) { - this.cacheSyncTimeout = cacheSyncTimeout; - return this; - } - public ConfigurationService build() { return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) { @Override @@ -191,11 +184,6 @@ public boolean stopOnInformerErrorDuringStartup() { return stopOnInformerErrorDuringStartup != null ? stopOnInformerErrorDuringStartup : super.stopOnInformerErrorDuringStartup(); } - - @Override - public Duration cacheSyncTimeout() { - return cacheSyncTimeout != null ? cacheSyncTimeout : super.cacheSyncTimeout(); - } }; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java index 74c40ed9a8..7e641bb751 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java @@ -17,7 +17,7 @@ import io.javaoperatorsdk.operator.processing.retry.GradualRetry; import io.javaoperatorsdk.operator.processing.retry.Retry; -public interface ControllerConfiguration extends ResourceConfiguration { +public interface ControllerConfiguration

extends ResourceConfiguration

{ @SuppressWarnings("rawtypes") RateLimiter DEFAULT_RATE_LIMITER = LinearRateLimiter.deactivatedRateLimiter(); @@ -71,7 +71,7 @@ default RateLimiter getRateLimiter() { * * @return filter */ - default ResourceEventFilter getEventFilter() { + default ResourceEventFilter

getEventFilter() { return ResourceEventFilters.passthrough(); } @@ -91,8 +91,8 @@ default ConfigurationService getConfigurationService() { @SuppressWarnings("unchecked") @Override - default Class getResourceClass() { - return (Class) Utils.getFirstTypeArgumentFromSuperClassOrInterface(getClass(), + default Class

getResourceClass() { + return (Class

) Utils.getFirstTypeArgumentFromSuperClassOrInterface(getClass(), ControllerConfiguration.class); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java index c36aa51d2e..b3c75c132a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java @@ -38,6 +38,7 @@ public class ControllerConfigurationOverrider { private OnUpdateFilter onUpdateFilter; private GenericFilter genericFilter; private RateLimiter rateLimiter; + private Duration cacheSyncTimeout; private ControllerConfigurationOverrider(ControllerConfiguration original) { finalizer = original.getFinalizerName(); @@ -56,6 +57,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration original) { dependentResources.forEach(drs -> namedDependentResourceSpecs.put(drs.getName(), drs)); this.original = original; this.rateLimiter = original.getRateLimiter(); + this.cacheSyncTimeout = original.cacheSyncTimeout(); } public ControllerConfigurationOverrider withFinalizer(String finalizer) { @@ -176,6 +178,11 @@ public ControllerConfigurationOverrider replacingNamedDependentResourceConfig return this; } + public ControllerConfigurationOverrider withCacheSyncTimeout(Duration cacheSyncTimeout) { + this.cacheSyncTimeout = cacheSyncTimeout; + return this; + } + public ControllerConfiguration build() { final var hasModifiedNamespaces = !original.getNamespaces().equals(namespaces); final var newDependentSpecs = namedDependentResourceSpecs.values().stream() @@ -208,6 +215,7 @@ public ControllerConfiguration build() { onUpdateFilter, genericFilter, rateLimiter, + cacheSyncTimeout, newDependentSpecs); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java index 3f4d952133..01db454390 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java @@ -31,6 +31,7 @@ public class DefaultControllerConfiguration private final List dependents; private final Duration reconciliationMaxInterval; private final RateLimiter rateLimiter; + private final Duration cacheSyncTimeout; // NOSONAR constructor is meant to provide all information public DefaultControllerConfiguration( @@ -49,6 +50,7 @@ public DefaultControllerConfiguration( OnUpdateFilter onUpdateFilter, GenericFilter genericFilter, RateLimiter rateLimiter, + Duration cacheSyncTimeout, List dependents) { super(labelSelector, resourceClass, onAddFilter, onUpdateFilter, genericFilter, namespaces); this.associatedControllerClassName = associatedControllerClassName; @@ -65,6 +67,7 @@ public DefaultControllerConfiguration( this.rateLimiter = rateLimiter != null ? rateLimiter : LinearRateLimiter.deactivatedRateLimiter(); this.dependents = dependents != null ? dependents : Collections.emptyList(); + this.cacheSyncTimeout = cacheSyncTimeout; } @Override @@ -116,4 +119,8 @@ public Optional maxReconciliationInterval() { public RateLimiter getRateLimiter() { return rateLimiter; } + + public Duration cacheSyncTimeout() { + return cacheSyncTimeout; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java index 90e18f3e52..cf356e97fc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.api.config; +import java.time.Duration; import java.util.Collections; import java.util.Optional; import java.util.Set; @@ -7,6 +8,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.ReconcilerUtils; +import io.javaoperatorsdk.operator.api.reconciler.CacheSyncTimeout; import io.javaoperatorsdk.operator.api.reconciler.Constants; import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter; @@ -108,4 +110,13 @@ default Set getEffectiveNamespaces() { } return targetNamespaces; } + + /** + * Timeout for cache sync. In other words event source start timeout. Note that is + * "stopOnInformerErrorDuringStartup" is true the operator will stop on timeout. Default is 2 + * minutes. + */ + default Duration cacheSyncTimeout() { + return Duration.ofMinutes(CacheSyncTimeout.DEFAULT_TIMEOUT); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/CacheSyncTimeout.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/CacheSyncTimeout.java new file mode 100644 index 0000000000..c52d899864 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/CacheSyncTimeout.java @@ -0,0 +1,19 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.concurrent.TimeUnit; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface CacheSyncTimeout { + + int DEFAULT_TIMEOUT = 2; + + int timeout(); + + TimeUnit timeUnit() default TimeUnit.MINUTES; + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java index ec76adf89d..edb82e67a5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java @@ -118,4 +118,7 @@ MaxReconciliationInterval maxReconciliationInterval() default @MaxReconciliation * accessible no-arg constructor. */ Class rateLimiter() default LinearRateLimiter.class; + + CacheSyncTimeout cacheSyncTimeout() default @CacheSyncTimeout( + timeout = CacheSyncTimeout.DEFAULT_TIMEOUT); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index d091d442f6..8a07ad64c9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -48,6 +48,7 @@ public EventSourceManager(Controller

controller) { public void postProcessDefaultEventSourcesAfterProcessorInitializer() { eventSources.controllerResourceEventSource().setEventHandler(controller.getEventProcessor()); + eventSources.retryEventSource().setEventHandler(controller.getEventProcessor()); } @@ -119,6 +120,7 @@ public final void registerEventSource(EventSource eventSource) throws OperatorEx registerEventSource(null, eventSource); } + @SuppressWarnings("unchecked") public final synchronized void registerEventSource(String name, EventSource eventSource) throws OperatorException { Objects.requireNonNull(eventSource, "EventSource must not be null"); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index f5f52d1c0e..fe88b22a01 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -97,7 +97,8 @@ public void changeNamespaces(Set namespaces) { private InformerWrapper createEventSource( FilterWatchListDeletable, Resource> filteredBySelectorClient, ResourceEventHandler eventHandler, String key) { - var source = new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0)); + var source = new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0), + configuration); source.addEventHandler(eventHandler); sources.put(key, source); return source; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 601cb0c10c..2ec06a859d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -21,6 +21,7 @@ import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; @@ -32,10 +33,13 @@ class InformerWrapper private final SharedIndexInformer informer; private final Cache cache; + private final ResourceConfiguration resourceConfiguration; - public InformerWrapper(SharedIndexInformer informer) { + public InformerWrapper(SharedIndexInformer informer, + ResourceConfiguration resourceConfiguration) { this.informer = informer; this.cache = (Cache) informer.getStore(); + this.resourceConfiguration = resourceConfiguration; } @Override @@ -69,7 +73,7 @@ public void start() throws OperatorException { // note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is // false, and there is a rbac issue the get never returns; therefore operator never really // starts - start.toCompletableFuture().get(configService.cacheSyncTimeout().toMillis(), + start.toCompletableFuture().get(resourceConfiguration.cacheSyncTimeout().toMillis(), TimeUnit.MILLISECONDS); } catch (TimeoutException | ExecutionException e) { if (configService.stopOnInformerErrorDuringStartup()) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java index d788f61e4a..94af637b38 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java @@ -58,7 +58,7 @@ private static class TestControllerConfiguration public TestControllerConfiguration(Reconciler controller, Class crClass) { super(null, getControllerName(controller), CustomResource.getCRDName(crClass), null, false, null, null, null, null, crClass, - null, null, null, null, null, null); + null, null, null, null, null, null, null); this.controller = controller; } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java index 72406094fc..f72a92dd6f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java @@ -25,14 +25,7 @@ import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.MockControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.Cleaner; -import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.DeleteControl; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; -import io.javaoperatorsdk.operator.api.reconciler.Reconciler; -import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; -import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.*; import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.CustomResourceFacade; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; @@ -106,7 +99,8 @@ private ReconciliationDispatcher init(R customResourc final Class resourceClass = (Class) customResource.getClass(); configuration = configuration == null ? MockControllerConfiguration.forResource(resourceClass) : configuration; - + when(configuration.cacheSyncTimeout()) + .thenReturn(Duration.ofMinutes(CacheSyncTimeout.DEFAULT_TIMEOUT)); when(configuration.getFinalizerName()).thenReturn(DEFAULT_FINALIZER); when(configuration.getName()).thenReturn("EventDispatcherTestController"); when(configuration.getResourceClass()).thenReturn(resourceClass); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java index 7cc5a20781..55ab40c173 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java @@ -145,7 +145,7 @@ public ControllerConfig(String finalizer, boolean generationAware, eventFilter, customResourceClass, null, - null, null, null, null, null); + null, null, null, null, null, null); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java index 00980743e0..2ebd5f0a3e 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.processing.event.source.controller; +import java.time.Duration; import java.time.LocalDateTime; import java.util.List; @@ -9,6 +10,7 @@ import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.TestUtils; import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.CacheSyncTimeout; import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; @@ -188,7 +190,9 @@ public TestConfiguration(boolean generationAware, OnAddFilter { co.withStopOnInformerErrorDuringStartup(stopOnInformerErrorDuringStartup); - co.withCacheSyncTimeout(Duration.ofMillis(3000)); if (addStopHandler) { - co.withInformerStoppedHandler((informer, ex) -> replacementStopHandlerCalled = true); + co.withInformerStoppedHandler((informer, ex) -> { + replacementStopHandlerCalled = true; + }); } }); - operator.register(reconciler); + operator.register(reconciler, co -> { + co.withCacheSyncTimeout(Duration.ofMillis(3000)); + }); operator.installShutdownHook(); operator.start(); return operator;