Skip to content

Commit 213cf0f

Browse files
authored
feat: using dynamic ThreadPoolExecutor for reconcile and workflow executor services (#1804)
1 parent 51a1c37 commit 213cf0f

File tree

2 files changed

+59
-7
lines changed

2 files changed

+59
-7
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 35 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@
33
import java.time.Duration;
44
import java.util.Optional;
55
import java.util.Set;
6-
import java.util.concurrent.ExecutorService;
7-
import java.util.concurrent.Executors;
6+
import java.util.concurrent.*;
87

98
import org.slf4j.Logger;
109
import org.slf4j.LoggerFactory;
@@ -75,24 +74,49 @@ default boolean checkCRDAndValidateLocalModel() {
7574
return false;
7675
}
7776

78-
int DEFAULT_RECONCILIATION_THREADS_NUMBER = 10;
77+
int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200;
78+
int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10;
7979

8080
/**
81-
* Retrieves the maximum number of threads the operator can spin out to dispatch reconciliation
82-
* requests to reconcilers
81+
* The maximum number of threads the operator can spin out to dispatch reconciliation requests to
82+
* reconcilers
8383
*
8484
* @return the maximum number of concurrent reconciliation threads
8585
*/
8686
default int concurrentReconciliationThreads() {
8787
return DEFAULT_RECONCILIATION_THREADS_NUMBER;
8888
}
8989

90+
/**
91+
* The minimum number of threads the operator starts in the thread pool for reconciliations.
92+
*
93+
* @return the minimum number of concurrent reconciliation threads
94+
*/
95+
default int minConcurrentReconciliationThreads() {
96+
return MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER;
97+
}
98+
9099
int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER;
100+
int MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER;
91101

102+
/**
103+
* Retrieves the maximum number of threads the operator can spin out to be used in the workflows.
104+
*
105+
* @return the maximum number of concurrent workflow threads
106+
*/
92107
default int concurrentWorkflowExecutorThreads() {
93108
return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
94109
}
95110

111+
/**
112+
* The minimum number of threads the operator starts in the thread pool for workflows.
113+
*
114+
* @return the minimum number of concurrent workflow threads
115+
*/
116+
default int minConcurrentWorkflowExecutorThreads() {
117+
return MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
118+
}
119+
96120
/**
97121
* Used to clone custom resources. It is strongly suggested that implementors override this method
98122
* since the default implementation creates a new {@link Cloner} instance each time this method is
@@ -136,11 +160,15 @@ default Metrics getMetrics() {
136160
}
137161

138162
default ExecutorService getExecutorService() {
139-
return Executors.newFixedThreadPool(concurrentReconciliationThreads());
163+
return new ThreadPoolExecutor(minConcurrentReconciliationThreads(),
164+
concurrentReconciliationThreads(),
165+
1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
140166
}
141167

142168
default ExecutorService getWorkflowExecutorService() {
143-
return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads());
169+
return new ThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(),
170+
concurrentWorkflowExecutorThreads(),
171+
1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
144172
}
145173

146174
default boolean closeClientOnStop() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@ public class ConfigurationServiceOverrider {
1919
private Config clientConfig;
2020
private Boolean checkCR;
2121
private Integer concurrentReconciliationThreads;
22+
private Integer minConcurrentReconciliationThreads;
2223
private Integer concurrentWorkflowExecutorThreads;
24+
private Integer minConcurrentWorkflowExecutorThreads;
2325
private Cloner cloner;
2426
private Integer timeoutSeconds;
2527
private Boolean closeClientOnStop;
@@ -56,6 +58,16 @@ public ConfigurationServiceOverrider withConcurrentWorkflowExecutorThreads(int t
5658
return this;
5759
}
5860

61+
public ConfigurationServiceOverrider withMinConcurrentReconciliationThreads(int threadNumber) {
62+
this.minConcurrentReconciliationThreads = threadNumber;
63+
return this;
64+
}
65+
66+
public ConfigurationServiceOverrider withMinConcurrentWorkflowExecutorThreads(int threadNumber) {
67+
this.minConcurrentWorkflowExecutorThreads = threadNumber;
68+
return this;
69+
}
70+
5971
public ConfigurationServiceOverrider withResourceCloner(Cloner cloner) {
6072
this.cloner = cloner;
6173
return this;
@@ -149,6 +161,18 @@ public int concurrentWorkflowExecutorThreads() {
149161
: original.concurrentWorkflowExecutorThreads();
150162
}
151163

164+
@Override
165+
public int minConcurrentReconciliationThreads() {
166+
return minConcurrentReconciliationThreads != null ? minConcurrentReconciliationThreads
167+
: original.minConcurrentReconciliationThreads();
168+
}
169+
170+
@Override
171+
public int minConcurrentWorkflowExecutorThreads() {
172+
return minConcurrentWorkflowExecutorThreads != null ? minConcurrentWorkflowExecutorThreads
173+
: original.minConcurrentWorkflowExecutorThreads();
174+
}
175+
152176
@Override
153177
public int getTerminationTimeoutSeconds() {
154178
return timeoutSeconds != null ? timeoutSeconds : original.getTerminationTimeoutSeconds();

0 commit comments

Comments
 (0)