14
14
import io .javaoperatorsdk .operator .OperatorException ;
15
15
import io .javaoperatorsdk .operator .api .config .ConfigurationService ;
16
16
import io .javaoperatorsdk .operator .api .config .ConfigurationServiceProvider ;
17
+ import io .javaoperatorsdk .operator .api .config .ControllerConfiguration ;
17
18
import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
18
19
import io .javaoperatorsdk .operator .api .monitoring .Metrics ;
19
20
import io .javaoperatorsdk .operator .api .reconciler .Constants ;
@@ -37,6 +38,7 @@ public class EventProcessor<R extends HasMetadata> implements EventHandler, Life
37
38
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50 ;
38
39
39
40
private volatile boolean running ;
41
+ private final ControllerConfiguration <?> controllerConfiguration ;
40
42
private final ReconciliationDispatcher <R > reconciliationDispatcher ;
41
43
private final Retry retry ;
42
44
private final ExecutorService executor ;
@@ -48,60 +50,55 @@ public class EventProcessor<R extends HasMetadata> implements EventHandler, Life
48
50
private final ResourceStateManager resourceStateManager = new ResourceStateManager ();
49
51
private final Map <String , Object > metricsMetadata ;
50
52
51
- public EventProcessor (EventSourceManager <R > eventSourceManager ) {
53
+
54
+ public EventProcessor (ControllerConfiguration <?> controllerConfiguration ,
55
+ EventSourceManager <R > eventSourceManager ) {
52
56
this (
57
+ controllerConfiguration ,
53
58
eventSourceManager .getControllerResourceEventSource (),
54
59
ExecutorServiceManager .instance ().executorService (),
55
- eventSourceManager .getController ().getConfiguration ().getName (),
56
60
new ReconciliationDispatcher <>(eventSourceManager .getController ()),
57
- eventSourceManager .getController ().getConfiguration ().getRetry (),
58
61
ConfigurationServiceProvider .instance ().getMetrics (),
59
- eventSourceManager .getController ().getConfiguration ().getRateLimiter (),
60
62
eventSourceManager );
61
63
}
62
64
63
65
@ SuppressWarnings ("rawtypes" )
64
66
EventProcessor (
67
+ ControllerConfiguration controllerConfiguration ,
65
68
ReconciliationDispatcher <R > reconciliationDispatcher ,
66
69
EventSourceManager <R > eventSourceManager ,
67
- String relatedControllerName ,
68
- Retry retry ,
69
- RateLimiter rateLimiter ,
70
70
Metrics metrics ) {
71
71
this (
72
+ controllerConfiguration ,
72
73
eventSourceManager .getControllerResourceEventSource (),
73
74
null ,
74
- relatedControllerName ,
75
75
reconciliationDispatcher ,
76
- retry ,
77
76
metrics ,
78
- rateLimiter ,
79
77
eventSourceManager );
80
78
}
81
79
82
80
@ SuppressWarnings ({"rawtypes" , "unchecked" })
83
81
private EventProcessor (
82
+ ControllerConfiguration controllerConfiguration ,
84
83
Cache <R > cache ,
85
84
ExecutorService executor ,
86
- String relatedControllerName ,
87
85
ReconciliationDispatcher <R > reconciliationDispatcher ,
88
- Retry retry ,
89
86
Metrics metrics ,
90
- RateLimiter rateLimiter ,
91
87
EventSourceManager <R > eventSourceManager ) {
88
+ this .controllerConfiguration = controllerConfiguration ;
92
89
this .running = false ;
93
90
this .executor =
94
91
executor == null
95
92
? new ScheduledThreadPoolExecutor (
96
93
ConfigurationService .DEFAULT_RECONCILIATION_THREADS_NUMBER )
97
94
: executor ;
98
- this .controllerName = relatedControllerName ;
95
+ this .controllerName = controllerConfiguration . getName () ;
99
96
this .reconciliationDispatcher = reconciliationDispatcher ;
100
- this .retry = retry ;
97
+ this .retry = controllerConfiguration . getRetry () ;
101
98
this .cache = cache ;
102
99
this .metrics = metrics != null ? metrics : Metrics .NOOP ;
103
100
this .eventSourceManager = eventSourceManager ;
104
- this .rateLimiter = rateLimiter ;
101
+ this .rateLimiter = controllerConfiguration . getRateLimiter () ;
105
102
106
103
metricsMetadata = Optional .ofNullable (eventSourceManager .getController ())
107
104
.map (Controller ::getAssociatedGroupVersionKind )
@@ -272,18 +269,31 @@ synchronized void eventProcessingFinished(
272
269
reScheduleExecutionIfInstructed (postExecutionControl , executionScope .getResource ());
273
270
}
274
271
}
275
-
276
272
}
277
273
278
274
private void reScheduleExecutionIfInstructed (
279
275
PostExecutionControl <R > postExecutionControl , R customResource ) {
276
+
280
277
postExecutionControl
281
278
.getReScheduleDelay ()
282
- .ifPresent (delay -> {
279
+ .ifPresentOrElse (delay -> {
283
280
var resourceID = ResourceID .fromResource (customResource );
284
281
log .debug ("ReScheduling event for resource: {} with delay: {}" ,
285
282
resourceID , delay );
286
283
retryEventSource ().scheduleOnce (resourceID , delay );
284
+ }, () -> scheduleExecutionForMaxReconciliationInterval (customResource ));
285
+ }
286
+
287
+ private void scheduleExecutionForMaxReconciliationInterval (R customResource ) {
288
+ this .controllerConfiguration
289
+ .maxReconciliationInterval ()
290
+ .ifPresent (m -> {
291
+ var resourceID = ResourceID .fromResource (customResource );
292
+ var delay = m .toMillis ();
293
+ log .debug ("ReScheduling event for resource because for max reconciliation interval: " +
294
+ "{} with delay: {}" ,
295
+ resourceID , delay );
296
+ retryEventSource ().scheduleOnce (resourceID , delay );
287
297
});
288
298
}
289
299
@@ -319,7 +329,10 @@ private void handleRetryOnException(
319
329
metrics .failedReconciliation (resourceID , exception , metricsMetadata );
320
330
retryEventSource ().scheduleOnce (resourceID , delay );
321
331
},
322
- () -> log .error ("Exhausted retries for {}" , executionScope ));
332
+ () -> {
333
+ log .error ("Exhausted retries for {}" , executionScope );
334
+ scheduleExecutionForMaxReconciliationInterval (executionScope .getResource ());
335
+ });
323
336
}
324
337
325
338
private void cleanupOnSuccessfulExecution (ExecutionScope <R > executionScope ) {
0 commit comments