diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 613ea622b5..d18b340f36 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator; +import java.time.Duration; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -77,14 +78,31 @@ public Operator(KubernetesClient kubernetesClient, ConfigurationService configur } /** - * Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. + * Uses {@link ConfigurationService#getTerminationTimeoutSeconds()} for graceful shutdown timeout + * + * @deprecated use the overloaded version with graceful shutdown timeout parameter. * - * @deprecated This feature should not be used anymore */ @Deprecated(forRemoval = true) public void installShutdownHook() { + installShutdownHook( + Duration.ofSeconds(ConfigurationServiceProvider.instance().getTerminationTimeoutSeconds())); + } + + /** + * Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. Note + * that graceful shutdown is usually not needed, but your {@link Reconciler} implementations might + * require it. + *

+ * Note that you might want to tune "terminationGracePeriodSeconds" for the Pod running the + * controller. + * + * @param gracefulShutdownTimeout timeout to wait for executor threads to complete actual + * reconciliations + */ + public void installShutdownHook(Duration gracefulShutdownTimeout) { if (!leaderElectionManager.isLeaderElectionEnabled()) { - Runtime.getRuntime().addShutdownHook(new Thread(this::stop)); + Runtime.getRuntime().addShutdownHook(new Thread(() -> stop(gracefulShutdownTimeout))); } else { log.warn("Leader election is on, shutdown hook will not be installed."); } @@ -126,14 +144,16 @@ public synchronized void start() { } } - @Override - public void stop() throws OperatorException { + public void stop(Duration gracefulShutdownTimeout) throws OperatorException { + if (!started) { + return; + } final var configurationService = ConfigurationServiceProvider.instance(); log.info( "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); controllerManager.stop(); - ExecutorServiceManager.stop(); + ExecutorServiceManager.stop(gracefulShutdownTimeout); leaderElectionManager.stop(); if (configurationService.closeClientOnStop()) { kubernetesClient.close(); @@ -142,6 +162,11 @@ public void stop() throws OperatorException { started = false; } + @Override + public void stop() throws OperatorException { + stop(Duration.ZERO); + } + /** * Add a registration requests for the specified reconciler with this operator. The effective * registration of the reconciler is delayed till the operator is started. diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index e9249505ab..e25bb29f82 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -121,8 +121,12 @@ public HasMetadata clone(HasMetadata object) { * Retrieves the number of seconds the SDK waits for reconciliation threads to terminate before * shutting down. * + * @deprecated use {@link io.javaoperatorsdk.operator.Operator#stop(Duration)} instead. Where the + * parameter can be passed to specify graceful timeout. + * * @return the number of seconds to wait before terminating reconciliation threads */ + @Deprecated(forRemoval = true) default int getTerminationTimeoutSeconds() { return DEFAULT_TERMINATION_TIMEOUT_SECONDS; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 3261651f3d..54e4586aa0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.api.config; +import java.time.Duration; import java.util.Collection; import java.util.List; import java.util.concurrent.Callable; @@ -24,23 +25,20 @@ public class ExecutorServiceManager { private final ExecutorService executor; private final ExecutorService workflowExecutor; private final ExecutorService cachingExecutorService; - private final int terminationTimeoutSeconds; - private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, - int terminationTimeoutSeconds) { + private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor) { this.cachingExecutorService = Executors.newCachedThreadPool(); this.executor = new InstrumentedExecutorService(executor); this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor); - this.terminationTimeoutSeconds = terminationTimeoutSeconds; + } - public static void init() { + public static synchronized void init() { if (instance == null) { final var configuration = ConfigurationServiceProvider.instance(); final var executorService = configuration.getExecutorService(); final var workflowExecutorService = configuration.getWorkflowExecutorService(); - instance = new ExecutorServiceManager(executorService, workflowExecutorService, - configuration.getTerminationTimeoutSeconds()); + instance = new ExecutorServiceManager(executorService, workflowExecutorService); log.debug( "Initialized ExecutorServiceManager executor: {}, workflow executor: {}, timeout: {}", executorService.getClass(), @@ -51,16 +49,23 @@ public static void init() { } } - public static synchronized void stop() { + /** For testing purposes only */ + public static synchronized void reset() { + instance().doStop(Duration.ZERO); + instance = null; + init(); + } + + public static synchronized void stop(Duration gracefulShutdownTimeout) { if (instance != null) { - instance.doStop(); + instance.doStop(gracefulShutdownTimeout); } // make sure that we remove the singleton so that the thread pool is re-created on next call to // start instance = null; } - public synchronized static ExecutorServiceManager instance() { + public static synchronized ExecutorServiceManager instance() { if (instance == null) { // provide a default configuration if none has been provided by init init(); @@ -128,23 +133,30 @@ public ExecutorService cachingExecutorService() { return cachingExecutorService; } - private void doStop() { + private void doStop(Duration gracefulShutdownTimeout) { try { + var parallelExec = Executors.newFixedThreadPool(3); log.debug("Closing executor"); - shutdown(executor); - shutdown(workflowExecutor); - shutdown(cachingExecutorService); + parallelExec.invokeAll(List.of(shutdown(executor, gracefulShutdownTimeout), + shutdown(workflowExecutor, gracefulShutdownTimeout), + shutdown(cachingExecutorService, gracefulShutdownTimeout))); + parallelExec.shutdownNow(); } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); Thread.currentThread().interrupt(); } } - private static void shutdown(ExecutorService executorService) throws InterruptedException { - executorService.shutdown(); - if (!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) { - executorService.shutdownNow(); // if we timed out, waiting, cancel everything - } + private static Callable shutdown(ExecutorService executorService, + Duration gracefulShutdownTimeout) { + return () -> { + executorService.shutdown(); + if (!executorService.awaitTermination(gracefulShutdownTimeout.toMillis(), + TimeUnit.MILLISECONDS)) { + executorService.shutdownNow(); // if we timed out, waiting, cancel everything + } + return null; + }; } private static class InstrumentedExecutorService implements ExecutorService { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index a75f51f981..31b5c136f8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -388,6 +388,13 @@ private ReconcilerExecutor(ResourceID resourceID, ExecutionScope

executionSco @Override public void run() { + if (!running) { + // this is needed for the case when controller stopped, but there is a graceful shutdown + // timeout. that should finish the currently executing reconciliations but not the ones + // which where submitted but not started yet + log.debug("Event processor not running skipping resource processing: {}", resourceID); + return; + } // change thread name for easier debugging final var thread = Thread.currentThread(); final var name = thread.getName(); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java index 2a6b8002e3..5df0738022 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java @@ -4,7 +4,7 @@ import static org.junit.Assert.assertEquals; -public class VersionTest { +class VersionTest { @Test void versionShouldReturnTheSameResultFromMavenAndProperties() { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 583e7a7289..c3880982e6 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -15,7 +15,9 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.config.RetryConfiguration; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter; @@ -39,6 +41,7 @@ import static org.mockito.Mockito.after; import static org.mockito.Mockito.any; import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.atMostOnce; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -56,6 +59,8 @@ class EventProcessorTest { public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250; public static final int SEPARATE_EXECUTION_TIMEOUT = 450; public static final String TEST_NAMESPACE = "default-event-handler-test"; + public static final int TIME_TO_WAIT_AFTER_SUBMISSION_BEFORE_EXECUTION = 150; + public static final int DISPATCHING_DELAY = 250; private final ReconciliationDispatcher reconciliationDispatcherMock = mock(ReconciliationDispatcher.class); @@ -417,6 +422,28 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() { verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); } + @Test + void executionOfReconciliationShouldNotStartIfProcessorStopped() throws InterruptedException { + when(reconciliationDispatcherMock.handleExecution(any())) + .then((Answer) invocationOnMock -> { + Thread.sleep(DISPATCHING_DELAY); + return PostExecutionControl.defaultDispatch(); + }); + // one event will lock the thread / executor + ConfigurationServiceProvider.overrideCurrent(o -> o.withConcurrentReconciliationThreads(1)); + ExecutorServiceManager.reset(); + eventProcessor.start(); + + eventProcessor.handleEvent(prepareCREvent()); + eventProcessor.handleEvent(prepareCREvent()); + eventProcessor.stop(); + + // wait until both event should be handled + Thread.sleep(TIME_TO_WAIT_AFTER_SUBMISSION_BEFORE_EXECUTION + 2 * DISPATCHING_DELAY); + verify(reconciliationDispatcherMock, atMostOnce()) + .handleExecution(any()); + } + private ResourceID eventAlreadyUnderProcessing() { when(reconciliationDispatcherMock.handleExecution(any())) .then( diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java index c154e8a910..e7f9331d5c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java @@ -75,6 +75,7 @@ static void classSetup() { * equals will fail on the two equal but NOT identical TestCustomResources because equals is not * implemented on TestCustomResourceSpec or TestCustomResourceStatus */ + ConfigurationServiceProvider.reset(); ConfigurationServiceProvider.overrideCurrent(overrider -> overrider .checkingCRDAndValidateLocalModel(false).withResourceCloner(new Cloner() { @Override diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java new file mode 100644 index 0000000000..715921ecbb --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/GracefulStopIT.java @@ -0,0 +1,79 @@ +package io.javaoperatorsdk.operator; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResource; +import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestCustomResourceSpec; +import io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler; + +import static io.javaoperatorsdk.operator.sample.gracefulstop.GracefulStopTestReconciler.RECONCILER_SLEEP; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +public class GracefulStopIT { + + public static final String TEST_1 = "test1"; + public static final String TEST_2 = "test2"; + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder() + .withConfigurationService(o -> o.withCloseClientOnStop(false)) + .withReconciler(new GracefulStopTestReconciler()) + .build(); + + @Test + void stopsGracefullyWIthTimeout() { + testGracefulStop(TEST_1, RECONCILER_SLEEP, 2); + } + + @Test + void stopsGracefullyWithExpiredTimeout() { + testGracefulStop(TEST_2, RECONCILER_SLEEP / 5, 1); + } + + private void testGracefulStop(String resourceName, int stopTimeout, int expectedFinalGeneration) { + var testRes = operator.create(testResource(resourceName)); + await().untilAsserted(() -> { + var r = operator.get(GracefulStopTestCustomResource.class, resourceName); + assertThat(r.getStatus()).isNotNull(); + assertThat(r.getStatus().getObservedGeneration()).isEqualTo(1); + assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(1); + }); + + testRes.getSpec().setValue(2); + operator.replace(testRes); + + await().pollDelay(Duration.ofMillis(50)).untilAsserted( + () -> assertThat(operator.getReconcilerOfType(GracefulStopTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(2)); + + operator.getOperator().stop(Duration.ofMillis(stopTimeout)); + + await().untilAsserted(() -> { + var r = operator.get(GracefulStopTestCustomResource.class, resourceName); + assertThat(r.getStatus()).isNotNull(); + assertThat(r.getStatus().getObservedGeneration()).isEqualTo(expectedFinalGeneration); + }); + } + + public GracefulStopTestCustomResource testResource(String name) { + GracefulStopTestCustomResource resource = + new GracefulStopTestCustomResource(); + resource.setMetadata( + new ObjectMetaBuilder() + .withName(name) + .withNamespace(operator.getNamespace()) + .build()); + resource.setSpec(new GracefulStopTestCustomResourceSpec()); + resource.getSpec().setValue(1); + return resource; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResource.java new file mode 100644 index 0000000000..529c5ff480 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResource.java @@ -0,0 +1,16 @@ +package io.javaoperatorsdk.operator.sample.gracefulstop; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("gst") +public class GracefulStopTestCustomResource + extends CustomResource + implements Namespaced { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceSpec.java new file mode 100644 index 0000000000..4d1f45e646 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceSpec.java @@ -0,0 +1,14 @@ +package io.javaoperatorsdk.operator.sample.gracefulstop; + +public class GracefulStopTestCustomResourceSpec { + + private int value; + + public int getValue() { + return value; + } + + public void setValue(int value) { + this.value = value; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceStatus.java new file mode 100644 index 0000000000..f59f5b1163 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestCustomResourceStatus.java @@ -0,0 +1,7 @@ +package io.javaoperatorsdk.operator.sample.gracefulstop; + +import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus; + +public class GracefulStopTestCustomResourceStatus extends ObservedGenerationAwareStatus { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestReconciler.java new file mode 100644 index 0000000000..cf266c0b48 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/gracefulstop/GracefulStopTestReconciler.java @@ -0,0 +1,34 @@ +package io.javaoperatorsdk.operator.sample.gracefulstop; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; + +@ControllerConfiguration +public class GracefulStopTestReconciler + implements Reconciler { + + public static final int RECONCILER_SLEEP = 1000; + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public UpdateControl reconcile( + GracefulStopTestCustomResource resource, + Context context) throws InterruptedException { + + numberOfExecutions.addAndGet(1); + resource.setStatus(new GracefulStopTestCustomResourceStatus()); + Thread.sleep(RECONCILER_SLEEP); + + return UpdateControl.patchStatus(resource); + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } + +}