diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index df9d0f06fc..58bf5ee00a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -14,6 +14,7 @@ import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.api.reconciler.Constants; @@ -37,10 +38,10 @@ public class EventProcessor implements EventHandler, Life private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50; private volatile boolean running; + private final ControllerConfiguration controllerConfiguration; private final ReconciliationDispatcher reconciliationDispatcher; private final Retry retry; private final ExecutorService executor; - private final String controllerName; private final Metrics metrics; private final Cache cache; private final EventSourceManager eventSourceManager; @@ -48,60 +49,53 @@ public class EventProcessor implements EventHandler, Life private final ResourceStateManager resourceStateManager = new ResourceStateManager(); private final Map metricsMetadata; + public EventProcessor(EventSourceManager eventSourceManager) { this( + eventSourceManager.getController().getConfiguration(), eventSourceManager.getControllerResourceEventSource(), ExecutorServiceManager.instance().executorService(), - eventSourceManager.getController().getConfiguration().getName(), new ReconciliationDispatcher<>(eventSourceManager.getController()), - eventSourceManager.getController().getConfiguration().getRetry(), ConfigurationServiceProvider.instance().getMetrics(), - eventSourceManager.getController().getConfiguration().getRateLimiter(), eventSourceManager); } @SuppressWarnings("rawtypes") EventProcessor( + ControllerConfiguration controllerConfiguration, ReconciliationDispatcher reconciliationDispatcher, EventSourceManager eventSourceManager, - String relatedControllerName, - Retry retry, - RateLimiter rateLimiter, Metrics metrics) { this( + controllerConfiguration, eventSourceManager.getControllerResourceEventSource(), null, - relatedControllerName, reconciliationDispatcher, - retry, metrics, - rateLimiter, eventSourceManager); } @SuppressWarnings({"rawtypes", "unchecked"}) private EventProcessor( + ControllerConfiguration controllerConfiguration, Cache cache, ExecutorService executor, - String relatedControllerName, ReconciliationDispatcher reconciliationDispatcher, - Retry retry, Metrics metrics, - RateLimiter rateLimiter, EventSourceManager eventSourceManager) { + this.controllerConfiguration = controllerConfiguration; this.running = false; this.executor = executor == null ? new ScheduledThreadPoolExecutor( ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER) : executor; - this.controllerName = relatedControllerName; this.reconciliationDispatcher = reconciliationDispatcher; - this.retry = retry; + this.retry = controllerConfiguration.getRetry(); this.cache = cache; this.metrics = metrics != null ? metrics : Metrics.NOOP; this.eventSourceManager = eventSourceManager; - this.rateLimiter = rateLimiter; + this.rateLimiter = controllerConfiguration.getRateLimiter(); metricsMetadata = Optional.ofNullable(eventSourceManager.getController()) .map(Controller::getAssociatedGroupVersionKind) @@ -272,18 +266,31 @@ synchronized void eventProcessingFinished( reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource()); } } - } private void reScheduleExecutionIfInstructed( PostExecutionControl postExecutionControl, R customResource) { + postExecutionControl .getReScheduleDelay() - .ifPresent(delay -> { + .ifPresentOrElse(delay -> { var resourceID = ResourceID.fromResource(customResource); log.debug("ReScheduling event for resource: {} with delay: {}", resourceID, delay); retryEventSource().scheduleOnce(resourceID, delay); + }, () -> scheduleExecutionForMaxReconciliationInterval(customResource)); + } + + private void scheduleExecutionForMaxReconciliationInterval(R customResource) { + this.controllerConfiguration + .maxReconciliationInterval() + .ifPresent(m -> { + var resourceID = ResourceID.fromResource(customResource); + var delay = m.toMillis(); + log.debug("ReScheduling event for resource because for max reconciliation interval: " + + "{} with delay: {}", + resourceID, delay); + retryEventSource().scheduleOnce(resourceID, delay); }); } @@ -319,7 +326,10 @@ private void handleRetryOnException( metrics.failedReconciliation(resourceID, exception, metricsMetadata); retryEventSource().scheduleOnce(resourceID, delay); }, - () -> log.error("Exhausted retries for {}", executionScope)); + () -> { + log.error("Exhausted retries for {}", executionScope); + scheduleExecutionForMaxReconciliationInterval(executionScope.getResource()); + }); } private void cleanupOnSuccessfulExecution(ExecutionScope executionScope) { @@ -390,7 +400,7 @@ public void run() { final var name = thread.getName(); try { MDCUtils.addResourceInfo(executionScope.getResource()); - thread.setName("ReconcilerExecutor-" + controllerName + "-" + thread.getId()); + thread.setName("ReconcilerExecutor-" + controllerName() + "-" + thread.getId()); PostExecutionControl postExecutionControl = reconciliationDispatcher.handleExecution(executionScope); eventProcessingFinished(executionScope, postExecutionControl); @@ -403,10 +413,14 @@ public void run() { @Override public String toString() { - return controllerName + " -> " + executionScope; + return controllerName() + " -> " + executionScope; } } + private String controllerName() { + return controllerConfiguration.getName(); + } + public synchronized boolean isUnderProcessing(ResourceID resourceID) { return isControllerUnderExecution(resourceStateManager.getOrCreate(resourceID)); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java index bb19e6b505..651fc70276 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java @@ -264,9 +264,7 @@ private PostExecutionControl

createPostExecutionControl(P updatedCustomResour private void updatePostExecutionControlWithReschedule( PostExecutionControl

postExecutionControl, BaseControl baseControl) { - baseControl.getScheduleDelay().ifPresentOrElse(postExecutionControl::withReSchedule, - () -> controller.getConfiguration().maxReconciliationInterval() - .ifPresent(m -> postExecutionControl.withReSchedule(m.toMillis()))); + baseControl.getScheduleDelay().ifPresent(postExecutionControl::withReSchedule); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 2a97158a43..23d5961015 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.RetryConfiguration; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter; @@ -25,6 +26,8 @@ import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; +import io.javaoperatorsdk.operator.processing.retry.Retry; +import io.javaoperatorsdk.operator.processing.retry.RetryExecution; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.TestUtils.markForDeletion; @@ -70,12 +73,15 @@ void setup() { when(eventSourceManagerMock.getControllerResourceEventSource()) .thenReturn(controllerResourceEventSourceMock); eventProcessor = - spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, - rateLimiterMock, null)); + spy(new EventProcessor(controllerConfiguration(null, rateLimiterMock), + reconciliationDispatcherMock, + eventSourceManagerMock, null)); eventProcessor.start(); eventProcessorWithRetry = - spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", - GenericRetry.defaultLimitedExponentialRetry(), rateLimiterMock, null)); + spy(new EventProcessor( + controllerConfiguration(GenericRetry.defaultLimitedExponentialRetry(), + rateLimiterMock), + reconciliationDispatcherMock, eventSourceManagerMock, null)); eventProcessorWithRetry.start(); when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock); when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); @@ -258,8 +264,9 @@ void cancelScheduleOnceEventsOnSuccessfulExecution() { void startProcessedMarkedEventReceivedBefore() { var crID = new ResourceID("test-cr", TEST_NAMESPACE); eventProcessor = - spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, - LinearRateLimiter.deactivatedRateLimiter(), + spy(new EventProcessor(controllerConfiguration(null, + LinearRateLimiter.deactivatedRateLimiter()), reconciliationDispatcherMock, + eventSourceManagerMock, metricsMock)); when(controllerResourceEventSourceMock.get(eq(crID))) .thenReturn(Optional.of(testCustomResource())); @@ -367,6 +374,40 @@ void rateLimitsReconciliationSubmission() { verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); } + @Test + void schedulesRetryForMarReconciliationInterval() { + TestCustomResource customResource = testCustomResource(); + ExecutionScope executionScope = new ExecutionScope(customResource, null); + PostExecutionControl postExecutionControl = + PostExecutionControl.defaultDispatch(); + + eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); + + verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); + } + + @Test + void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() { + RetryExecution mockRetryExecution = mock(RetryExecution.class); + when(mockRetryExecution.nextDelay()).thenReturn(Optional.empty()); + Retry retry = mock(Retry.class); + when(retry.initExecution()).thenReturn(mockRetryExecution); + eventProcessorWithRetry = + spy(new EventProcessor(controllerConfiguration(retry, + LinearRateLimiter.deactivatedRateLimiter()), reconciliationDispatcherMock, + eventSourceManagerMock, + metricsMock)); + eventProcessorWithRetry.start(); + ExecutionScope executionScope = new ExecutionScope(testCustomResource(), null); + PostExecutionControl postExecutionControl = + PostExecutionControl.exceptionDuringExecution(new RuntimeException()); + when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); + + eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); + + verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); + } + private ResourceID eventAlreadyUnderProcessing() { when(reconciliationDispatcherMock.handleExecution(any())) .then( @@ -407,4 +448,13 @@ private void overrideData(ResourceID id, HasMetadata applyTo) { applyTo.getMetadata().setNamespace(id.getNamespace().orElse(null)); } + ControllerConfiguration controllerConfiguration(Retry retry, RateLimiter rateLimiter) { + ControllerConfiguration res = mock(ControllerConfiguration.class); + when(res.getName()).thenReturn("Test"); + when(res.getRetry()).thenReturn(retry); + when(res.getRateLimiter()).thenReturn(rateLimiter); + when(res.maxReconciliationInterval()).thenReturn(Optional.of(Duration.ofMillis(1000))); + return res; + } + } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java index 89dafa9fc4..72406094fc 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java @@ -600,19 +600,6 @@ void errorStatusHandlerCanPatchResource() { any(), any()); } - @Test - void schedulesReconciliationIfMaxDelayIsSet() { - testCustomResource.addFinalizer(DEFAULT_FINALIZER); - - reconciler.reconcile = (r, c) -> UpdateControl.noUpdate(); - - PostExecutionControl control = - reconciliationDispatcher.handleExecution(executionScopeWithCREvent(testCustomResource)); - - assertThat(control.getReScheduleDelay()).isPresent() - .hasValue(TimeUnit.HOURS.toMillis(RECONCILIATION_MAX_INTERVAL)); - } - @Test void canSkipSchedulingMaxDelayIf() { testCustomResource.addFinalizer(DEFAULT_FINALIZER); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalAfterRetryIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalAfterRetryIT.java new file mode 100644 index 0000000000..84814e127e --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalAfterRetryIT.java @@ -0,0 +1,43 @@ +package io.javaoperatorsdk.operator; + +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.maxintervalafterretry.MaxIntervalAfterRetryTestCustomResource; +import io.javaoperatorsdk.operator.sample.maxintervalafterretry.MaxIntervalAfterRetryTestReconciler; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class MaxIntervalAfterRetryIT { + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder() + .withReconciler(new MaxIntervalAfterRetryTestReconciler()).build(); + + @Test + void reconciliationTriggeredBasedOnMaxInterval() { + MaxIntervalAfterRetryTestCustomResource cr = createTestResource(); + + operator.create(cr); + + await() + .pollInterval(50, TimeUnit.MILLISECONDS) + .atMost(1, TimeUnit.SECONDS) + .untilAsserted( + () -> assertThat(operator.getReconcilerOfType(MaxIntervalAfterRetryTestReconciler.class) + .getNumberOfExecutions()).isGreaterThan(5)); + } + + private MaxIntervalAfterRetryTestCustomResource createTestResource() { + MaxIntervalAfterRetryTestCustomResource cr = new MaxIntervalAfterRetryTestCustomResource(); + cr.setMetadata(new ObjectMeta()); + cr.getMetadata().setName("maxintervalretrytest1"); + return cr; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalIT.java index e817fa8d1f..35ae316f53 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalIT.java @@ -10,6 +10,7 @@ import io.javaoperatorsdk.operator.sample.maxinterval.MaxIntervalTestCustomResource; import io.javaoperatorsdk.operator.sample.maxinterval.MaxIntervalTestReconciler; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; class MaxIntervalIT { @@ -27,9 +28,10 @@ void reconciliationTriggeredBasedOnMaxInterval() { await() .pollInterval(50, TimeUnit.MILLISECONDS) .atMost(500, TimeUnit.MILLISECONDS) - .until( - () -> ((MaxIntervalTestReconciler) operator.getFirstReconciler()) - .getNumberOfExecutions() > 3); + .untilAsserted( + () -> assertThat(operator.getReconcilerOfType(MaxIntervalTestReconciler.class) + .getNumberOfExecutions()) + .isGreaterThan(3)); } private MaxIntervalTestCustomResource createTestResource() { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestCustomResource.java new file mode 100644 index 0000000000..cc914f6c48 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestCustomResource.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.maxintervalafterretry; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("mir") +public class MaxIntervalAfterRetryTestCustomResource + extends CustomResource + implements Namespaced { +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestReconciler.java new file mode 100644 index 0000000000..8473ee03ff --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestReconciler.java @@ -0,0 +1,36 @@ +package io.javaoperatorsdk.operator.sample.maxintervalafterretry; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.processing.retry.GradualRetry; +import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider; + +@GradualRetry(maxAttempts = 1, initialInterval = 100) +@ControllerConfiguration(maxReconciliationInterval = @MaxReconciliationInterval(interval = 50, + timeUnit = TimeUnit.MILLISECONDS)) +public class MaxIntervalAfterRetryTestReconciler + implements Reconciler, TestExecutionInfoProvider { + + private Logger log = LoggerFactory.getLogger(MaxIntervalAfterRetryTestReconciler.class); + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public UpdateControl reconcile( + MaxIntervalAfterRetryTestCustomResource resource, + Context context) { + numberOfExecutions.addAndGet(1); + log.info("Executed, number: {}", numberOfExecutions.get()); + throw new RuntimeException(); + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } + +}