From c81badabcf06fae18573ce49bc6520388a0630de Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 25 Jan 2023 11:16:43 +0100 Subject: [PATCH 01/14] feat: graceful shutdown improvements and API --- .../io/javaoperatorsdk/operator/Operator.java | 12 ++++-- .../api/config/ExecutorServiceManager.java | 40 ++++++++++--------- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 613ea622b5..b96c639cb4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator; +import java.time.Duration; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -81,7 +82,6 @@ public Operator(KubernetesClient kubernetesClient, ConfigurationService configur * * @deprecated This feature should not be used anymore */ - @Deprecated(forRemoval = true) public void installShutdownHook() { if (!leaderElectionManager.isLeaderElectionEnabled()) { Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); @@ -126,14 +126,13 @@ public synchronized void start() { } } - @Override - public void stop() throws OperatorException { + public void stop(Duration gracefulShutdownTimeout) throws OperatorException { final var configurationService = ConfigurationServiceProvider.instance(); log.info( "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); controllerManager.stop(); - ExecutorServiceManager.stop(); + ExecutorServiceManager.stop(gracefulShutdownTimeout); leaderElectionManager.stop(); if (configurationService.closeClientOnStop()) { kubernetesClient.close(); @@ -142,6 +141,11 @@ public void stop() throws OperatorException { started = false; } + @Override + public void stop() throws OperatorException { + 
stop(Duration.ZERO); + } + /** * Add a registration requests for the specified reconciler with this operator. The effective * registration of the reconciler is delayed till the operator is started. diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 3261651f3d..8a3e287ddf 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.api.config; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; @@ -24,14 +25,12 @@ public class ExecutorServiceManager { private final ExecutorService executor; private final ExecutorService workflowExecutor; private final ExecutorService cachingExecutorService; - private final int terminationTimeoutSeconds; - private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, - int terminationTimeoutSeconds) { + private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor) { this.cachingExecutorService = Executors.newCachedThreadPool(); this.executor = new InstrumentedExecutorService(executor); this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor); - this.terminationTimeoutSeconds = terminationTimeoutSeconds; + } public static void init() { @@ -39,8 +38,7 @@ public static void init() { final var configuration = ConfigurationServiceProvider.instance(); final var executorService = configuration.getExecutorService(); final var workflowExecutorService = configuration.getWorkflowExecutorService(); - instance = new ExecutorServiceManager(executorService, workflowExecutorService, - 
configuration.getTerminationTimeoutSeconds()); + instance = new ExecutorServiceManager(executorService, workflowExecutorService); log.debug( "Initialized ExecutorServiceManager executor: {}, workflow executor: {}, timeout: {}", executorService.getClass(), @@ -51,16 +49,16 @@ public static void init() { } } - public static synchronized void stop() { + public static synchronized void stop(Duration gracefulShutdownTimeout) { if (instance != null) { - instance.doStop(); + instance.doStop(gracefulShutdownTimeout); } // make sure that we remove the singleton so that the thread pool is re-created on next call to // start instance = null; } - public synchronized static ExecutorServiceManager instance() { + public static synchronized ExecutorServiceManager instance() { if (instance == null) { // provide a default configuration if none has been provided by init init(); @@ -128,23 +126,29 @@ public ExecutorService cachingExecutorService() { return cachingExecutorService; } - private void doStop() { + private void doStop(Duration gracefulShutdownTimeout) { try { + var parallelExec = Executors.newFixedThreadPool(3); log.debug("Closing executor"); - shutdown(executor); - shutdown(workflowExecutor); - shutdown(cachingExecutorService); + parallelExec.invokeAll(List.of(shutdown(executor, gracefulShutdownTimeout), + shutdown(workflowExecutor, gracefulShutdownTimeout),shutdown(cachingExecutorService,gracefulShutdownTimeout))); + parallelExec.shutdownNow(); } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); Thread.currentThread().interrupt(); } } - private static void shutdown(ExecutorService executorService) throws InterruptedException { - executorService.shutdown(); - if (!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) { - executorService.shutdownNow(); // if we timed out, waiting, cancel everything - } + private static Callable shutdown(ExecutorService executorService, + Duration 
gracefulShutdownTimeout) throws InterruptedException { + return () -> { + executorService.shutdown(); + if (!executorService.awaitTermination(gracefulShutdownTimeout.toMillis(), + TimeUnit.MILLISECONDS)) { + executorService.shutdownNow(); // if we timed out, waiting, cancel everything + } + return null; + }; } private static class InstrumentedExecutorService implements ExecutorService { From 62776058929351013a5a1ebf6110b6f41106d754 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 25 Jan 2023 11:48:51 +0100 Subject: [PATCH 02/14] wip --- .../operator/processing/event/EventProcessor.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index a75f51f981..1f054912db 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -388,6 +388,12 @@ private ReconcilerExecutor(ResourceID resourceID, ExecutionScope

executionSco @Override public void run() { + if (!running) { + // this is needed for the case when controller stopped, but there is a graceful shutdown + // timeout. that should finish the currently executing reconciliations but not the ones + // which where submitted but not started yet + log.debug("Event processor not running skipping resource processing: {}", resourceID); + } // change thread name for easier debugging final var thread = Thread.currentThread(); final var name = thread.getName(); From 81520a66a67c43e5ef0b56755f12759dc3a1d08b Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 25 Jan 2023 13:54:00 +0100 Subject: [PATCH 03/14] Operator API change --- .../io/javaoperatorsdk/operator/Operator.java | 16 +++++++++++++--- .../api/config/ConfigurationService.java | 1 + 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index b96c639cb4..2ba2f4bab5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -77,14 +77,24 @@ public Operator(KubernetesClient kubernetesClient, ConfigurationService configur .ifPresent(c -> leaderElectionManager.init(c, this.kubernetesClient)); } + /** + * Uses {@link ConfigurationService#getTerminationTimeoutSeconds()} for graceful shutdown timeout + */ + @Deprecated(forRemoval = true) + public void installShutdownHook() { + installShutdownHook( + Duration.ofSeconds(ConfigurationServiceProvider.instance().getTerminationTimeoutSeconds())); + } + /** * Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. 
* - * @deprecated This feature should not be used anymore + * @param gracefulShutdownTimeout - timeout to wait for executor threads to complete actual + * reconciliations */ - public void installShutdownHook() { + public void installShutdownHook(Duration gracefulShutdownTimeout) { if (!leaderElectionManager.isLeaderElectionEnabled()) { - Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); + Runtime.getRuntime().addShutdownHook(new Thread(() -> stop(gracefulShutdownTimeout))); } else { log.warn("Leader election is on, shutdown hook will not be installed."); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index e9249505ab..193f0a85b3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -123,6 +123,7 @@ public HasMetadata clone(HasMetadata object) { * * @return the number of seconds to wait before terminating reconciliation threads */ + @Deprecated(forRemoval = true) default int getTerminationTimeoutSeconds() { return DEFAULT_TERMINATION_TIMEOUT_SECONDS; } From 148cb6219dd202bc09c636bfadc98b0d05fa8e51 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 25 Jan 2023 14:21:32 +0100 Subject: [PATCH 04/14] fixes --- .../processing/event/EventProcessorTest.java | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 583e7a7289..96f86ec8d0 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ 
b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -417,6 +417,18 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() { verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); } + @Test + void executionOfReconciliationNotStartIfProcessorStopped() { + eventProcessor.handleEvent(prepareCREvent()); + // note that there could be race condition in this test, however it is very unlikely that it + // will happen the stop is called after submission (that could be theoretically executed before + // stop) + eventProcessor.stop(); + + verify(reconciliationDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(0)) + .handleExecution(any()); + } + private ResourceID eventAlreadyUnderProcessing() { when(reconciliationDispatcherMock.handleExecution(any())) .then( From 7a728dcd03cef26f8c0d10d885035e6526583ceb Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 25 Jan 2023 15:59:08 +0100 Subject: [PATCH 05/14] IT --- .../io/javaoperatorsdk/operator/Operator.java | 10 +- .../operator/GracefulStopIT.java | 103 ++++++++++++++++++ .../GracefulStopTestCustomResource.java | 16 +++ .../GracefulStopTestCustomResourceSpec.java | 14 +++ .../GracefulStopTestCustomResourceStatus.java | 7 ++ .../GracefulStopTestReconciler.java | 34 ++++++ 6 files changed, 183 insertions(+), 1 deletion(-) create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceSpec.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceStatus.java create mode 100644 
operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestReconciler.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 2ba2f4bab5..fa0e429281 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -87,7 +87,12 @@ public void installShutdownHook() { } /** - * Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. + * Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. Note + * that graceful shutdown is not always needed, just if you implementation of reconciler requires + * it. + * + * Note that you might want to tune "terminationGracePeriodSeconds" for the Pod running the + * controller. * * @param gracefulShutdownTimeout - timeout to wait for executor threads to complete actual * reconciliations @@ -137,6 +142,9 @@ public synchronized void start() { } public void stop(Duration gracefulShutdownTimeout) throws OperatorException { + if (!started) { + return; + } final var configurationService = ConfigurationServiceProvider.instance(); log.info( "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java new file mode 100644 index 0000000000..53fd3f168a --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java @@ -0,0 +1,103 @@ +package io.javaoperatorsdk.operator; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import 
io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResource; +import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResourceSpec; +import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler; + +import static io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler.RECONCILER_SLEEP; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class GracefulStopIT { + + public static final String TEST_1 = "test1"; + public static final String TEST_2 = "test2"; + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder() + .withConfigurationService(o -> o.withCloseClientOnStop(false)) + .withReconciler(new GracefulStopTestReconciler()) + .build(); + + @Test + void stopsGracefullyWIthTimeout() { + var testRes = operator.create(testResource1()); + await().untilAsserted(() -> { + var r = operator.get(GracefulStopTestCustomResource.class, TEST_1); + assertThat(r.getStatus()).isNotNull(); + assertThat(r.getStatus().getObservedGeneration()).isEqualTo(1); + assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(1); + }); + + testRes.getSpec().setValue(2); + operator.replace(testRes); + + await().pollDelay(Duration.ofMillis(50)).untilAsserted( + () -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(2)); + + operator.getOperator().stop(Duration.ofMillis(RECONCILER_SLEEP)); + + await().untilAsserted(() -> { + var r = operator.get(GracefulStopTestCustomResource.class, TEST_1); + assertThat(r.getStatus()).isNotNull(); + assertThat(r.getStatus().getObservedGeneration()).isEqualTo(2); + }); + } + + @Test + void stopsGracefullyWithExpiredTimeout() { + var testRes = operator.create(testResource2()); + 
await().untilAsserted(() -> { + var r = operator.get(GracefulStopTestCustomResource.class, TEST_2); + assertThat(r.getStatus()).isNotNull(); + assertThat(r.getStatus().getObservedGeneration()).isEqualTo(1); + }); + + testRes.getSpec().setValue(2); + operator.replace(testRes); + + await().pollDelay(Duration.ofMillis(50)).untilAsserted( + () -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(2)); + + operator.getOperator().stop(Duration.ofMillis(RECONCILER_SLEEP / 5)); + + await().pollDelay(Duration.ofMillis(RECONCILER_SLEEP)).untilAsserted(() -> { + var r = operator.get(GracefulStopTestCustomResource.class, TEST_2); + assertThat(r.getStatus()).isNotNull(); + assertThat(r.getStatus().getObservedGeneration()).isEqualTo(1); + }); + } + + public GracefulStopTestCustomResource testResource1() { + return testResource(TEST_1); + } + + public GracefulStopTestCustomResource testResource2() { + return testResource(TEST_2); + } + + public GracefulStopTestCustomResource testResource(String name) { + GracefulStopTestCustomResource resource = + new GracefulStopTestCustomResource(); + resource.setMetadata( + new ObjectMetaBuilder() + .withName(name) + .withNamespace(operator.getNamespace()) + .build()); + resource.setSpec(new GracefulStopTestCustomResourceSpec()); + resource.getSpec().setValue(1); + return resource; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResource.java new file mode 100644 index 0000000000..529c5ff480 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResource.java @@ -0,0 +1,16 @@ +package io.javaoperatorsdk.operator.sample.gracefulstop; + +import io.fabric8.kubernetes.api.model.Namespaced; +import 
io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("gst") +public class GracefulStopTestCustomResource + extends CustomResource + implements Namespaced { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceSpec.java new file mode 100644 index 0000000000..4d1f45e646 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceSpec.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.sample.gracefulstop; + +public class GracefulStopTestCustomResourceSpec { + + private int value; + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceStatus.java new file mode 100644 index 0000000000..f59f5b1163 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceStatus.java @@ -0,0 +1,7 @@ +package io.javaoperatorsdk.operator.sample.gracefulstop; + +import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus; + +public class GracefulStopTestCustomResourceStatus extends ObservedGenerationAwareStatus { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestReconciler.java 
b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestReconciler.java new file mode 100644 index 0000000000..cf266c0b48 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestReconciler.java @@ -0,0 +1,34 @@ +package io.javaoperatorsdk.operator.sample.gracefulstop; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +@ControllerConfiguration +public class GracefulStopTestReconciler + implements Reconciler { + + public static final int RECONCILER_SLEEP = 1000; + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public UpdateControl reconcile( + GracefulStopTestCustomResource resource, + Context context) throws InterruptedException { + + numberOfExecutions.addAndGet(1); + resource.setStatus(new GracefulStopTestCustomResourceStatus()); + Thread.sleep(RECONCILER_SLEEP); + + return UpdateControl.patchStatus(resource); + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } + +} From b06946a3ac791e33e469f873e914e253e3b7ea95 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 25 Jan 2023 16:31:23 +0100 Subject: [PATCH 06/14] refactor --- .../operator/GracefulStopIT.java | 50 +++++-------------- 1 file changed, 13 insertions(+), 37 deletions(-) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java index 53fd3f168a..715921ecbb 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java @@ -29,38 +29,22 
@@ public class GracefulStopIT { @Test void stopsGracefullyWIthTimeout() { - var testRes = operator.create(testResource1()); - await().untilAsserted(() -> { - var r = operator.get(GracefulStopTestCustomResource.class, TEST_1); - assertThat(r.getStatus()).isNotNull(); - assertThat(r.getStatus().getObservedGeneration()).isEqualTo(1); - assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class) - .getNumberOfExecutions()).isEqualTo(1); - }); - - testRes.getSpec().setValue(2); - operator.replace(testRes); - - await().pollDelay(Duration.ofMillis(50)).untilAsserted( - () -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class) - .getNumberOfExecutions()).isEqualTo(2)); - - operator.getOperator().stop(Duration.ofMillis(RECONCILER_SLEEP)); - - await().untilAsserted(() -> { - var r = operator.get(GracefulStopTestCustomResource.class, TEST_1); - assertThat(r.getStatus()).isNotNull(); - assertThat(r.getStatus().getObservedGeneration()).isEqualTo(2); - }); + testGracefulStop(TEST_1, RECONCILER_SLEEP, 2); } @Test void stopsGracefullyWithExpiredTimeout() { - var testRes = operator.create(testResource2()); + testGracefulStop(TEST_2, RECONCILER_SLEEP / 5, 1); + } + + private void testGracefulStop(String resourceName, int stopTimeout, int expectedFinalGeneration) { + var testRes = operator.create(testResource(resourceName)); await().untilAsserted(() -> { - var r = operator.get(GracefulStopTestCustomResource.class, TEST_2); + var r = operator.get(GracefulStopTestCustomResource.class, resourceName); assertThat(r.getStatus()).isNotNull(); assertThat(r.getStatus().getObservedGeneration()).isEqualTo(1); + assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(1); }); testRes.getSpec().setValue(2); @@ -70,23 +54,15 @@ void stopsGracefullyWithExpiredTimeout() { () -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class) .getNumberOfExecutions()).isEqualTo(2)); - 
operator.getOperator().stop(Duration.ofMillis(RECONCILER_SLEEP / 5)); + operator.getOperator().stop(Duration.ofMillis(stopTimeout)); - await().pollDelay(Duration.ofMillis(RECONCILER_SLEEP)).untilAsserted(() -> { - var r = operator.get(GracefulStopTestCustomResource.class, TEST_2); + await().untilAsserted(() -> { + var r = operator.get(GracefulStopTestCustomResource.class, resourceName); assertThat(r.getStatus()).isNotNull(); - assertThat(r.getStatus().getObservedGeneration()).isEqualTo(1); + assertThat(r.getStatus().getObservedGeneration()).isEqualTo(expectedFinalGeneration); }); } - public GracefulStopTestCustomResource testResource1() { - return testResource(TEST_1); - } - - public GracefulStopTestCustomResource testResource2() { - return testResource(TEST_2); - } - public GracefulStopTestCustomResource testResource(String name) { GracefulStopTestCustomResource resource = new GracefulStopTestCustomResource(); From 48fc7e22cbdc62f58fe15ba043d41dd09769eb13 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 25 Jan 2023 16:40:51 +0100 Subject: [PATCH 07/14] sonar minor issues fix --- .../operator/api/config/ExecutorServiceManager.java | 2 +- .../io/javaoperatorsdk/operator/api/config/VersionTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 8a3e287ddf..14409203b3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -140,7 +140,7 @@ private void doStop(Duration gracefulShutdownTimeout) { } private static Callable shutdown(ExecutorService executorService, - Duration gracefulShutdownTimeout) throws InterruptedException { + Duration 
gracefulShutdownTimeout) { return () -> { executorService.shutdown(); if (!executorService.awaitTermination(gracefulShutdownTimeout.toMillis(), diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java index 2a6b8002e3..5df0738022 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java @@ -4,7 +4,7 @@ import static org.junit.Assert.assertEquals; -public class VersionTest { +class VersionTest { @Test void versionShouldReturnTheSameResultFromMavenAndProperties() { From 12e800cc6a8f72ce384ed3f3b976f954adb24298 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 25 Jan 2023 16:43:18 +0100 Subject: [PATCH 08/14] fix --- .../operator/processing/event/EventProcessor.java | 1 + 1 file changed, 1 insertion(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 1f054912db..31b5c136f8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -393,6 +393,7 @@ public void run() { // timeout. 
that should finish the currently executing reconciliations but not the ones // which where submitted but not started yet log.debug("Event processor not running skipping resource processing: {}", resourceID); + return; } // change thread name for easier debugging final var thread = Thread.currentThread(); From 4812c8dd8aef056c3f409662d196b7ea16036d32 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 25 Jan 2023 16:46:05 +0100 Subject: [PATCH 09/14] fixes from code review --- .../operator/api/config/ConfigurationService.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 193f0a85b3..680feb9e21 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -121,6 +121,9 @@ public HasMetadata clone(HasMetadata object) { * Retrieves the number of seconds the SDK waits for reconciliation threads to terminate before * shutting down. * + * @deprecated use {@link io.javaoperatorsdk.operator.Operator#stop(Duration)} instead. 
Where the + * parameter can be passed regarding graceful timeout + * * @return the number of seconds to wait before terminating reconciliation threads */ @Deprecated(forRemoval = true) From 1a4c3459a398db310fd0ca1fc4092505e235cb7b Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 25 Jan 2023 16:50:28 +0100 Subject: [PATCH 10/14] docs --- .../src/main/java/io/javaoperatorsdk/operator/Operator.java | 3 +++ .../operator/api/config/ConfigurationService.java | 2 +- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index fa0e429281..4ec0c579e0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -79,6 +79,9 @@ public Operator(KubernetesClient kubernetesClient, ConfigurationService configur /** * Uses {@link ConfigurationService#getTerminationTimeoutSeconds()} for graceful shutdown timeout + * + * @deprecated use the overloaded version with graceful shutdown timeout parameter. + * */ @Deprecated(forRemoval = true) public void installShutdownHook() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 680feb9e21..e25bb29f82 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -122,7 +122,7 @@ public HasMetadata clone(HasMetadata object) { * shutting down. * * @deprecated use {@link io.javaoperatorsdk.operator.Operator#stop(Duration)} instead. 
Where the - * parameter can be passed regarding graceful timeout + * parameter can be passed to specify graceful timeout. * * @return the number of seconds to wait before terminating reconciliation threads */ From d9e0cb53bd1a1c01195d058a3561a17ac96405e1 Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 26 Jan 2023 10:50:09 +0100 Subject: [PATCH 11/14] fixed unit test --- .../api/config/ExecutorServiceManager.java | 9 ++++- .../processing/event/EventProcessorTest.java | 36 ++++++++++--------- .../event/ReconciliationDispatcherTest.java | 1 + 3 files changed, 29 insertions(+), 17 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 14409203b3..82d77f3556 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -33,7 +33,7 @@ private ExecutorServiceManager(ExecutorService executor, ExecutorService workflo } - public static void init() { + public static synchronized void init() { if (instance == null) { final var configuration = ConfigurationServiceProvider.instance(); final var executorService = configuration.getExecutorService(); @@ -49,6 +49,13 @@ public static void init() { } } + /** For testing purposes only */ + public static synchronized void reset() { + instance().doStop(Duration.ZERO); + instance = null; + init(); + } + public static synchronized void stop(Duration gracefulShutdownTimeout) { if (instance != null) { instance.doStop(gracefulShutdownTimeout); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 
96f86ec8d0..c625f31380 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -15,7 +15,9 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.config.RetryConfiguration; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter; @@ -36,17 +38,7 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.after; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.anyLong; -import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.timeout; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; +import static org.mockito.Mockito.*; @SuppressWarnings({"rawtypes", "unchecked"}) class EventProcessorTest { @@ -56,6 +48,8 @@ class EventProcessorTest { public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250; public static final int SEPARATE_EXECUTION_TIMEOUT = 450; public static final String TEST_NAMESPACE = "default-event-handler-test"; + public static final int TIME_TO_WAIT_AFTER_SUBMISSION_BEFORE_EXECUTION = 150; + public static final int DISPATCHING_DELAY = 250; private final ReconciliationDispatcher reconciliationDispatcherMock = mock(ReconciliationDispatcher.class); 
@@ -418,14 +412,24 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() { } @Test - void executionOfReconciliationNotStartIfProcessorStopped() { + void executionOfReconciliationNotStartIfProcessorStopped() throws InterruptedException { + when(reconciliationDispatcherMock.handleExecution(any())) + .then((Answer) invocationOnMock -> { + Thread.sleep(DISPATCHING_DELAY); + return PostExecutionControl.defaultDispatch(); + }); + // one event will lock the thread / executor + ConfigurationServiceProvider.overrideCurrent(o -> o.withConcurrentReconciliationThreads(1)); + ExecutorServiceManager.reset(); + eventProcessor.start(); + + eventProcessor.handleEvent(prepareCREvent()); eventProcessor.handleEvent(prepareCREvent()); - // note that there could be race condition in this test, however it is very unlikely that it - // will happen the stop is called after submission (that could be theoretically executed before - // stop) eventProcessor.stop(); - verify(reconciliationDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(0)) + // wait until both event should be handled + Thread.sleep(TIME_TO_WAIT_AFTER_SUBMISSION_BEFORE_EXECUTION + 2 * DISPATCHING_DELAY); + verify(reconciliationDispatcherMock, atMostOnce()) .handleExecution(any()); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java index c154e8a910..e7f9331d5c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java @@ -75,6 +75,7 @@ static void classSetup() { * equals will fail on the two equal but NOT identical TestCustomResources because equals is not * implemented on TestCustomResourceSpec or 
TestCustomResourceStatus */ + ConfigurationServiceProvider.reset(); ConfigurationServiceProvider.overrideCurrent(overrider -> overrider .checkingCRDAndValidateLocalModel(false).withResourceCloner(new Cloner() { @Override From b6d3259d96c12607bc25bffc00aeb0947e7b8eaa Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 30 Jan 2023 15:13:48 +0100 Subject: [PATCH 12/14] docs: improve wording [skip ci] --- .../main/java/io/javaoperatorsdk/operator/Operator.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 4ec0c579e0..d18b340f36 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -91,13 +91,13 @@ public void installShutdownHook() { /** * Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. Note - * that graceful shutdown is not always needed, just if you implementation of reconciler requires - * it. - * + * that graceful shutdown is usually not needed, but your {@link Reconciler} implementations might + * require it. + *

* Note that you might want to tune "terminationGracePeriodSeconds" for the Pod running the * controller. * - * @param gracefulShutdownTimeout - timeout to wait for executor threads to complete actual + * @param gracefulShutdownTimeout timeout to wait for executor threads to complete actual * reconciliations */ public void installShutdownHook(Duration gracefulShutdownTimeout) { From 0be2471df45b8784caf9eb52feb3d23ce342d49d Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 30 Jan 2023 15:14:16 +0100 Subject: [PATCH 13/14] chore: rename test more accurately [skip ci] --- .../processing/event/EventProcessorTest.java | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index c625f31380..c3880982e6 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -38,7 +38,18 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.atMostOnce; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @SuppressWarnings({"rawtypes", "unchecked"}) class EventProcessorTest { @@ -412,7 +423,7 @@ void 
schedulesRetryForMarReconciliationIntervalIfRetryExhausted() { } @Test - void executionOfReconciliationNotStartIfProcessorStopped() throws InterruptedException { + void executionOfReconciliationShouldNotStartIfProcessorStopped() throws InterruptedException { when(reconciliationDispatcherMock.handleExecution(any())) .then((Answer) invocationOnMock -> { Thread.sleep(DISPATCHING_DELAY); From 430b1afd8ac6622199927aea80d9b6ee0c9e58cb Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 7 Feb 2023 10:08:17 +0100 Subject: [PATCH 14/14] fixes on rebase from next --- .../operator/api/config/ExecutorServiceManager.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 82d77f3556..54e4586aa0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -138,7 +138,8 @@ private void doStop(Duration gracefulShutdownTimeout) { var parallelExec = Executors.newFixedThreadPool(3); log.debug("Closing executor"); parallelExec.invokeAll(List.of(shutdown(executor, gracefulShutdownTimeout), - shutdown(workflowExecutor, gracefulShutdownTimeout),shutdown(cachingExecutorService,gracefulShutdownTimeout))); + shutdown(workflowExecutor, gracefulShutdownTimeout), + shutdown(cachingExecutorService, gracefulShutdownTimeout))); parallelExec.shutdownNow(); } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage());