3
3
import java .time .Duration ;
4
4
import java .util .Optional ;
5
5
import java .util .Set ;
6
- import java .util .concurrent .ExecutorService ;
7
- import java .util .concurrent .Executors ;
6
+ import java .util .concurrent .*;
8
7
9
8
import org .slf4j .Logger ;
10
9
import org .slf4j .LoggerFactory ;
@@ -75,7 +74,8 @@ default boolean checkCRDAndValidateLocalModel() {
75
74
return false ;
76
75
}
77
76
78
- int DEFAULT_RECONCILIATION_THREADS_NUMBER = 10 ;
77
+ int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200 ;
78
+ int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10 ;
79
79
80
80
/**
81
81
* Retrieves the maximum number of threads the operator can spin out to dispatch reconciliation
@@ -87,12 +87,26 @@ default int concurrentReconciliationThreads() {
87
87
return DEFAULT_RECONCILIATION_THREADS_NUMBER ;
88
88
}
89
89
90
+ default int minConcurrentReconciliationThreads () {
91
+ return MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER ;
92
+ }
93
+
90
94
int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER ;
95
+ int MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER ;
91
96
97
+ /**
98
+ * Retrieves the maximum number of threads the operator can spin out to be used in the workflows.
99
+ *
100
+ * @return the maximum number of concurrent workflow threads
101
+ */
92
102
default int concurrentWorkflowExecutorThreads () {
93
103
return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER ;
94
104
}
95
105
106
+ default int minConcurrentWorkflowExecutorThreads () {
107
+ return MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER ;
108
+ }
109
+
96
110
/**
97
111
* Used to clone custom resources. It is strongly suggested that implementors override this method
98
112
* since the default implementation creates a new {@link Cloner} instance each time this method is
@@ -136,11 +150,15 @@ default Metrics getMetrics() {
136
150
}
137
151
138
152
default ExecutorService getExecutorService () {
139
- return Executors .newFixedThreadPool (concurrentReconciliationThreads ());
153
+ return new ThreadPoolExecutor (minConcurrentReconciliationThreads (),
154
+ concurrentReconciliationThreads (),
155
+ 1 , TimeUnit .MINUTES , new LinkedBlockingDeque <>());
140
156
}
141
157
142
158
default ExecutorService getWorkflowExecutorService () {
143
- return Executors .newFixedThreadPool (concurrentWorkflowExecutorThreads ());
159
+ return new ThreadPoolExecutor (minConcurrentWorkflowExecutorThreads (),
160
+ concurrentWorkflowExecutorThreads (),
161
+ 1 , TimeUnit .MINUTES , new LinkedBlockingDeque <>());
144
162
}
145
163
146
164
default boolean closeClientOnStop () {
0 commit comments