diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 613ea622b5..d18b340f36 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator; +import java.time.Duration; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -77,14 +78,31 @@ public Operator(KubernetesClient kubernetesClient, ConfigurationService configur } /** - * Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. + * Uses {@link ConfigurationService#getTerminationTimeoutSeconds()} for graceful shutdown timeout + * + * @deprecated use the overloaded version with graceful shutdown timeout parameter. * - * @deprecated This feature should not be used anymore */ @Deprecated(forRemoval = true) public void installShutdownHook() { + installShutdownHook( + Duration.ofSeconds(ConfigurationServiceProvider.instance().getTerminationTimeoutSeconds())); + } + + /** + * Adds a shutdown hook that automatically calls {@link #stop()} when the app shuts down. Note + * that graceful shutdown is usually not needed, but your {@link Reconciler} implementations might + * require it. + *
+ * Note that you might want to tune "terminationGracePeriodSeconds" for the Pod running the
+ * controller.
+ *
+ * @param gracefulShutdownTimeout timeout to wait for executor threads to complete ongoing
+ * reconciliations
+ */
+ public void installShutdownHook(Duration gracefulShutdownTimeout) {
if (!leaderElectionManager.isLeaderElectionEnabled()) {
- Runtime.getRuntime().addShutdownHook(new Thread(this::stop));
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> stop(gracefulShutdownTimeout)));
} else {
log.warn("Leader election is on, shutdown hook will not be installed.");
}
@@ -126,14 +144,16 @@ public synchronized void start() {
}
}
- @Override
- public void stop() throws OperatorException {
+ public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
+ if (!started) {
+ return;
+ }
final var configurationService = ConfigurationServiceProvider.instance();
log.info(
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
controllerManager.stop();
- ExecutorServiceManager.stop();
+ ExecutorServiceManager.stop(gracefulShutdownTimeout);
leaderElectionManager.stop();
if (configurationService.closeClientOnStop()) {
kubernetesClient.close();
@@ -142,6 +162,11 @@ public void stop() throws OperatorException {
started = false;
}
+ @Override
+ public void stop() throws OperatorException {
+ stop(Duration.ZERO);
+ }
+
/**
* Add a registration requests for the specified reconciler with this operator. The effective
* registration of the reconciler is delayed till the operator is started.
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
index e9249505ab..e25bb29f82 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
@@ -121,8 +121,12 @@ public HasMetadata clone(HasMetadata object) {
* Retrieves the number of seconds the SDK waits for reconciliation threads to terminate before
* shutting down.
*
+ * @deprecated use {@link io.javaoperatorsdk.operator.Operator#stop(Duration)} instead, where the
+ * graceful shutdown timeout can be passed as a parameter.
+ *
* @return the number of seconds to wait before terminating reconciliation threads
*/
+ @Deprecated(forRemoval = true)
default int getTerminationTimeoutSeconds() {
return DEFAULT_TERMINATION_TIMEOUT_SECONDS;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index 3261651f3d..54e4586aa0 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.api.config;
+import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
@@ -24,23 +25,20 @@ public class ExecutorServiceManager {
private final ExecutorService executor;
private final ExecutorService workflowExecutor;
private final ExecutorService cachingExecutorService;
- private final int terminationTimeoutSeconds;
- private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
- int terminationTimeoutSeconds) {
+ private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor) {
this.cachingExecutorService = Executors.newCachedThreadPool();
this.executor = new InstrumentedExecutorService(executor);
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
- this.terminationTimeoutSeconds = terminationTimeoutSeconds;
+
}
- public static void init() {
+ public static synchronized void init() {
if (instance == null) {
final var configuration = ConfigurationServiceProvider.instance();
final var executorService = configuration.getExecutorService();
final var workflowExecutorService = configuration.getWorkflowExecutorService();
- instance = new ExecutorServiceManager(executorService, workflowExecutorService,
- configuration.getTerminationTimeoutSeconds());
+ instance = new ExecutorServiceManager(executorService, workflowExecutorService);
log.debug(
"Initialized ExecutorServiceManager executor: {}, workflow executor: {}, timeout: {}",
executorService.getClass(),
@@ -51,16 +49,23 @@ public static void init() {
}
}
- public static synchronized void stop() {
+ /** For testing purposes only */
+ public static synchronized void reset() {
+ instance().doStop(Duration.ZERO);
+ instance = null;
+ init();
+ }
+
+ public static synchronized void stop(Duration gracefulShutdownTimeout) {
if (instance != null) {
- instance.doStop();
+ instance.doStop(gracefulShutdownTimeout);
}
// make sure that we remove the singleton so that the thread pool is re-created on next call to
// start
instance = null;
}
- public synchronized static ExecutorServiceManager instance() {
+ public static synchronized ExecutorServiceManager instance() {
if (instance == null) {
// provide a default configuration if none has been provided by init
init();
@@ -128,23 +133,30 @@ public ExecutorService cachingExecutorService() {
return cachingExecutorService;
}
- private void doStop() {
+ private void doStop(Duration gracefulShutdownTimeout) {
try {
+ var parallelExec = Executors.newFixedThreadPool(3);
log.debug("Closing executor");
- shutdown(executor);
- shutdown(workflowExecutor);
- shutdown(cachingExecutorService);
+ parallelExec.invokeAll(List.of(shutdown(executor, gracefulShutdownTimeout),
+ shutdown(workflowExecutor, gracefulShutdownTimeout),
+ shutdown(cachingExecutorService, gracefulShutdownTimeout)));
+ parallelExec.shutdownNow();
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
Thread.currentThread().interrupt();
}
}
- private static void shutdown(ExecutorService executorService) throws InterruptedException {
- executorService.shutdown();
- if (!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) {
- executorService.shutdownNow(); // if we timed out, waiting, cancel everything
- }
+ private static Callable executionSco
@Override
public void run() {
+ if (!running) {
+ // this is needed for the case when the controller is stopped but there is a graceful shutdown
+ // timeout: the currently executing reconciliations should finish, but not the ones
+ // which were submitted but not started yet
+ log.debug("Event processor not running skipping resource processing: {}", resourceID);
+ return;
+ }
// change thread name for easier debugging
final var thread = Thread.currentThread();
final var name = thread.getName();
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java
index 2a6b8002e3..5df0738022 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/VersionTest.java
@@ -4,7 +4,7 @@
import static org.junit.Assert.assertEquals;
-public class VersionTest {
+class VersionTest {
@Test
void versionShouldReturnTheSameResultFromMavenAndProperties() {
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java
index 583e7a7289..c3880982e6 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java
@@ -15,7 +15,9 @@
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
+import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
@@ -39,6 +41,7 @@
import static org.mockito.Mockito.after;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.atMostOnce;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -56,6 +59,8 @@ class EventProcessorTest {
public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250;
public static final int SEPARATE_EXECUTION_TIMEOUT = 450;
public static final String TEST_NAMESPACE = "default-event-handler-test";
+ public static final int TIME_TO_WAIT_AFTER_SUBMISSION_BEFORE_EXECUTION = 150;
+ public static final int DISPATCHING_DELAY = 250;
private final ReconciliationDispatcher reconciliationDispatcherMock =
mock(ReconciliationDispatcher.class);
@@ -417,6 +422,28 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() {
verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong());
}
+ @Test
+ void executionOfReconciliationShouldNotStartIfProcessorStopped() throws InterruptedException {
+ when(reconciliationDispatcherMock.handleExecution(any()))
+ .then((Answer