Skip to content

fix: max reconciliation interval applies after retry exhaustion #1491

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Sep 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
Expand All @@ -37,71 +38,64 @@ public class EventProcessor<R extends HasMetadata> implements EventHandler, Life
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50;

private volatile boolean running;
private final ControllerConfiguration<?> controllerConfiguration;
private final ReconciliationDispatcher<R> reconciliationDispatcher;
private final Retry retry;
private final ExecutorService executor;
private final String controllerName;
private final Metrics metrics;
private final Cache<R> cache;
private final EventSourceManager<R> eventSourceManager;
private final RateLimiter<? extends RateLimitState> rateLimiter;
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
private final Map<String, Object> metricsMetadata;


public EventProcessor(EventSourceManager<R> eventSourceManager) {
this(
eventSourceManager.getController().getConfiguration(),
eventSourceManager.getControllerResourceEventSource(),
ExecutorServiceManager.instance().executorService(),
eventSourceManager.getController().getConfiguration().getName(),
new ReconciliationDispatcher<>(eventSourceManager.getController()),
eventSourceManager.getController().getConfiguration().getRetry(),
ConfigurationServiceProvider.instance().getMetrics(),
eventSourceManager.getController().getConfiguration().getRateLimiter(),
eventSourceManager);
}

@SuppressWarnings("rawtypes")
EventProcessor(
ControllerConfiguration controllerConfiguration,
ReconciliationDispatcher<R> reconciliationDispatcher,
EventSourceManager<R> eventSourceManager,
String relatedControllerName,
Retry retry,
RateLimiter rateLimiter,
Metrics metrics) {
this(
controllerConfiguration,
eventSourceManager.getControllerResourceEventSource(),
null,
relatedControllerName,
reconciliationDispatcher,
retry,
metrics,
rateLimiter,
eventSourceManager);
}

@SuppressWarnings({"rawtypes", "unchecked"})
private EventProcessor(
ControllerConfiguration controllerConfiguration,
Cache<R> cache,
ExecutorService executor,
String relatedControllerName,
ReconciliationDispatcher<R> reconciliationDispatcher,
Retry retry,
Metrics metrics,
RateLimiter rateLimiter,
EventSourceManager<R> eventSourceManager) {
this.controllerConfiguration = controllerConfiguration;
this.running = false;
this.executor =
executor == null
? new ScheduledThreadPoolExecutor(
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER)
: executor;
this.controllerName = relatedControllerName;
this.reconciliationDispatcher = reconciliationDispatcher;
this.retry = retry;
this.retry = controllerConfiguration.getRetry();
this.cache = cache;
this.metrics = metrics != null ? metrics : Metrics.NOOP;
this.eventSourceManager = eventSourceManager;
this.rateLimiter = rateLimiter;
this.rateLimiter = controllerConfiguration.getRateLimiter();

metricsMetadata = Optional.ofNullable(eventSourceManager.getController())
.map(Controller::getAssociatedGroupVersionKind)
Expand Down Expand Up @@ -272,18 +266,31 @@ synchronized void eventProcessingFinished(
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource());
}
}

}

private void reScheduleExecutionIfInstructed(
PostExecutionControl<R> postExecutionControl, R customResource) {

postExecutionControl
.getReScheduleDelay()
.ifPresent(delay -> {
.ifPresentOrElse(delay -> {
var resourceID = ResourceID.fromResource(customResource);
log.debug("ReScheduling event for resource: {} with delay: {}",
resourceID, delay);
retryEventSource().scheduleOnce(resourceID, delay);
}, () -> scheduleExecutionForMaxReconciliationInterval(customResource));
}

/**
 * Schedules a one-off event for the given resource after the controller's configured max
 * reconciliation interval.
 *
 * <p>Does nothing when the controller configuration reports no max reconciliation interval.
 *
 * @param customResource the resource whose {@link ResourceID} is used to schedule the event
 */
private void scheduleExecutionForMaxReconciliationInterval(R customResource) {
  final var maxInterval = this.controllerConfiguration.maxReconciliationInterval();
  if (!maxInterval.isPresent()) {
    // No max interval configured: nothing to schedule.
    return;
  }

  final var resourceID = ResourceID.fromResource(customResource);
  // The retry event source is driven with a delay expressed in milliseconds.
  final long delay = maxInterval.get().toMillis();
  log.debug("ReScheduling event for resource because for max reconciliation interval: " +
      "{} with delay: {}",
      resourceID, delay);
  retryEventSource().scheduleOnce(resourceID, delay);
}

Expand Down Expand Up @@ -319,7 +326,10 @@ private void handleRetryOnException(
metrics.failedReconciliation(resourceID, exception, metricsMetadata);
retryEventSource().scheduleOnce(resourceID, delay);
},
() -> log.error("Exhausted retries for {}", executionScope));
() -> {
log.error("Exhausted retries for {}", executionScope);
scheduleExecutionForMaxReconciliationInterval(executionScope.getResource());
});
}

