14
14
import io .javaoperatorsdk .operator .OperatorException ;
15
15
import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
16
16
import io .javaoperatorsdk .operator .api .config .ConfigurationServiceProvider ;
17
+ import io .javaoperatorsdk .operator .api .config .ControllerConfiguration ;
17
18
import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
18
19
import io .javaoperatorsdk .operator .api .monitoring .Metrics ;
19
20
import io .javaoperatorsdk .operator .api .reconciler .Constants ;
@@ -37,71 +38,64 @@ public class EventProcessor<R extends HasMetadata> implements EventHandler, Life
37
38
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50 ;
38
39
39
40
private volatile boolean running ;
41
+ private final ControllerConfiguration <?> controllerConfiguration ;
40
42
private final ReconciliationDispatcher <R > reconciliationDispatcher ;
41
43
private final Retry retry ;
42
44
private final ExecutorService executor ;
43
- private final String controllerName ;
44
45
private final Metrics metrics ;
45
46
private final Cache <R > cache ;
46
47
private final EventSourceManager <R > eventSourceManager ;
47
48
private final RateLimiter <? extends RateLimitState > rateLimiter ;
48
49
private final ResourceStateManager resourceStateManager = new ResourceStateManager ();
49
50
private final Map <String , Object > metricsMetadata ;
50
51
52
+
51
53
public EventProcessor (EventSourceManager <R > eventSourceManager ) {
52
54
this (
55
+ eventSourceManager .getController ().getConfiguration (),
53
56
eventSourceManager .getControllerResourceEventSource (),
54
57
ExecutorServiceManager .instance ().executorService (),
55
- eventSourceManager .getController ().getConfiguration ().getName (),
56
58
new ReconciliationDispatcher <>(eventSourceManager .getController ()),
57
- eventSourceManager .getController ().getConfiguration ().getRetry (),
58
59
ConfigurationServiceProvider .instance ().getMetrics (),
59
- eventSourceManager .getController ().getConfiguration ().getRateLimiter (),
60
60
eventSourceManager );
61
61
}
62
62
63
63
@ SuppressWarnings ("rawtypes" )
64
64
EventProcessor (
65
+ ControllerConfiguration controllerConfiguration ,
65
66
ReconciliationDispatcher <R > reconciliationDispatcher ,
66
67
EventSourceManager <R > eventSourceManager ,
67
- String relatedControllerName ,
68
- Retry retry ,
69
- RateLimiter rateLimiter ,
70
68
Metrics metrics ) {
71
69
this (
70
+ controllerConfiguration ,
72
71
eventSourceManager .getControllerResourceEventSource (),
73
72
null ,
74
- relatedControllerName ,
75
73
reconciliationDispatcher ,
76
- retry ,
77
74
metrics ,
78
- rateLimiter ,
79
75
eventSourceManager );
80
76
}
81
77
82
78
@ SuppressWarnings ({"rawtypes" , "unchecked" })
83
79
private EventProcessor (
80
+ ControllerConfiguration controllerConfiguration ,
84
81
Cache <R > cache ,
85
82
ExecutorService executor ,
86
- String relatedControllerName ,
87
83
ReconciliationDispatcher <R > reconciliationDispatcher ,
88
- Retry retry ,
89
84
Metrics metrics ,
90
- RateLimiter rateLimiter ,
91
85
EventSourceManager <R > eventSourceManager ) {
86
+ this .controllerConfiguration = controllerConfiguration ;
92
87
this .running = false ;
93
88
this .executor =
94
89
executor == null
95
90
? new ScheduledThreadPoolExecutor (
96
91
ConfigurationService .DEFAULT_RECONCILIATION_THREADS_NUMBER )
97
92
: executor ;
98
- this .controllerName = relatedControllerName ;
99
93
this .reconciliationDispatcher = reconciliationDispatcher ;
100
- this .retry = retry ;
94
+ this .retry = controllerConfiguration . getRetry () ;
101
95
this .cache = cache ;
102
96
this .metrics = metrics != null ? metrics : Metrics .NOOP ;
103
97
this .eventSourceManager = eventSourceManager ;
104
- this .rateLimiter = rateLimiter ;
98
+ this .rateLimiter = controllerConfiguration . getRateLimiter () ;
105
99
106
100
metricsMetadata = Optional .ofNullable (eventSourceManager .getController ())
107
101
.map (Controller ::getAssociatedGroupVersionKind )
@@ -272,18 +266,31 @@ synchronized void eventProcessingFinished(
272
266
reScheduleExecutionIfInstructed (postExecutionControl , executionScope .getResource ());
273
267
}
274
268
}
275
-
276
269
}
277
270
278
271
private void reScheduleExecutionIfInstructed (
279
272
PostExecutionControl <R > postExecutionControl , R customResource ) {
273
+
280
274
postExecutionControl
281
275
.getReScheduleDelay ()
282
- .ifPresent (delay -> {
276
+ .ifPresentOrElse (delay -> {
283
277
var resourceID = ResourceID .fromResource (customResource );
284
278
log .debug ("ReScheduling event for resource: {} with delay: {}" ,
285
279
resourceID , delay );
286
280
retryEventSource ().scheduleOnce (resourceID , delay );
281
+ }, () -> scheduleExecutionForMaxReconciliationInterval (customResource ));
282
+ }
283
+
284
+ private void scheduleExecutionForMaxReconciliationInterval (R customResource ) {
285
+ this .controllerConfiguration
286
+ .maxReconciliationInterval ()
287
+ .ifPresent (m -> {
288
+ var resourceID = ResourceID .fromResource (customResource );
289
+ var delay = m .toMillis ();
290
+ log .debug ("ReScheduling event for resource because for max reconciliation interval: " +
291
+ "{} with delay: {}" ,
292
+ resourceID , delay );
293
+ retryEventSource ().scheduleOnce (resourceID , delay );
287
294
});
288
295
}
289
296
@@ -319,7 +326,10 @@ private void handleRetryOnException(
319
326
metrics .failedReconciliation (resourceID , exception , metricsMetadata );
320
327
retryEventSource ().scheduleOnce (resourceID , delay );
321
328
},
322
- () -> log .error ("Exhausted retries for {}" , executionScope ));
329
+ () -> {
330
+ log .error ("Exhausted retries for {}" , executionScope );
331
+ scheduleExecutionForMaxReconciliationInterval (executionScope .getResource ());
332
+ });
323
333
}
324
334
325
335
private void cleanupOnSuccessfulExecution (ExecutionScope <R > executionScope ) {
@@ -390,7 +400,7 @@ public void run() {
390
400
final var name = thread .getName ();
391
401
try {
392
402
MDCUtils .addResourceInfo (executionScope .getResource ());
393
- thread .setName ("ReconcilerExecutor-" + controllerName + "-" + thread .getId ());
403
+ thread .setName ("ReconcilerExecutor-" + controllerName () + "-" + thread .getId ());
394
404
PostExecutionControl <R > postExecutionControl =
395
405
reconciliationDispatcher .handleExecution (executionScope );
396
406
eventProcessingFinished (executionScope , postExecutionControl );
@@ -403,10 +413,14 @@ public void run() {
403
413
404
414
@ Override
405
415
public String toString () {
406
- return controllerName + " -> " + executionScope ;
416
+ return controllerName () + " -> " + executionScope ;
407
417
}
408
418
}
409
419
420
+ private String controllerName () {
421
+ return controllerConfiguration .getName ();
422
+ }
423
+
410
424
public synchronized boolean isUnderProcessing (ResourceID resourceID ) {
411
425
return isControllerUnderExecution (resourceStateManager .getOrCreate (resourceID ));
412
426
}
0 commit comments