diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 53f2ae4176..d55984fae1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -3,8 +3,7 @@ import java.time.Duration; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.concurrent.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -75,11 +74,12 @@ default boolean checkCRDAndValidateLocalModel() { return false; } - int DEFAULT_RECONCILIATION_THREADS_NUMBER = 10; + int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200; + int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10; /** - * Retrieves the maximum number of threads the operator can spin out to dispatch reconciliation - * requests to reconcilers + * The maximum number of threads the operator can spin out to dispatch reconciliation requests to + * reconcilers * * @return the maximum number of concurrent reconciliation threads */ @@ -87,12 +87,36 @@ default int concurrentReconciliationThreads() { return DEFAULT_RECONCILIATION_THREADS_NUMBER; } + /** + * The minimum number of threads the operator starts in the thread pool for reconciliations. + * + * @return the minimum number of concurrent reconciliation threads + */ + default int minConcurrentReconciliationThreads() { + return MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER; + } + int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER; + int MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER; + /** + * Retrieves the maximum number of threads the operator can spin out to be used in the workflows. + * + * @return the maximum number of concurrent workflow threads + */ default int concurrentWorkflowExecutorThreads() { return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER; } + /** + * The minimum number of threads the operator starts in the thread pool for workflows. + * + * @return the minimum number of concurrent workflow threads + */ + default int minConcurrentWorkflowExecutorThreads() { + return MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER; + } + /** * Used to clone custom resources. It is strongly suggested that implementors override this method * since the default implementation creates a new {@link Cloner} instance each time this method is @@ -136,11 +160,15 @@ default Metrics getMetrics() { } default ExecutorService getExecutorService() { - return Executors.newFixedThreadPool(concurrentReconciliationThreads()); + return new ThreadPoolExecutor(minConcurrentReconciliationThreads(), + concurrentReconciliationThreads(), + 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); } default ExecutorService getWorkflowExecutorService() { - return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads()); + return new ThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(), + concurrentWorkflowExecutorThreads(), + 1, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); } default boolean closeClientOnStop() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 22a4b6e6bd..3cc8dd6078 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -19,7 +19,9 @@ public class ConfigurationServiceOverrider { private Config clientConfig; private Boolean checkCR; private Integer concurrentReconciliationThreads; + private Integer minConcurrentReconciliationThreads; private Integer concurrentWorkflowExecutorThreads; + private Integer minConcurrentWorkflowExecutorThreads; private Cloner cloner; private Integer timeoutSeconds; private Boolean closeClientOnStop; @@ -56,6 +58,16 @@ public ConfigurationServiceOverrider withConcurrentWorkflowExecutorThreads(int t return this; } + public ConfigurationServiceOverrider withMinConcurrentReconciliationThreads(int threadNumber) { + this.minConcurrentReconciliationThreads = threadNumber; + return this; + } + + public ConfigurationServiceOverrider withMinConcurrentWorkflowExecutorThreads(int threadNumber) { + this.minConcurrentWorkflowExecutorThreads = threadNumber; + return this; + } + public ConfigurationServiceOverrider withResourceCloner(Cloner cloner) { this.cloner = cloner; return this; @@ -149,6 +161,18 @@ public int concurrentWorkflowExecutorThreads() { : original.concurrentWorkflowExecutorThreads(); } + @Override + public int minConcurrentReconciliationThreads() { + return minConcurrentReconciliationThreads != null ? minConcurrentReconciliationThreads + : original.minConcurrentReconciliationThreads(); + } + + @Override + public int minConcurrentWorkflowExecutorThreads() { + return minConcurrentWorkflowExecutorThreads != null ? minConcurrentWorkflowExecutorThreads + : original.minConcurrentWorkflowExecutorThreads(); + } + @Override public int getTerminationTimeoutSeconds() { return timeoutSeconds != null ? timeoutSeconds : original.getTerminationTimeoutSeconds();