diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index f0d1455dd7..a7f50bcf6c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -80,6 +80,12 @@ default int concurrentReconciliationThreads() { return DEFAULT_RECONCILIATION_THREADS_NUMBER; } + int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER; + + default int concurrentWorkflowExecutorThreads() { + return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER; + } + /** * Used to clone custom resources. It is strongly suggested that implementors override this method * since the default implementation creates a new {@link Cloner} instance each time this method is @@ -122,6 +128,10 @@ default ExecutorService getExecutorService() { return Executors.newFixedThreadPool(concurrentReconciliationThreads()); } + default ExecutorService getWorkflowExecutorService() { + return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads()); + } + default boolean closeClientOnStop() { return true; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 918bbf8d2f..0ef1e6a6f4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -22,6 +22,7 @@ public class ConfigurationServiceOverrider { private Boolean closeClientOnStop; private ObjectMapper objectMapper; private ExecutorService executorService; + private ExecutorService workflowExecutorService; private LeaderElectionConfiguration leaderElectionConfiguration; ConfigurationServiceOverrider(ConfigurationService original) { @@ -68,6 +69,12 @@ public ConfigurationServiceOverrider withExecutorService(ExecutorService executo return this; } + public ConfigurationServiceOverrider withWorkflowExecutorService( + ExecutorService workflowExecutorService) { + this.workflowExecutorService = workflowExecutorService; + return this; + } + public ConfigurationServiceOverrider withObjectMapper(ObjectMapper objectMapper) { this.objectMapper = objectMapper; return this; @@ -121,6 +128,12 @@ public ExecutorService getExecutorService() { return executorService != null ? executorService : original.getExecutorService(); } + @Override + public ExecutorService getWorkflowExecutorService() { + return workflowExecutorService != null ? workflowExecutorService + : original.getWorkflowExecutorService(); + } + @Override public ObjectMapper getObjectMapper() { return objectMapper != null ? objectMapper : original.getObjectMapper(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index af40adedd6..38ed240c97 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -15,31 +15,35 @@ public class ExecutorServiceManager { private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class); private static ExecutorServiceManager instance; - private final ExecutorService executor; + private final ExecutorService workflowExecutor; private final int terminationTimeoutSeconds; - private ExecutorServiceManager(InstrumentedExecutorService executor, + private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, int terminationTimeoutSeconds) { - this.executor = executor; + this.executor = new InstrumentedExecutorService(executor); + this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor); this.terminationTimeoutSeconds = terminationTimeoutSeconds; } public static void init() { if (instance == null) { final var configuration = ConfigurationServiceProvider.instance(); - instance = new ExecutorServiceManager( - new InstrumentedExecutorService(configuration.getExecutorService()), + final var executorService = configuration.getExecutorService(); + final var workflowExecutorService = configuration.getWorkflowExecutorService(); + instance = new ExecutorServiceManager(executorService, workflowExecutorService, configuration.getTerminationTimeoutSeconds()); - log.debug("Initialized ExecutorServiceManager executor: {}, timeout: {}", - configuration.getExecutorService().getClass(), + log.debug( + "Initialized ExecutorServiceManager executor: {}, workflow executor: {}, timeout: {}", + executorService.getClass(), + workflowExecutorService.getClass(), configuration.getTerminationTimeoutSeconds()); } else { log.debug("Already started, reusing already setup instance!"); } } - public static void stop() { + public synchronized static void stop() { if (instance != null) { instance.doStop(); } @@ -48,7 +52,7 @@ public static void stop() { instance = null; } - public static ExecutorServiceManager instance() { + public synchronized static ExecutorServiceManager instance() { if (instance == null) { // provide a default configuration if none has been provided by init init(); @@ -60,13 +64,22 @@ public ExecutorService executorService() { return executor; } + public ExecutorService workflowExecutorService() { + return workflowExecutor; + } + private void doStop() { try { log.debug("Closing executor"); executor.shutdown(); + workflowExecutor.shutdown(); + if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { + workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything + } if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { executor.shutdownNow(); // if we timed out, waiting, cancel everything } + } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java index bdd4e3cd7c..f955b83f89 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java @@ -7,7 +7,7 @@ import java.util.stream.Collectors; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; @@ -30,7 +30,7 @@ public class Workflow
{
private ExecutorService executorService;
public Workflow(Set withThrowExceptionFurther(boolean throwExceptionFurthe
}
public Workflow build() {
- return new Workflow(dependentResourceNodes,
- ConfigurationServiceProvider.instance().getExecutorService(), throwExceptionAutomatically);
+ return new Workflow(
+ dependentResourceNodes, ExecutorServiceManager.instance().workflowExecutorService(),
+ throwExceptionAutomatically);
}
public Workflow build(int parallelism) {
diff --git a/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java b/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java
index 15c2ceef87..48537f93a3 100644
--- a/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java
+++ b/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java
@@ -56,7 +56,7 @@ void testAddingWebPage() {
() -> {
var actual = operator().get(WebPage.class, TEST_PAGE);
var deployment = operator().get(Deployment.class, deploymentName(webPage));
-
+ assertThat(actual.getStatus()).isNotNull();
assertThat(actual.getStatus().getAreWeGood()).isTrue();
assertThat(deployment.getSpec().getReplicas())
.isEqualTo(deployment.getStatus().getReadyReplicas());