5
5
import java .util .Map ;
6
6
import java .util .Optional ;
7
7
import java .util .concurrent .ExecutorService ;
8
- import java .util .concurrent .ScheduledThreadPoolExecutor ;
9
8
10
9
import org .slf4j .Logger ;
11
10
import org .slf4j .LoggerFactory ;
12
11
13
12
import io .fabric8 .kubernetes .api .model .HasMetadata ;
14
13
import io .javaoperatorsdk .operator .OperatorException ;
15
- import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
16
14
import io .javaoperatorsdk .operator .api .config .ConfigurationServiceProvider ;
17
15
import io .javaoperatorsdk .operator .api .config .ControllerConfiguration ;
18
16
import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
@@ -40,20 +38,19 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
40
38
private final ControllerConfiguration <?> controllerConfiguration ;
41
39
private final ReconciliationDispatcher <P > reconciliationDispatcher ;
42
40
private final Retry retry ;
43
- private final ExecutorService executor ;
44
41
private final Metrics metrics ;
45
42
private final Cache <P > cache ;
46
43
private final EventSourceManager <P > eventSourceManager ;
47
44
private final RateLimiter <? extends RateLimitState > rateLimiter ;
48
45
private final ResourceStateManager resourceStateManager = new ResourceStateManager ();
49
46
private final Map <String , Object > metricsMetadata ;
47
+ private ExecutorService executor ;
50
48
51
49
52
50
public EventProcessor (EventSourceManager <P > eventSourceManager ) {
53
51
this (
54
52
eventSourceManager .getController ().getConfiguration (),
55
53
eventSourceManager .getControllerResourceEventSource (),
56
- ExecutorServiceManager .instance ().executorService (),
57
54
new ReconciliationDispatcher <>(eventSourceManager .getController ()),
58
55
ConfigurationServiceProvider .instance ().getMetrics (),
59
56
eventSourceManager );
@@ -68,7 +65,6 @@ public EventProcessor(EventSourceManager<P> eventSourceManager) {
68
65
this (
69
66
controllerConfiguration ,
70
67
eventSourceManager .getControllerResourceEventSource (),
71
- null ,
72
68
reconciliationDispatcher ,
73
69
metrics ,
74
70
eventSourceManager );
@@ -78,17 +74,11 @@ public EventProcessor(EventSourceManager<P> eventSourceManager) {
78
74
private EventProcessor (
79
75
ControllerConfiguration controllerConfiguration ,
80
76
Cache <P > cache ,
81
- ExecutorService executor ,
82
77
ReconciliationDispatcher <P > reconciliationDispatcher ,
83
78
Metrics metrics ,
84
79
EventSourceManager <P > eventSourceManager ) {
85
80
this .controllerConfiguration = controllerConfiguration ;
86
81
this .running = false ;
87
- this .executor =
88
- executor == null
89
- ? new ScheduledThreadPoolExecutor (
90
- ConfigurationService .DEFAULT_RECONCILIATION_THREADS_NUMBER )
91
- : executor ;
92
82
this .reconciliationDispatcher = reconciliationDispatcher ;
93
83
this .retry = controllerConfiguration .getRetry ();
94
84
this .cache = cache ;
@@ -376,6 +366,7 @@ public synchronized void stop() {
376
366
377
367
@ Override
378
368
public void start () throws OperatorException {
369
+ executor = ExecutorServiceManager .instance ().executorService ();
379
370
this .running = true ;
380
371
handleAlreadyMarkedEvents ();
381
372
}
0 commit comments