From 562f91d772c5952537f1d392014ed02c5b9d30ee Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 25 Jul 2022 16:03:18 +0200 Subject: [PATCH 1/7] feat: separate executor service for workflow --- .../dependent/workflow/builder/WorkflowBuilder.java | 5 +++-- .../io/javaoperatorsdk/operator/sample/WebPageOperator.java | 2 ++ sample-operators/webpage/src/main/resources/log4j2.xml | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java index e60b26db97..c58c7ccbb0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java @@ -7,6 +7,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition; import io.javaoperatorsdk.operator.processing.dependent.workflow.DependentResourceNode; @@ -75,8 +76,8 @@ public WorkflowBuilder

withThrowExceptionFurther(boolean throwExceptionFurthe } public Workflow

build() { - return new Workflow(dependentResourceNodes, - ConfigurationServiceProvider.instance().getExecutorService(), throwExceptionAutomatically); + return new Workflow( + dependentResourceNodes, ExecutorServiceManager.instance().executorService(), throwExceptionAutomatically); } public Workflow

build(int parallelism) { diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java index 9b137649fc..408dfe6158 100644 --- a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java @@ -1,7 +1,9 @@ package io.javaoperatorsdk.operator.sample; import java.io.IOException; +import java.util.function.Consumer; +import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.takes.facets.fork.FkRegex; diff --git a/sample-operators/webpage/src/main/resources/log4j2.xml b/sample-operators/webpage/src/main/resources/log4j2.xml index 5b794e7de3..2ce4ba07f3 100644 --- a/sample-operators/webpage/src/main/resources/log4j2.xml +++ b/sample-operators/webpage/src/main/resources/log4j2.xml @@ -2,7 +2,7 @@ - + From 32d91dcdda5d33ffa9b116ccd5e037bfd3da4cf8 Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 25 Jul 2022 16:21:14 +0200 Subject: [PATCH 2/7] fix: format --- .../dependent/workflow/builder/WorkflowBuilder.java | 4 ++-- .../io/javaoperatorsdk/operator/sample/WebPageOperator.java | 2 -- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java index c58c7ccbb0..bda1006bb2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java @@ -6,7 +6,6 @@ import java.util.concurrent.ExecutorService; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition; @@ -77,7 +76,8 @@ public WorkflowBuilder

withThrowExceptionFurther(boolean throwExceptionFurthe public Workflow

build() { return new Workflow( - dependentResourceNodes, ExecutorServiceManager.instance().executorService(), throwExceptionAutomatically); + dependentResourceNodes, ExecutorServiceManager.instance().executorService(), + throwExceptionAutomatically); } public Workflow

build(int parallelism) { diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java index 408dfe6158..9b137649fc 100644 --- a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java @@ -1,9 +1,7 @@ package io.javaoperatorsdk.operator.sample; import java.io.IOException; -import java.util.function.Consumer; -import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.takes.facets.fork.FkRegex; From 4e605cd7c216ee8121dc4a3234330cc1111bf9b1 Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 26 Aug 2022 13:18:31 +0200 Subject: [PATCH 3/7] simple impl --- .../api/config/ConfigurationService.java | 10 ++++++++++ .../config/ConfigurationServiceOverrider.java | 12 ++++++++++++ .../api/config/ExecutorServiceManager.java | 16 ++++++++++++++++ .../processing/dependent/workflow/Workflow.java | 4 ++-- .../workflow/builder/WorkflowBuilder.java | 2 +- 5 files changed, 41 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index f0d1455dd7..a7f50bcf6c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -80,6 +80,12 @@ default int concurrentReconciliationThreads() { return DEFAULT_RECONCILIATION_THREADS_NUMBER; } + int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER; + + default int concurrentWorkflowExecutorThreads() { + return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER; + } + /** * Used to clone custom resources. It is strongly suggested that implementors override this method * since the default implementation creates a new {@link Cloner} instance each time this method is @@ -122,6 +128,10 @@ default ExecutorService getExecutorService() { return Executors.newFixedThreadPool(concurrentReconciliationThreads()); } + default ExecutorService getWorkflowExecutorService() { + return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads()); + } + default boolean closeClientOnStop() { return true; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 918bbf8d2f..7da5250863 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -22,6 +22,7 @@ public class ConfigurationServiceOverrider { private Boolean closeClientOnStop; private ObjectMapper objectMapper; private ExecutorService executorService; + private ExecutorService workflowExecutorService; private LeaderElectionConfiguration leaderElectionConfiguration; ConfigurationServiceOverrider(ConfigurationService original) { @@ -68,6 +69,12 @@ public ConfigurationServiceOverrider withExecutorService(ExecutorService executo return this; } + public ConfigurationServiceOverrider withWorkflowExecutorService( + ExecutorService workflowExecutorService) { + this.workflowExecutorService = workflowExecutorService; + return this; + } + public ConfigurationServiceOverrider withObjectMapper(ObjectMapper objectMapper) { this.objectMapper = objectMapper; return this; @@ -121,6 +128,11 @@ public ExecutorService getExecutorService() { return executorService != null ? executorService : original.getExecutorService(); } + @Override + public ExecutorService getWorkflowExecutorService() { + return workflowExecutorService != null ? workflowExecutorService : original.getWorkflowExecutorService(); + } + @Override public ObjectMapper getObjectMapper() { return objectMapper != null ? objectMapper : original.getObjectMapper(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index af40adedd6..64cbd9dbff 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -17,6 +17,7 @@ public class ExecutorServiceManager { private static ExecutorServiceManager instance; private final ExecutorService executor; + private ExecutorService workflowExecutor; private final int terminationTimeoutSeconds; private ExecutorServiceManager(InstrumentedExecutorService executor, @@ -60,13 +61,28 @@ public ExecutorService executorService() { return executor; } + // needs to be synchronized since only called when a workflow is initialized, but that happens in + // controllers which are started in parallel + public synchronized ExecutorService workflowExecutorService() { + if (workflowExecutor == null) { + workflowExecutor = + new InstrumentedExecutorService( + ConfigurationServiceProvider.instance().getWorkflowExecutorService()); + } + return workflowExecutor; + } + private void doStop() { try { log.debug("Closing executor"); executor.shutdown(); + workflowExecutor.shutdown(); if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { executor.shutdownNow(); // if we timed out, waiting, cancel everything } + if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { + executor.shutdownNow(); // if we timed out, waiting, cancel everything + } } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java index bdd4e3cd7c..f955b83f89 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java @@ -7,7 +7,7 @@ import java.util.stream.Collectors; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; @@ -30,7 +30,7 @@ public class Workflow

{ private ExecutorService executorService; public Workflow(Set dependentResourceNodes) { - this.executorService = ConfigurationServiceProvider.instance().getExecutorService(); + this.executorService = ExecutorServiceManager.instance().workflowExecutorService(); this.dependentResourceNodes = dependentResourceNodes; this.throwExceptionAutomatically = THROW_EXCEPTION_AUTOMATICALLY_DEFAULT; preprocessForReconcile(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java index bda1006bb2..a9dd3f121d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java @@ -76,7 +76,7 @@ public WorkflowBuilder

withThrowExceptionFurther(boolean throwExceptionFurthe public Workflow

build() { return new Workflow( - dependentResourceNodes, ExecutorServiceManager.instance().executorService(), + dependentResourceNodes, ExecutorServiceManager.instance().workflowExecutorService(), throwExceptionAutomatically); } From 9468825d92781f0885a6d9b01f32086a1c803847 Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 26 Aug 2022 13:19:57 +0200 Subject: [PATCH 4/7] log4j pattern revert --- sample-operators/webpage/src/main/resources/log4j2.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sample-operators/webpage/src/main/resources/log4j2.xml b/sample-operators/webpage/src/main/resources/log4j2.xml index 2ce4ba07f3..5b794e7de3 100644 --- a/sample-operators/webpage/src/main/resources/log4j2.xml +++ b/sample-operators/webpage/src/main/resources/log4j2.xml @@ -2,7 +2,7 @@ - + From 93ad705c668a47237cecd694ca5ee216069f1d91 Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 26 Aug 2022 13:31:31 +0200 Subject: [PATCH 5/7] improves assert --- .../operator/sample/WebPageOperatorAbstractTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java b/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java index 15c2ceef87..48537f93a3 100644 --- a/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java +++ b/sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java @@ -56,7 +56,7 @@ void testAddingWebPage() { () -> { var actual = operator().get(WebPage.class, TEST_PAGE); var deployment = operator().get(Deployment.class, deploymentName(webPage)); - + assertThat(actual.getStatus()).isNotNull(); assertThat(actual.getStatus().getAreWeGood()).isTrue(); assertThat(deployment.getSpec().getReplicas()) .isEqualTo(deployment.getStatus().getReadyReplicas()); From bf26c88ff3d3c77f8573acb5135bcb69fac3bfac Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 26 Aug 2022 15:06:59 +0200 Subject: [PATCH 6/7] fix --- .../api/config/ConfigurationServiceOverrider.java | 3 ++- .../operator/api/config/ExecutorServiceManager.java | 11 +++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 7da5250863..0ef1e6a6f4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -130,7 +130,8 @@ public ExecutorService getExecutorService() { @Override public ExecutorService getWorkflowExecutorService() { - return workflowExecutorService != null ? workflowExecutorService : original.getWorkflowExecutorService(); + return workflowExecutorService != null ? workflowExecutorService + : original.getWorkflowExecutorService(); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 64cbd9dbff..83081d024e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -76,13 +76,16 @@ private void doStop() { try { log.debug("Closing executor"); executor.shutdown(); - workflowExecutor.shutdown(); - if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { - executor.shutdownNow(); // if we timed out, waiting, cancel everything + if (workflowExecutor != null) { + workflowExecutor.shutdown(); + if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { + workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything + } } - if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { + if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { executor.shutdownNow(); // if we timed out, waiting, cancel everything } + } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); } From 9facd8ff058f8e3f90212f72ce612339d09415cd Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 26 Aug 2022 21:34:19 +0200 Subject: [PATCH 7/7] refactor: unify executor services management --- .../api/config/ExecutorServiceManager.java | 40 ++++++++----------- 1 file changed, 17 insertions(+), 23 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 83081d024e..38ed240c97 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -15,32 +15,35 @@ public class ExecutorServiceManager { private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class); private static ExecutorServiceManager instance; - private final ExecutorService executor; - private ExecutorService workflowExecutor; + private final ExecutorService workflowExecutor; private final int terminationTimeoutSeconds; - private ExecutorServiceManager(InstrumentedExecutorService executor, + private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, int terminationTimeoutSeconds) { - this.executor = executor; + this.executor = new InstrumentedExecutorService(executor); + this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor); this.terminationTimeoutSeconds = terminationTimeoutSeconds; } public static void init() { if (instance == null) { final var configuration = ConfigurationServiceProvider.instance(); - instance = new ExecutorServiceManager( - new InstrumentedExecutorService(configuration.getExecutorService()), + final var executorService = configuration.getExecutorService(); + final var workflowExecutorService = configuration.getWorkflowExecutorService(); + instance = new ExecutorServiceManager(executorService, workflowExecutorService, configuration.getTerminationTimeoutSeconds()); - log.debug("Initialized ExecutorServiceManager executor: {}, timeout: {}", - configuration.getExecutorService().getClass(), + log.debug( + "Initialized ExecutorServiceManager executor: {}, workflow executor: {}, timeout: {}", + executorService.getClass(), + workflowExecutorService.getClass(), configuration.getTerminationTimeoutSeconds()); } else { log.debug("Already started, reusing already setup instance!"); } } - public static void stop() { + public synchronized static void stop() { if (instance != null) { instance.doStop(); } @@ -49,7 +52,7 @@ public static void stop() { instance = null; } - public static ExecutorServiceManager instance() { + public synchronized static ExecutorServiceManager instance() { if (instance == null) { // provide a default configuration if none has been provided by init init(); @@ -61,14 +64,7 @@ public ExecutorService executorService() { return executor; } - // needs to be synchronized since only called when a workflow is initialized, but that happens in - // controllers which are started in parallel - public synchronized ExecutorService workflowExecutorService() { - if (workflowExecutor == null) { - workflowExecutor = - new InstrumentedExecutorService( - ConfigurationServiceProvider.instance().getWorkflowExecutorService()); - } + public ExecutorService workflowExecutorService() { return workflowExecutor; } @@ -76,11 +72,9 @@ private void doStop() { try { log.debug("Closing executor"); executor.shutdown(); - if (workflowExecutor != null) { - workflowExecutor.shutdown(); - if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { - workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything - } + workflowExecutor.shutdown(); + if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { + workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything } if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { executor.shutdownNow(); // if we timed out, waiting, cancel everything