From 95afaee4038b80e9a8fa2510ed480ae61279dad8 Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 20 Sep 2022 10:35:21 +0200 Subject: [PATCH 1/5] fix: max reconciliation interval applies after retry exhaustion --- .../operator/processing/Controller.java | 2 +- .../processing/event/EventProcessor.java | 51 ++++++++++++------- .../event/ReconciliationDispatcher.java | 4 +- .../processing/event/EventProcessorTest.java | 26 +++++++--- .../event/ReconciliationDispatcherTest.java | 13 ----- 5 files changed, 54 insertions(+), 42 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 92b70e722d..2ba5e32214 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -78,7 +78,7 @@ public Controller(Reconciler

reconciler, managedWorkflow = ManagedWorkflow.workflowFor(kubernetesClient, configuration.getDependentResources()); eventSourceManager = new EventSourceManager<>(this); - eventProcessor = new EventProcessor<>(eventSourceManager); + eventProcessor = new EventProcessor<>(getConfiguration(), eventSourceManager); eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index df9d0f06fc..b019292e52 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -14,6 +14,7 @@ import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.api.reconciler.Constants; @@ -37,6 +38,7 @@ public class EventProcessor implements EventHandler, Life private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50; private volatile boolean running; + private final ControllerConfiguration controllerConfiguration; private final ReconciliationDispatcher reconciliationDispatcher; private final Retry retry; private final ExecutorService executor; @@ -48,60 +50,55 @@ public class EventProcessor implements EventHandler, Life private final ResourceStateManager resourceStateManager = new ResourceStateManager(); private final Map metricsMetadata; - public EventProcessor(EventSourceManager eventSourceManager) { + + public EventProcessor(ControllerConfiguration controllerConfiguration, + EventSourceManager eventSourceManager) { this( + controllerConfiguration, eventSourceManager.getControllerResourceEventSource(), ExecutorServiceManager.instance().executorService(), - eventSourceManager.getController().getConfiguration().getName(), new ReconciliationDispatcher<>(eventSourceManager.getController()), - eventSourceManager.getController().getConfiguration().getRetry(), ConfigurationServiceProvider.instance().getMetrics(), - eventSourceManager.getController().getConfiguration().getRateLimiter(), eventSourceManager); } @SuppressWarnings("rawtypes") EventProcessor( + ControllerConfiguration controllerConfiguration, ReconciliationDispatcher reconciliationDispatcher, EventSourceManager eventSourceManager, - String relatedControllerName, - Retry retry, - RateLimiter rateLimiter, Metrics metrics) { this( + controllerConfiguration, eventSourceManager.getControllerResourceEventSource(), null, - relatedControllerName, reconciliationDispatcher, - retry, metrics, - rateLimiter, eventSourceManager); } @SuppressWarnings({"rawtypes", "unchecked"}) private EventProcessor( + ControllerConfiguration controllerConfiguration, Cache cache, ExecutorService executor, - String relatedControllerName, ReconciliationDispatcher reconciliationDispatcher, - Retry retry, Metrics metrics, - RateLimiter rateLimiter, EventSourceManager eventSourceManager) { + this.controllerConfiguration = controllerConfiguration; this.running = false; this.executor = executor == null ? new ScheduledThreadPoolExecutor( ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER) : executor; - this.controllerName = relatedControllerName; + this.controllerName = controllerConfiguration.getName(); this.reconciliationDispatcher = reconciliationDispatcher; - this.retry = retry; + this.retry = controllerConfiguration.getRetry(); this.cache = cache; this.metrics = metrics != null ? metrics : Metrics.NOOP; this.eventSourceManager = eventSourceManager; - this.rateLimiter = rateLimiter; + this.rateLimiter = controllerConfiguration.getRateLimiter(); metricsMetadata = Optional.ofNullable(eventSourceManager.getController()) .map(Controller::getAssociatedGroupVersionKind) @@ -272,18 +269,31 @@ synchronized void eventProcessingFinished( reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource()); } } - } private void reScheduleExecutionIfInstructed( PostExecutionControl postExecutionControl, R customResource) { + postExecutionControl .getReScheduleDelay() - .ifPresent(delay -> { + .ifPresentOrElse(delay -> { var resourceID = ResourceID.fromResource(customResource); log.debug("ReScheduling event for resource: {} with delay: {}", resourceID, delay); retryEventSource().scheduleOnce(resourceID, delay); + }, () -> scheduleExecutionForMaxReconciliationInterval(customResource)); + } + + private void scheduleExecutionForMaxReconciliationInterval(R customResource) { + this.controllerConfiguration + .maxReconciliationInterval() + .ifPresent(m -> { + var resourceID = ResourceID.fromResource(customResource); + var delay = m.toMillis(); + log.debug("ReScheduling event for resource because for max reconciliation interval: " + + "{} with delay: {}", + resourceID, delay); + retryEventSource().scheduleOnce(resourceID, delay); }); } @@ -319,7 +329,10 @@ private void handleRetryOnException( metrics.failedReconciliation(resourceID, exception, metricsMetadata); retryEventSource().scheduleOnce(resourceID, delay); }, - () -> log.error("Exhausted retries for {}", executionScope)); + () -> { + log.error("Exhausted retries for {}", executionScope); + scheduleExecutionForMaxReconciliationInterval(executionScope.getResource()); + }); } private void cleanupOnSuccessfulExecution(ExecutionScope executionScope) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java index bb19e6b505..651fc70276 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcher.java @@ -264,9 +264,7 @@ private PostExecutionControl

