diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index f0d1455dd7..a7f50bcf6c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -80,6 +80,12 @@ default int concurrentReconciliationThreads() { return DEFAULT_RECONCILIATION_THREADS_NUMBER; } + int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER; + + default int concurrentWorkflowExecutorThreads() { + return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER; + } + /** * Used to clone custom resources. It is strongly suggested that implementors override this method * since the default implementation creates a new {@link Cloner} instance each time this method is @@ -122,6 +128,10 @@ default ExecutorService getExecutorService() { return Executors.newFixedThreadPool(concurrentReconciliationThreads()); } + default ExecutorService getWorkflowExecutorService() { + return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads()); + } + default boolean closeClientOnStop() { return true; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 918bbf8d2f..0ef1e6a6f4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -22,6 +22,7 @@ public class ConfigurationServiceOverrider { private Boolean closeClientOnStop; private ObjectMapper objectMapper; private ExecutorService executorService; + private ExecutorService workflowExecutorService; private LeaderElectionConfiguration leaderElectionConfiguration; ConfigurationServiceOverrider(ConfigurationService original) { @@ -68,6 +69,12 @@ public ConfigurationServiceOverrider withExecutorService(ExecutorService executo return this; } + public ConfigurationServiceOverrider withWorkflowExecutorService( + ExecutorService workflowExecutorService) { + this.workflowExecutorService = workflowExecutorService; + return this; + } + public ConfigurationServiceOverrider withObjectMapper(ObjectMapper objectMapper) { this.objectMapper = objectMapper; return this; @@ -121,6 +128,12 @@ public ExecutorService getExecutorService() { return executorService != null ? executorService : original.getExecutorService(); } + @Override + public ExecutorService getWorkflowExecutorService() { + return workflowExecutorService != null ? workflowExecutorService + : original.getWorkflowExecutorService(); + } + @Override public ObjectMapper getObjectMapper() { return objectMapper != null ? objectMapper : original.getObjectMapper(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index af40adedd6..38ed240c97 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -15,31 +15,35 @@ public class ExecutorServiceManager { private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class); private static ExecutorServiceManager instance; - private final ExecutorService executor; + private final ExecutorService workflowExecutor; private final int terminationTimeoutSeconds; - private ExecutorServiceManager(InstrumentedExecutorService executor, + private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, int terminationTimeoutSeconds) { - this.executor = executor; + this.executor = new InstrumentedExecutorService(executor); + this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor); this.terminationTimeoutSeconds = terminationTimeoutSeconds; } public static void init() { if (instance == null) { final var configuration = ConfigurationServiceProvider.instance(); - instance = new ExecutorServiceManager( - new InstrumentedExecutorService(configuration.getExecutorService()), + final var executorService = configuration.getExecutorService(); + final var workflowExecutorService = configuration.getWorkflowExecutorService(); + instance = new ExecutorServiceManager(executorService, workflowExecutorService, configuration.getTerminationTimeoutSeconds()); - log.debug("Initialized ExecutorServiceManager executor: {}, timeout: {}", - configuration.getExecutorService().getClass(), + log.debug( + "Initialized ExecutorServiceManager executor: {}, workflow executor: {}, timeout: {}", + executorService.getClass(), + workflowExecutorService.getClass(), configuration.getTerminationTimeoutSeconds()); } else { log.debug("Already started, reusing already setup instance!"); } } - public static void stop() { + public synchronized static void stop() { if (instance != null) { instance.doStop(); } @@ -48,7 +52,7 @@ public static void stop() { instance = null; } - public static ExecutorServiceManager instance() { + public synchronized static ExecutorServiceManager instance() { if (instance == null) { // provide a default configuration if none has been provided by init init(); @@ -60,13 +64,22 @@ public ExecutorService executorService() { return executor; } + public ExecutorService workflowExecutorService() { + return workflowExecutor; + } + private void doStop() { try { log.debug("Closing executor"); executor.shutdown(); + workflowExecutor.shutdown(); + if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { + workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything + } if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { executor.shutdownNow(); // if we timed out, waiting, cancel everything } + } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java index bdd4e3cd7c..f955b83f89 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java @@ -7,7 +7,7 @@ import java.util.stream.Collectors; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; @@ -30,7 +30,7 @@ public class Workflow

{ private ExecutorService executorService; public Workflow(Set dependentResourceNodes) { - this.executorService = ConfigurationServiceProvider.instance().getExecutorService(); + this.executorService = ExecutorServiceManager.instance().workflowExecutorService(); this.dependentResourceNodes = dependentResourceNodes; this.throwExceptionAutomatically = THROW_EXCEPTION_AUTOMATICALLY_DEFAULT; preprocessForReconcile(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java index e60b26db97..a9dd3f121d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java @@ -6,7 +6,7 @@ import java.util.concurrent.ExecutorService; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition; import io.javaoperatorsdk.operator.processing.dependent.workflow.DependentResourceNode; @@ -75,8 +75,9 @@ public WorkflowBuilder

withThrowExceptionFurther(boolean throwExceptionFurthe } public Workflow

build() { - return new Workflow(dependentResourceNodes, - ConfigurationServiceProvider.instance().getExecutorService(), throwExceptionAutomatically); + return new Workflow( + dependentResourceNodes, ExecutorServiceManager.instance().workflowExecutorService(), + throwExceptionAutomatically); } public Workflow

build(int parallelism) { diff --git a/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java b/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java index 15c2ceef87..48537f93a3 100644 --- a/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java +++ b/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java @@ -56,7 +56,7 @@ void testAddingWebPage() { () -> { var actual = operator().get(WebPage.class, TEST_PAGE); var deployment = operator().get(Deployment.class, deploymentName(webPage)); - + assertThat(actual.getStatus()).isNotNull(); assertThat(actual.getStatus().getAreWeGood()).isTrue(); assertThat(deployment.getSpec().getReplicas()) .isEqualTo(deployment.getStatus().getReadyReplicas());