diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 53bfc75df9..f61ceef812 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -127,7 +127,7 @@ default boolean checkCRDAndValidateLocalModel() { return false; } - int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200; + int DEFAULT_RECONCILIATION_THREADS_NUMBER = Integer.MAX_VALUE; int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10; /** @@ -190,6 +190,21 @@ default Metrics getMetrics() { return Metrics.NOOP; } + /** + * Use to provide custom executor service.
+ * By default, a {@link java.util.concurrent.ThreadPoolExecutor} is used, that honors the values + * of concurrentReconciliationThreads and minConcurrentReconciliationThreads. When a controller + * starts, all the resources are reconciled, therefore there is a natural and expected burst on + * startup. According to this there are multiple options, using the ThreadPoolExecutor. Either: + * + */ default ExecutorService getExecutorService() { return newThreadPoolExecutor(minConcurrentReconciliationThreads(), concurrentReconciliationThreads()); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 62f345426c..7abda2ef17 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -3,15 +3,7 @@ import java.time.Duration; import java.util.Collection; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -37,9 +29,8 @@ public class ExecutorServiceManager { public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) { minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER); maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1); - return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES, - new LinkedBlockingDeque<>()); + new SynchronousQueue<>()); } /**