createPostExecutionControl(P updatedCustomResour private void updatePostExecutionControlWithReschedule( PostExecutionControl

postExecutionControl, BaseControl baseControl) { - baseControl.getScheduleDelay().ifPresentOrElse(postExecutionControl::withReSchedule, - () -> controller.getConfiguration().maxReconciliationInterval() - .ifPresent(m -> postExecutionControl.withReSchedule(m.toMillis()))); + baseControl.getScheduleDelay().ifPresent(postExecutionControl::withReSchedule); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 2a97158a43..41041571b5 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -15,6 +15,7 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.RetryConfiguration; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter; @@ -25,6 +26,7 @@ import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; +import io.javaoperatorsdk.operator.processing.retry.Retry; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.TestUtils.markForDeletion; @@ -70,12 +72,15 @@ void setup() { when(eventSourceManagerMock.getControllerResourceEventSource()) .thenReturn(controllerResourceEventSourceMock); eventProcessor = - spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, - rateLimiterMock, null)); + spy(new EventProcessor(controllerConfiguration(null, rateLimiterMock), + reconciliationDispatcherMock, + eventSourceManagerMock, null)); eventProcessor.start(); eventProcessorWithRetry = - spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", - GenericRetry.defaultLimitedExponentialRetry(), rateLimiterMock, null)); + spy(new EventProcessor( + controllerConfiguration(GenericRetry.defaultLimitedExponentialRetry(), + rateLimiterMock), + reconciliationDispatcherMock, eventSourceManagerMock, null)); eventProcessorWithRetry.start(); when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock); when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); @@ -258,8 +263,9 @@ void cancelScheduleOnceEventsOnSuccessfulExecution() { void startProcessedMarkedEventReceivedBefore() { var crID = new ResourceID("test-cr", TEST_NAMESPACE); eventProcessor = - spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, - LinearRateLimiter.deactivatedRateLimiter(), + spy(new EventProcessor(controllerConfiguration(null, + LinearRateLimiter.deactivatedRateLimiter()), reconciliationDispatcherMock, + eventSourceManagerMock, metricsMock)); when(controllerResourceEventSourceMock.get(eq(crID))) .thenReturn(Optional.of(testCustomResource())); @@ -407,4 +413,12 @@ private void overrideData(ResourceID id, HasMetadata applyTo) { applyTo.getMetadata().setNamespace(id.getNamespace().orElse(null)); } + ControllerConfiguration controllerConfiguration(Retry retry, RateLimiter rateLimiter) { + ControllerConfiguration res = mock(ControllerConfiguration.class); + when(res.getName()).thenReturn("Test"); + when(res.getRetry()).thenReturn(retry); + when(res.getRateLimiter()).thenReturn(rateLimiter); + return res; + } + } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java index 89dafa9fc4..72406094fc 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java @@ -600,19 +600,6 @@ void errorStatusHandlerCanPatchResource() { any(), any()); } - @Test - void schedulesReconciliationIfMaxDelayIsSet() { - testCustomResource.addFinalizer(DEFAULT_FINALIZER); - - reconciler.reconcile = (r, c) -> UpdateControl.noUpdate(); - - PostExecutionControl control = - reconciliationDispatcher.handleExecution(executionScopeWithCREvent(testCustomResource)); - - assertThat(control.getReScheduleDelay()).isPresent() - .hasValue(TimeUnit.HOURS.toMillis(RECONCILIATION_MAX_INTERVAL)); - } - @Test void canSkipSchedulingMaxDelayIf() { testCustomResource.addFinalizer(DEFAULT_FINALIZER); From cd747073b671883fe35d751457d988cea13edcac Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 20 Sep 2022 11:16:58 +0200 Subject: [PATCH 2/5] added unit test --- .../processing/event/EventProcessorTest.java | 36 +++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 41041571b5..ff0603dc93 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -27,6 +27,7 @@ import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import io.javaoperatorsdk.operator.processing.retry.Retry; +import io.javaoperatorsdk.operator.processing.retry.RetryExecution; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.TestUtils.markForDeletion; @@ -373,6 +374,40 @@ void rateLimitsReconciliationSubmission() { verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); } + @Test + void schedulesRetryForMarReconciliationInterval() { + TestCustomResource customResource = testCustomResource(); + ExecutionScope executionScope = new ExecutionScope(customResource, null); + PostExecutionControl postExecutionControl = + PostExecutionControl.defaultDispatch(); + + eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); + + verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); + } + + @Test + void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() { + RetryExecution mockRetryExecution = mock(RetryExecution.class); + when(mockRetryExecution.nextDelay()).thenReturn(Optional.empty()); + Retry retry = mock(Retry.class); + when(retry.initExecution()).thenReturn(mockRetryExecution); + eventProcessor = + spy(new EventProcessor(controllerConfiguration(retry, + LinearRateLimiter.deactivatedRateLimiter()), reconciliationDispatcherMock, + eventSourceManagerMock, + metricsMock)); + eventProcessor.start(); + ExecutionScope executionScope = new ExecutionScope(testCustomResource(), null); + PostExecutionControl postExecutionControl = + PostExecutionControl.exceptionDuringExecution(new RuntimeException()); + when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock); + + eventProcessor.eventProcessingFinished(executionScope, postExecutionControl); + + verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); + } + private ResourceID eventAlreadyUnderProcessing() { when(reconciliationDispatcherMock.handleExecution(any())) .then( @@ -418,6 +453,7 @@ ControllerConfiguration controllerConfiguration(Retry retry, RateLimiter rateLim when(res.getName()).thenReturn("Test"); when(res.getRetry()).thenReturn(retry); when(res.getRateLimiter()).thenReturn(rateLimiter); + when(res.maxReconciliationInterval()).thenReturn(Optional.of(Duration.ofMillis(1000))); return res; } From 8e3aa242039c6187c0dbf1dd1e7c7fb28a13cef8 Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 20 Sep 2022 12:43:04 +0200 Subject: [PATCH 3/5] small refactor --- .../operator/processing/event/EventProcessorTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index ff0603dc93..23d5961015 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -392,18 +392,18 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() { when(mockRetryExecution.nextDelay()).thenReturn(Optional.empty()); Retry retry = mock(Retry.class); when(retry.initExecution()).thenReturn(mockRetryExecution); - eventProcessor = + eventProcessorWithRetry = spy(new EventProcessor(controllerConfiguration(retry, LinearRateLimiter.deactivatedRateLimiter()), reconciliationDispatcherMock, eventSourceManagerMock, metricsMock)); - eventProcessor.start(); + eventProcessorWithRetry.start(); ExecutionScope executionScope = new ExecutionScope(testCustomResource(), null); PostExecutionControl postExecutionControl = PostExecutionControl.exceptionDuringExecution(new RuntimeException()); - when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock); + when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); - eventProcessor.eventProcessingFinished(executionScope, postExecutionControl); + eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); } From 30fccca06f7af6f515f8976aa3490be755d2d5f2 Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 20 Sep 2022 13:20:58 +0200 Subject: [PATCH 4/5] Integration Test --- .../operator/MaxIntervalAfterRetryIT.java | 43 +++++++++++++++++++ .../operator/MaxIntervalIT.java | 8 ++-- ...xIntervalAfterRetryTestCustomResource.java | 15 +++++++ .../MaxIntervalAfterRetryTestReconciler.java | 36 ++++++++++++++++ 4 files changed, 99 insertions(+), 3 deletions(-) create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalAfterRetryIT.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestCustomResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestReconciler.java diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalAfterRetryIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalAfterRetryIT.java new file mode 100644 index 0000000000..84814e127e --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalAfterRetryIT.java @@ -0,0 +1,43 @@ +package io.javaoperatorsdk.operator; + +import java.util.concurrent.TimeUnit; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.maxintervalafterretry.MaxIntervalAfterRetryTestCustomResource; +import io.javaoperatorsdk.operator.sample.maxintervalafterretry.MaxIntervalAfterRetryTestReconciler; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class MaxIntervalAfterRetryIT { + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder() + .withReconciler(new MaxIntervalAfterRetryTestReconciler()).build(); + + @Test + void reconciliationTriggeredBasedOnMaxInterval() { + MaxIntervalAfterRetryTestCustomResource cr = createTestResource(); + + operator.create(cr); + + await() + .pollInterval(50, TimeUnit.MILLISECONDS) + .atMost(1, TimeUnit.SECONDS) + .untilAsserted( + () -> assertThat(operator.getReconcilerOfType(MaxIntervalAfterRetryTestReconciler.class) + .getNumberOfExecutions()).isGreaterThan(5)); + } + + private MaxIntervalAfterRetryTestCustomResource createTestResource() { + MaxIntervalAfterRetryTestCustomResource cr = new MaxIntervalAfterRetryTestCustomResource(); + cr.setMetadata(new ObjectMeta()); + cr.getMetadata().setName("maxintervalretrytest1"); + return cr; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalIT.java index e817fa8d1f..35ae316f53 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MaxIntervalIT.java @@ -10,6 +10,7 @@ import io.javaoperatorsdk.operator.sample.maxinterval.MaxIntervalTestCustomResource; import io.javaoperatorsdk.operator.sample.maxinterval.MaxIntervalTestReconciler; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; class MaxIntervalIT { @@ -27,9 +28,10 @@ void reconciliationTriggeredBasedOnMaxInterval() { await() .pollInterval(50, TimeUnit.MILLISECONDS) .atMost(500, TimeUnit.MILLISECONDS) - .until( - () -> ((MaxIntervalTestReconciler) operator.getFirstReconciler()) - .getNumberOfExecutions() > 3); + .untilAsserted( + () -> assertThat(operator.getReconcilerOfType(MaxIntervalTestReconciler.class) + .getNumberOfExecutions()) + .isGreaterThan(3)); } private MaxIntervalTestCustomResource createTestResource() { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestCustomResource.java new file mode 100644 index 0000000000..cc914f6c48 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestCustomResource.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.maxintervalafterretry; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("mir") +public class MaxIntervalAfterRetryTestCustomResource + extends CustomResource + implements Namespaced { +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestReconciler.java new file mode 100644 index 0000000000..8473ee03ff --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/maxintervalafterretry/MaxIntervalAfterRetryTestReconciler.java @@ -0,0 +1,36 @@ +package io.javaoperatorsdk.operator.sample.maxintervalafterretry; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.processing.retry.GradualRetry; +import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider; + +@GradualRetry(maxAttempts = 1, initialInterval = 100) +@ControllerConfiguration(maxReconciliationInterval = @MaxReconciliationInterval(interval = 50, + timeUnit = TimeUnit.MILLISECONDS)) +public class MaxIntervalAfterRetryTestReconciler + implements Reconciler, TestExecutionInfoProvider { + + private Logger log = LoggerFactory.getLogger(MaxIntervalAfterRetryTestReconciler.class); + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public UpdateControl reconcile( + MaxIntervalAfterRetryTestCustomResource resource, + Context context) { + numberOfExecutions.addAndGet(1); + log.info("Executed, number: {}", numberOfExecutions.get()); + throw new RuntimeException(); + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } + +} From 090341dcd5cdd27af09dc4e95eecc1626c3468c5 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 21 Sep 2022 15:50:50 +0200 Subject: [PATCH 5/5] refactor: remove controller name field --- .../operator/processing/Controller.java | 2 +- .../operator/processing/event/EventProcessor.java | 15 ++++++++------- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 2ba5e32214..92b70e722d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -78,7 +78,7 @@ public Controller(Reconciler

reconciler, managedWorkflow = ManagedWorkflow.workflowFor(kubernetesClient, configuration.getDependentResources()); eventSourceManager = new EventSourceManager<>(this); - eventProcessor = new EventProcessor<>(getConfiguration(), eventSourceManager); + eventProcessor = new EventProcessor<>(eventSourceManager); eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index b019292e52..58bf5ee00a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -42,7 +42,6 @@ public class EventProcessor implements EventHandler, Life private final ReconciliationDispatcher reconciliationDispatcher; private final Retry retry; private final ExecutorService executor; - private final String controllerName; private final Metrics metrics; private final Cache cache; private final EventSourceManager eventSourceManager; @@ -51,10 +50,9 @@ public class EventProcessor implements EventHandler, Life private final Map metricsMetadata; - public EventProcessor(ControllerConfiguration controllerConfiguration, - EventSourceManager eventSourceManager) { + public EventProcessor(EventSourceManager eventSourceManager) { this( - controllerConfiguration, + eventSourceManager.getController().getConfiguration(), eventSourceManager.getControllerResourceEventSource(), ExecutorServiceManager.instance().executorService(), new ReconciliationDispatcher<>(eventSourceManager.getController()), @@ -92,7 +90,6 @@ private EventProcessor( ? new ScheduledThreadPoolExecutor( ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER) : executor; - this.controllerName = controllerConfiguration.getName(); this.reconciliationDispatcher = reconciliationDispatcher; this.retry = controllerConfiguration.getRetry(); this.cache = cache; @@ -403,7 +400,7 @@ public void run() { final var name = thread.getName(); try { MDCUtils.addResourceInfo(executionScope.getResource()); - thread.setName("ReconcilerExecutor-" + controllerName + "-" + thread.getId()); + thread.setName("ReconcilerExecutor-" + controllerName() + "-" + thread.getId()); PostExecutionControl postExecutionControl = reconciliationDispatcher.handleExecution(executionScope); eventProcessingFinished(executionScope, postExecutionControl); @@ -416,10 +413,14 @@ public void run() { @Override public String toString() { - return controllerName + " -> " + executionScope; + return controllerName() + " -> " + executionScope; } } + private String controllerName() { + return controllerConfiguration.getName(); + } + public synchronized boolean isUnderProcessing(ResourceID resourceID) { return isControllerUnderExecution(resourceStateManager.getOrCreate(resourceID)); }