private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
Expand Down Expand Up @@ -390,7 +400,7 @@ public void run() {
final var name = thread.getName();
try {
MDCUtils.addResourceInfo(executionScope.getResource());
thread.setName("ReconcilerExecutor-" + controllerName + "-" + thread.getId());
thread.setName("ReconcilerExecutor-" + controllerName() + "-" + thread.getId());
PostExecutionControl<R> postExecutionControl =
reconciliationDispatcher.handleExecution(executionScope);
eventProcessingFinished(executionScope, postExecutionControl);
Expand All @@ -403,10 +413,14 @@ public void run() {

@Override
public String toString() {
return controllerName + " -> " + executionScope;
return controllerName() + " -> " + executionScope;
}
}

/**
 * Returns the controller name as reported by the controller configuration.
 *
 * @return the value of {@code controllerConfiguration.getName()}
 */
private String controllerName() {
  final String name = this.controllerConfiguration.getName();
  return name;
}

/**
 * Tells whether the controller is currently executing for the given resource.
 *
 * <p>The check is made against the state held by the resource state manager for that ID
 * (judging by the name {@code getOrCreate}, the state is presumably created when missing —
 * confirm against {@code ResourceStateManager}). The method is synchronized on this instance.
 *
 * @param resourceID identifier of the resource to check
 * @return the result of {@code isControllerUnderExecution} for the resource's state
 */
public synchronized boolean isUnderProcessing(ResourceID resourceID) {
  final var resourceState = resourceStateManager.getOrCreate(resourceID);
  return isControllerUnderExecution(resourceState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,7 @@ private PostExecutionControl<P> createPostExecutionControl(P updatedCustomResour
private void updatePostExecutionControlWithReschedule(
PostExecutionControl<P> postExecutionControl,
BaseControl<?> baseControl) {
baseControl.getScheduleDelay().ifPresentOrElse(postExecutionControl::withReSchedule,
() -> controller.getConfiguration().maxReconciliationInterval()
.ifPresent(m -> postExecutionControl.withReSchedule(m.toMillis())));
baseControl.getScheduleDelay().ifPresent(postExecutionControl::withReSchedule);
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
Expand All @@ -25,6 +26,8 @@
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

import static io.javaoperatorsdk.operator.TestUtils.markForDeletion;
Expand Down Expand Up @@ -70,12 +73,15 @@ void setup() {
when(eventSourceManagerMock.getControllerResourceEventSource())
.thenReturn(controllerResourceEventSourceMock);
eventProcessor =
spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null,
rateLimiterMock, null));
spy(new EventProcessor(controllerConfiguration(null, rateLimiterMock),
reconciliationDispatcherMock,
eventSourceManagerMock, null));
eventProcessor.start();
eventProcessorWithRetry =
spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test",
GenericRetry.defaultLimitedExponentialRetry(), rateLimiterMock, null));
spy(new EventProcessor(
controllerConfiguration(GenericRetry.defaultLimitedExponentialRetry(),
rateLimiterMock),
reconciliationDispatcherMock, eventSourceManagerMock, null));
eventProcessorWithRetry.start();
when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock);
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
Expand Down Expand Up @@ -258,8 +264,9 @@ void cancelScheduleOnceEventsOnSuccessfulExecution() {
void startProcessedMarkedEventReceivedBefore() {
var crID = new ResourceID("test-cr", TEST_NAMESPACE);
eventProcessor =
spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null,
LinearRateLimiter.deactivatedRateLimiter(),
spy(new EventProcessor(controllerConfiguration(null,
LinearRateLimiter.deactivatedRateLimiter()), reconciliationDispatcherMock,
eventSourceManagerMock,
metricsMock));
when(controllerResourceEventSourceMock.get(eq(crID)))
.thenReturn(Optional.of(testCustomResource()));
Expand Down Expand Up @@ -367,6 +374,40 @@ void rateLimitsReconciliationSubmission() {
verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong());
}

/**
 * Finishing event processing with a default-dispatch {@code PostExecutionControl} must result
 * in exactly one {@code scheduleOnce} call on the retry timer event source.
 *
 * <p>NOTE(review): "Mar" in the method name looks like a typo for "Max"; the name is kept
 * unchanged here.
 */
@Test
void schedulesRetryForMarReconciliationInterval() {
  eventProcessorWithRetry.eventProcessingFinished(
      new ExecutionScope(testCustomResource(), null),
      PostExecutionControl.defaultDispatch());

  verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong());
}

/**
 * When processing finishes with an exception and the retry execution offers no further delay
 * (the retry-exhausted case this test is named after), exactly one {@code scheduleOnce} call
 * must still reach the retry timer event source.
 *
 * <p>NOTE(review): "Mar" in the method name looks like a typo for "Max"; the name is kept
 * unchanged here.
 */
@Test
void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() {
  // A retry whose execution never yields a next delay.
  final var exhaustedExecution = mock(RetryExecution.class);
  when(exhaustedExecution.nextDelay()).thenReturn(Optional.empty());
  final var exhaustedRetry = mock(Retry.class);
  when(exhaustedRetry.initExecution()).thenReturn(exhaustedExecution);

  // Replace the shared processor with one configured with the exhausted retry.
  eventProcessorWithRetry =
      spy(new EventProcessor(
          controllerConfiguration(exhaustedRetry, LinearRateLimiter.deactivatedRateLimiter()),
          reconciliationDispatcherMock,
          eventSourceManagerMock,
          metricsMock));
  eventProcessorWithRetry.start();
  when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);

  eventProcessorWithRetry.eventProcessingFinished(
      new ExecutionScope(testCustomResource(), null),
      PostExecutionControl.exceptionDuringExecution(new RuntimeException()));

  verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong());
}

