diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
index 53bfc75df9..f61ceef812 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java
@@ -127,7 +127,7 @@ default boolean checkCRDAndValidateLocalModel() {
return false;
}
- int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200;
+ int DEFAULT_RECONCILIATION_THREADS_NUMBER = Integer.MAX_VALUE;
int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10;
/**
@@ -190,6 +190,21 @@ default Metrics getMetrics() {
return Metrics.NOOP;
}
+ /**
+ * Use to provide custom executor service.
+ * By default, a {@link java.util.concurrent.ThreadPoolExecutor} is used, that honors the values
+ * of concurrentReconciliationThreads and minConcurrentReconciliationThreads. When a controller
+ * starts, all the resources are reconciled, therefore there is a natural and expected burst on
+ * startup. According to this there are multiple options, using the ThreadPoolExecutor. Either:
+ *
+ * - Use a very high upper bound thread limit with
+ * {@link java.util.concurrent.SynchronousQueue}. This is the default approach.
+ * - Use fixed number of threads with infinite queue, lik
+ * {@link java.util.concurrent.LinkedBlockingDeque}.
+ * - In addition to that, could be further fine tuned using the
+ * {@link java.util.concurrent.ArrayBlockingQueue}.
+ *
+ */
default ExecutorService getExecutorService() {
return newThreadPoolExecutor(minConcurrentReconciliationThreads(),
concurrentReconciliationThreads());
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
index 62f345426c..7abda2ef17 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java
@@ -3,15 +3,7 @@
import java.time.Duration;
import java.util.Collection;
import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.*;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -37,9 +29,8 @@ public class ExecutorServiceManager {
public static ExecutorService newThreadPoolExecutor(int minThreads, int maxThreads) {
minThreads = Utils.ensureValid(minThreads, "minimum number of threads", MIN_THREAD_NUMBER);
maxThreads = Utils.ensureValid(maxThreads, "maximum number of threads", minThreads + 1);
-
return new ThreadPoolExecutor(minThreads, maxThreads, 1, TimeUnit.MINUTES,
- new LinkedBlockingDeque<>());
+ new SynchronousQueue<>());
}
/**