Skip to content

Revert "feat: moving cache sync timeout to controller level (#1576)" #1583

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -252,12 +252,6 @@ public List<DependentResourceSpec> getDependentResources() {
return specs;
}

@Override
public Duration cacheSyncTimeout() {
var cacheSyncTimeout = annotation.cacheSyncTimeout();
return Duration.of(cacheSyncTimeout.timeout(), cacheSyncTimeout.timeUnit().toChronoUnit());
}

private String getName(Dependent dependent, Class<? extends DependentResource> dependentType) {
var name = dependent.name();
if (name.isBlank()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.api.config;

import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -168,6 +169,15 @@ default boolean stopOnInformerErrorDuringStartup() {
return true;
}

/**
* Timeout for cache sync in milliseconds. In other words source start timeout. Note that is
* "stopOnInformerErrorDuringStartup" is true the operator will stop on timeout. Default is 2
* minutes.
*/
default Duration cacheSyncTimeout() {
return Duration.ofMinutes(2);
}

/**
* Handler for an informer stop. Informer stops if there is a non-recoverable error. Like received
* a resource that cannot be deserialized.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.api.config;

import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -28,6 +29,7 @@ public class ConfigurationServiceOverrider {
private LeaderElectionConfiguration leaderElectionConfiguration;
private InformerStoppedHandler informerStoppedHandler;
private Boolean stopOnInformerErrorDuringStartup;
private Duration cacheSyncTimeout;

ConfigurationServiceOverrider(ConfigurationService original) {
this.original = original;
Expand Down Expand Up @@ -106,6 +108,11 @@ public ConfigurationServiceOverrider withStopOnInformerErrorDuringStartup(
return this;
}

public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTimeout) {
this.cacheSyncTimeout = cacheSyncTimeout;
return this;
}

public ConfigurationService build() {
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
@Override
Expand Down Expand Up @@ -184,6 +191,11 @@ public boolean stopOnInformerErrorDuringStartup() {
return stopOnInformerErrorDuringStartup != null ? stopOnInformerErrorDuringStartup
: super.stopOnInformerErrorDuringStartup();
}

@Override
public Duration cacheSyncTimeout() {
return cacheSyncTimeout != null ? cacheSyncTimeout : super.cacheSyncTimeout();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import io.javaoperatorsdk.operator.processing.retry.GradualRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;

public interface ControllerConfiguration<P extends HasMetadata> extends ResourceConfiguration<P> {
public interface ControllerConfiguration<R extends HasMetadata> extends ResourceConfiguration<R> {

@SuppressWarnings("rawtypes")
RateLimiter DEFAULT_RATE_LIMITER = LinearRateLimiter.deactivatedRateLimiter();
Expand Down Expand Up @@ -71,7 +71,7 @@ default RateLimiter getRateLimiter() {
*
* @return filter
*/
default ResourceEventFilter<P> getEventFilter() {
default ResourceEventFilter<R> getEventFilter() {
return ResourceEventFilters.passthrough();
}

Expand All @@ -91,8 +91,8 @@ default ConfigurationService getConfigurationService() {

@SuppressWarnings("unchecked")
@Override
default Class<P> getResourceClass() {
return (Class<P>) Utils.getFirstTypeArgumentFromSuperClassOrInterface(getClass(),
default Class<R> getResourceClass() {
return (Class<R>) Utils.getFirstTypeArgumentFromSuperClassOrInterface(getClass(),
ControllerConfiguration.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ public class ControllerConfigurationOverrider<R extends HasMetadata> {
private OnUpdateFilter<R> onUpdateFilter;
private GenericFilter<R> genericFilter;
private RateLimiter rateLimiter;
private Duration cacheSyncTimeout;

private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
finalizer = original.getFinalizerName();
Expand All @@ -57,7 +56,6 @@ private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
dependentResources.forEach(drs -> namedDependentResourceSpecs.put(drs.getName(), drs));
this.original = original;
this.rateLimiter = original.getRateLimiter();
this.cacheSyncTimeout = original.cacheSyncTimeout();
}

public ControllerConfigurationOverrider<R> withFinalizer(String finalizer) {
Expand Down Expand Up @@ -178,11 +176,6 @@ public ControllerConfigurationOverrider<R> replacingNamedDependentResourceConfig
return this;
}

public ControllerConfigurationOverrider<R> withCacheSyncTimeout(Duration cacheSyncTimeout) {
this.cacheSyncTimeout = cacheSyncTimeout;
return this;
}

public ControllerConfiguration<R> build() {
final var hasModifiedNamespaces = !original.getNamespaces().equals(namespaces);
final var newDependentSpecs = namedDependentResourceSpecs.values().stream()
Expand Down Expand Up @@ -215,7 +208,6 @@ public ControllerConfiguration<R> build() {
onUpdateFilter,
genericFilter,
rateLimiter,
cacheSyncTimeout,
newDependentSpecs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ public class DefaultControllerConfiguration<R extends HasMetadata>
private final List<DependentResourceSpec> dependents;
private final Duration reconciliationMaxInterval;
private final RateLimiter rateLimiter;
private final Duration cacheSyncTimeout;

// NOSONAR constructor is meant to provide all information
public DefaultControllerConfiguration(
Expand All @@ -50,7 +49,6 @@ public DefaultControllerConfiguration(
OnUpdateFilter<R> onUpdateFilter,
GenericFilter<R> genericFilter,
RateLimiter rateLimiter,
Duration cacheSyncTimeout,
List<DependentResourceSpec> dependents) {
super(labelSelector, resourceClass, onAddFilter, onUpdateFilter, genericFilter, namespaces);
this.associatedControllerClassName = associatedControllerClassName;
Expand All @@ -67,7 +65,6 @@ public DefaultControllerConfiguration(
this.rateLimiter =
rateLimiter != null ? rateLimiter : LinearRateLimiter.deactivatedRateLimiter();
this.dependents = dependents != null ? dependents : Collections.emptyList();
this.cacheSyncTimeout = cacheSyncTimeout;
}

@Override
Expand Down Expand Up @@ -119,8 +116,4 @@ public Optional<Duration> maxReconciliationInterval() {
public RateLimiter getRateLimiter() {
return rateLimiter;
}

public Duration cacheSyncTimeout() {
return cacheSyncTimeout;
}
}
Original file line number Diff line number Diff line change
@@ -1,14 +1,12 @@
package io.javaoperatorsdk.operator.api.config;

import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.reconciler.CacheSyncTimeout;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
Expand Down Expand Up @@ -110,13 +108,4 @@ default Set<String> getEffectiveNamespaces() {
}
return targetNamespaces;
}

/**
* Timeout for cache sync. In other words event source start timeout. Note that is
* "stopOnInformerErrorDuringStartup" is true the operator will stop on timeout. Default is 2
* minutes.
*/
default Duration cacheSyncTimeout() {
return Duration.ofMinutes(CacheSyncTimeout.DEFAULT_TIMEOUT);
}
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,4 @@ MaxReconciliationInterval maxReconciliationInterval() default @MaxReconciliation
* accessible no-arg constructor.
*/
Class<? extends RateLimiter> rateLimiter() default LinearRateLimiter.class;

CacheSyncTimeout cacheSyncTimeout() default @CacheSyncTimeout(
timeout = CacheSyncTimeout.DEFAULT_TIMEOUT);
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ public EventSourceManager(Controller<P> controller) {

public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
eventSources.controllerResourceEventSource().setEventHandler(controller.getEventProcessor());

eventSources.retryEventSource().setEventHandler(controller.getEventProcessor());
}

Expand Down Expand Up @@ -120,7 +119,6 @@ public final void registerEventSource(EventSource eventSource) throws OperatorEx
registerEventSource(null, eventSource);
}

@SuppressWarnings("unchecked")
public final synchronized void registerEventSource(String name, EventSource eventSource)
throws OperatorException {
Objects.requireNonNull(eventSource, "EventSource must not be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,7 @@ public void changeNamespaces(Set<String> namespaces) {
private InformerWrapper<T> createEventSource(
FilterWatchListDeletable<T, KubernetesResourceList<T>, Resource<T>> filteredBySelectorClient,
ResourceEventHandler<T> eventHandler, String key) {
var source = new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0),
configuration);
var source = new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0));
source.addEventHandler(eventHandler);
sources.put(key, source);
return source;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
Expand All @@ -33,13 +32,10 @@ class InformerWrapper<T extends HasMetadata>

private final SharedIndexInformer<T> informer;
private final Cache<T> cache;
private final ResourceConfiguration<?> resourceConfiguration;

public InformerWrapper(SharedIndexInformer<T> informer,
ResourceConfiguration<?> resourceConfiguration) {
public InformerWrapper(SharedIndexInformer<T> informer) {
this.informer = informer;
this.cache = (Cache<T>) informer.getStore();
this.resourceConfiguration = resourceConfiguration;
}

@Override
Expand Down Expand Up @@ -73,7 +69,7 @@ public void start() throws OperatorException {
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
// false, and there is a rbac issue the get never returns; therefore operator never really
// starts
start.toCompletableFuture().get(resourceConfiguration.cacheSyncTimeout().toMillis(),
start.toCompletableFuture().get(configService.cacheSyncTimeout().toMillis(),
TimeUnit.MILLISECONDS);
} catch (TimeoutException | ExecutionException e) {
if (configService.stopOnInformerErrorDuringStartup()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ private static class TestControllerConfiguration<R extends HasMetadata>
public TestControllerConfiguration(Reconciler<R> controller, Class<R> crClass) {
super(null, getControllerName(controller),
CustomResource.getCRDName(crClass), null, false, null, null, null, null, crClass,
null, null, null, null, null, null, null);
null, null, null, null, null, null);
this.controller = controller;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,14 @@
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.MockControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.ReconciliationDispatcher.CustomResourceFacade;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
Expand Down Expand Up @@ -99,8 +106,7 @@ private <R extends HasMetadata> ReconciliationDispatcher<R> init(R customResourc
final Class<R> resourceClass = (Class<R>) customResource.getClass();
configuration = configuration == null ? MockControllerConfiguration.forResource(resourceClass)
: configuration;
when(configuration.cacheSyncTimeout())
.thenReturn(Duration.ofMinutes(CacheSyncTimeout.DEFAULT_TIMEOUT));

when(configuration.getFinalizerName()).thenReturn(DEFAULT_FINALIZER);
when(configuration.getName()).thenReturn("EventDispatcherTestController");
when(configuration.getResourceClass()).thenReturn(resourceClass);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public ControllerConfig(String finalizer, boolean generationAware,
eventFilter,
customResourceClass,
null,
null, null, null, null, null, null);
null, null, null, null, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.javaoperatorsdk.operator.processing.event.source.controller;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.List;

Expand All @@ -10,7 +9,6 @@
import io.javaoperatorsdk.operator.MockKubernetesClient;
import io.javaoperatorsdk.operator.TestUtils;
import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.CacheSyncTimeout;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
Expand Down Expand Up @@ -190,9 +188,7 @@ public TestConfiguration(boolean generationAware, OnAddFilter<TestCustomResource
null,
TestCustomResource.class,
null,
onAddFilter, onUpdateFilter, genericFilter, null,
Duration.ofMinutes(CacheSyncTimeout.DEFAULT_TIMEOUT),
null);
onAddFilter, onUpdateFilter, genericFilter, null, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,15 +205,12 @@ Operator startOperator(boolean stopOnInformerErrorDuringStartup, boolean addStop
Operator operator = new Operator(clientUsingServiceAccount(),
co -> {
co.withStopOnInformerErrorDuringStartup(stopOnInformerErrorDuringStartup);
co.withCacheSyncTimeout(Duration.ofMillis(3000));
if (addStopHandler) {
co.withInformerStoppedHandler((informer, ex) -> {
replacementStopHandlerCalled = true;
});
co.withInformerStoppedHandler((informer, ex) -> replacementStopHandlerCalled = true);
}
});
operator.register(reconciler, co -> {
co.withCacheSyncTimeout(Duration.ofMillis(3000));
});
operator.register(reconciler);
operator.installShutdownHook();
operator.start();
return operator;
Expand Down