private ResourceID eventAlreadyUnderProcessing() {
when(reconciliationDispatcherMock.handleExecution(any()))
.then(
Expand Down Expand Up @@ -407,4 +448,13 @@ private void overrideData(ResourceID id, HasMetadata applyTo) {
applyTo.getMetadata().setNamespace(id.getNamespace().orElse(null));
}

/**
 * Builds a mocked {@code ControllerConfiguration} for the tests in this class.
 *
 * <p>The mock is named {@code "Test"}, returns the supplied retry and rate limiter, and reports
 * a max reconciliation interval of 1000 milliseconds.
 *
 * @param retry the value the mock returns from {@code getRetry()}; callers in this class may
 *        pass {@code null}
 * @param rateLimiter the value the mock returns from {@code getRateLimiter()}
 * @return the stubbed configuration mock
 */
ControllerConfiguration controllerConfiguration(Retry retry, RateLimiter rateLimiter) {
  final ControllerConfiguration configuration = mock(ControllerConfiguration.class);
  when(configuration.maxReconciliationInterval())
      .thenReturn(Optional.of(Duration.ofMillis(1000)));
  when(configuration.getRateLimiter()).thenReturn(rateLimiter);
  when(configuration.getRetry()).thenReturn(retry);
  when(configuration.getName()).thenReturn("Test");
  return configuration;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -600,19 +600,6 @@ void errorStatusHandlerCanPatchResource() {
any(), any());
}

/**
 * With a finalizer present and a reconciler that returns {@code UpdateControl.noUpdate()}, the
 * dispatcher's post-execution control must carry a re-schedule delay equal to
 * {@code RECONCILIATION_MAX_INTERVAL} hours expressed in milliseconds.
 */
@Test
void schedulesReconciliationIfMaxDelayIsSet() {
  testCustomResource.addFinalizer(DEFAULT_FINALIZER);
  reconciler.reconcile = (resource, context) -> UpdateControl.noUpdate();
  final long expectedDelay = TimeUnit.HOURS.toMillis(RECONCILIATION_MAX_INTERVAL);

  PostExecutionControl control =
      reconciliationDispatcher.handleExecution(executionScopeWithCREvent(testCustomResource));

  assertThat(control.getReScheduleDelay())
      .isPresent()
      .hasValue(expectedDelay);
}

@Test
void canSkipSchedulingMaxDelayIf() {
testCustomResource.addFinalizer(DEFAULT_FINALIZER);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package io.javaoperatorsdk.operator;

import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.sample.maxintervalafterretry.MaxIntervalAfterRetryTestCustomResource;
import io.javaoperatorsdk.operator.sample.maxintervalafterretry.MaxIntervalAfterRetryTestReconciler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

/**
 * Integration test that runs {@link MaxIntervalAfterRetryTestReconciler} in a locally run
 * operator and checks that the reconciler keeps being executed after a resource is created.
 *
 * <p>NOTE(review): presumably the reconciler fails so that retries are exhausted and further
 * executions come from the max reconciliation interval — confirm against the reconciler class,
 * which is not visible here.
 */
class MaxIntervalAfterRetryIT {

  @RegisterExtension
  LocallyRunOperatorExtension operator =
      LocallyRunOperatorExtension.builder()
          .withReconciler(new MaxIntervalAfterRetryTestReconciler())
          .build();

  /**
   * Creates the test resource and waits (at most one second, polling every 50 ms) until the
   * reconciler reports more than five executions.
   */
  @Test
  void reconciliationTriggeredBasedOnMaxInterval() {
    operator.create(createTestResource());

    await()
        .pollInterval(50, TimeUnit.MILLISECONDS)
        .atMost(1, TimeUnit.SECONDS)
        .untilAsserted(() -> {
          final var reconciler =
              operator.getReconcilerOfType(MaxIntervalAfterRetryTestReconciler.class);
          assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(5);
        });
  }

  /** Builds the custom resource under test, named {@code maxintervalretrytest1}. */
  private MaxIntervalAfterRetryTestCustomResource createTestResource() {
    final var resource = new MaxIntervalAfterRetryTestCustomResource();
    final var metadata = new ObjectMeta();
    metadata.setName("maxintervalretrytest1");
    resource.setMetadata(metadata);
    return resource;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.javaoperatorsdk.operator.sample.maxinterval.MaxIntervalTestCustomResource;
import io.javaoperatorsdk.operator.sample.maxinterval.MaxIntervalTestReconciler;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

class MaxIntervalIT {
Expand All @@ -27,9 +28,10 @@ void reconciliationTriggeredBasedOnMaxInterval() {
await()
.pollInterval(50, TimeUnit.MILLISECONDS)
.atMost(500, TimeUnit.MILLISECONDS)
.until(
() -> ((MaxIntervalTestReconciler) operator.getFirstReconciler())
.getNumberOfExecutions() > 3);
.untilAsserted(
() -> assertThat(operator.getReconcilerOfType(MaxIntervalTestReconciler.class)
.getNumberOfExecutions())
.isGreaterThan(3));
}

private MaxIntervalTestCustomResource createTestResource() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.javaoperatorsdk.operator.sample.maxintervalafterretry;

import io.fabric8.kubernetes.api.model.Namespaced;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.model.annotation.Group;
import io.fabric8.kubernetes.model.annotation.ShortNames;
import io.fabric8.kubernetes.model.annotation.Version;

/**
 * Namespaced custom resource for the "max interval after retry" sample.
 *
 * <p>Declared in group {@code sample.javaoperatorsdk}, version {@code v1}, with short name
 * {@code mir}. Both the spec and status type parameters are {@link Void}, and the class adds
 * no members of its own.
 */
@Group("sample.javaoperatorsdk")
@Version("v1")
@ShortNames("mir")
public class MaxIntervalAfterRetryTestCustomResource
extends CustomResource<Void, Void>
implements Namespaced {
}
Loading