32
32
33
33
import static io .javaoperatorsdk .operator .processing .KubernetesResourceUtils .getName ;
34
34
35
- public class EventProcessor <R extends HasMetadata > implements EventHandler , LifecycleAware {
35
+ public class EventProcessor <P extends HasMetadata > implements EventHandler , LifecycleAware {
36
36
37
37
private static final Logger log = LoggerFactory .getLogger (EventProcessor .class );
38
38
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50 ;
39
39
40
40
private volatile boolean running ;
41
41
private final ControllerConfiguration <?> controllerConfiguration ;
42
- private final ReconciliationDispatcher <R > reconciliationDispatcher ;
42
+ private final ReconciliationDispatcher <P > reconciliationDispatcher ;
43
43
private final Retry retry ;
44
44
private final ExecutorService executor ;
45
45
private final Metrics metrics ;
46
- private final Cache <R > cache ;
47
- private final EventSourceManager <R > eventSourceManager ;
46
+ private final Cache <P > cache ;
47
+ private final EventSourceManager <P > eventSourceManager ;
48
48
private final RateLimiter <? extends RateLimitState > rateLimiter ;
49
49
private final ResourceStateManager resourceStateManager = new ResourceStateManager ();
50
50
private final Map <String , Object > metricsMetadata ;
51
51
52
52
53
- public EventProcessor (EventSourceManager <R > eventSourceManager ) {
53
+ public EventProcessor (EventSourceManager <P > eventSourceManager ) {
54
54
this (
55
55
eventSourceManager .getController ().getConfiguration (),
56
56
eventSourceManager .getControllerResourceEventSource (),
@@ -63,8 +63,8 @@ public EventProcessor(EventSourceManager<R> eventSourceManager) {
63
63
@ SuppressWarnings ("rawtypes" )
64
64
EventProcessor (
65
65
ControllerConfiguration controllerConfiguration ,
66
- ReconciliationDispatcher <R > reconciliationDispatcher ,
67
- EventSourceManager <R > eventSourceManager ,
66
+ ReconciliationDispatcher <P > reconciliationDispatcher ,
67
+ EventSourceManager <P > eventSourceManager ,
68
68
Metrics metrics ) {
69
69
this (
70
70
controllerConfiguration ,
@@ -78,11 +78,11 @@ public EventProcessor(EventSourceManager<R> eventSourceManager) {
78
78
@ SuppressWarnings ({"rawtypes" , "unchecked" })
79
79
private EventProcessor (
80
80
ControllerConfiguration controllerConfiguration ,
81
- Cache <R > cache ,
81
+ Cache <P > cache ,
82
82
ExecutorService executor ,
83
- ReconciliationDispatcher <R > reconciliationDispatcher ,
83
+ ReconciliationDispatcher <P > reconciliationDispatcher ,
84
84
Metrics metrics ,
85
- EventSourceManager <R > eventSourceManager ) {
85
+ EventSourceManager <P > eventSourceManager ) {
86
86
this .controllerConfiguration = controllerConfiguration ;
87
87
this .running = false ;
88
88
this .executor =
@@ -136,7 +136,7 @@ private void submitReconciliationExecution(ResourceState state) {
136
136
try {
137
137
boolean controllerUnderExecution = isControllerUnderExecution (state );
138
138
final var resourceID = state .getId ();
139
- Optional <R > maybeLatest = cache .get (resourceID );
139
+ Optional <P > maybeLatest = cache .get (resourceID );
140
140
maybeLatest .ifPresent (MDCUtils ::addResourceInfo );
141
141
if (!controllerUnderExecution && maybeLatest .isPresent ()) {
142
142
var rateLimit = state .getRateLimit ();
@@ -151,9 +151,9 @@ private void submitReconciliationExecution(ResourceState state) {
151
151
}
152
152
state .setUnderProcessing (true );
153
153
final var latest = maybeLatest .get ();
154
- ExecutionScope <R > executionScope = new ExecutionScope <>(latest , state .getRetry ());
154
+ ExecutionScope <P > executionScope = new ExecutionScope <>(latest , state .getRetry ());
155
155
state .unMarkEventReceived ();
156
- metrics .reconcileCustomResource (resourceID , state .getRetry (), metricsMetadata );
156
+ metrics .reconcileCustomResource (latest , state .getRetry (), metricsMetadata );
157
157
log .debug ("Executing events for custom resource. Scope: {}" , executionScope );
158
158
executor .execute (new ReconcilerExecutor (executionScope ));
159
159
} else {
@@ -221,7 +221,7 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal
221
221
}
222
222
223
223
synchronized void eventProcessingFinished (
224
- ExecutionScope <R > executionScope , PostExecutionControl <R > postExecutionControl ) {
224
+ ExecutionScope <P > executionScope , PostExecutionControl <P > postExecutionControl ) {
225
225
if (!running ) {
226
226
return ;
227
227
}
@@ -244,7 +244,7 @@ synchronized void eventProcessingFinished(
244
244
return ;
245
245
}
246
246
cleanupOnSuccessfulExecution (executionScope );
247
- metrics .finishedReconciliation (resourceID , metricsMetadata );
247
+ metrics .finishedReconciliation (executionScope . getResource () , metricsMetadata );
248
248
if (state .deleteEventPresent ()) {
249
249
cleanupForDeletedEvent (executionScope .getResourceID ());
250
250
} else if (postExecutionControl .isFinalizerRemoved ()) {
@@ -253,12 +253,12 @@ synchronized void eventProcessingFinished(
253
253
postExecutionControl
254
254
.getUpdatedCustomResource ()
255
255
.ifPresent (
256
- r -> {
256
+ p -> {
257
257
if (!postExecutionControl .updateIsStatusPatch ()) {
258
258
eventSourceManager
259
259
.getControllerResourceEventSource ()
260
260
.handleRecentResourceUpdate (
261
- ResourceID .fromResource (r ), r , executionScope .getResource ());
261
+ ResourceID .fromResource (p ), p , executionScope .getResource ());
262
262
}
263
263
});
264
264
if (state .eventPresent ()) {
@@ -270,7 +270,7 @@ synchronized void eventProcessingFinished(
270
270
}
271
271
272
272
private void reScheduleExecutionIfInstructed (
273
- PostExecutionControl <R > postExecutionControl , R customResource ) {
273
+ PostExecutionControl <P > postExecutionControl , P customResource ) {
274
274
275
275
postExecutionControl
276
276
.getReScheduleDelay ()
@@ -281,7 +281,7 @@ private void reScheduleExecutionIfInstructed(
281
281
}, () -> scheduleExecutionForMaxReconciliationInterval (customResource ));
282
282
}
283
283
284
- private void scheduleExecutionForMaxReconciliationInterval (R customResource ) {
284
+ private void scheduleExecutionForMaxReconciliationInterval (P customResource ) {
285
285
this .controllerConfiguration
286
286
.maxReconciliationInterval ()
287
287
.ifPresent (m -> {
@@ -294,7 +294,7 @@ private void scheduleExecutionForMaxReconciliationInterval(R customResource) {
294
294
});
295
295
}
296
296
297
- TimerEventSource <R > retryEventSource () {
297
+ TimerEventSource <P > retryEventSource () {
298
298
return eventSourceManager .retryEventSource ();
299
299
}
300
300
@@ -304,7 +304,7 @@ TimerEventSource<R> retryEventSource() {
304
304
* according to the retry timing if there was an exception.
305
305
*/
306
306
private void handleRetryOnException (
307
- ExecutionScope <R > executionScope , Exception exception ) {
307
+ ExecutionScope <P > executionScope , Exception exception ) {
308
308
final var state = getOrInitRetryExecution (executionScope );
309
309
var resourceID = state .getId ();
310
310
boolean eventPresent = state .eventPresent ();
@@ -323,7 +323,7 @@ private void handleRetryOnException(
323
323
"Scheduling timer event for retry with delay:{} for resource: {}" ,
324
324
delay ,
325
325
resourceID );
326
- metrics .failedReconciliation (resourceID , exception , metricsMetadata );
326
+ metrics .failedReconciliation (executionScope . getResource () , exception , metricsMetadata );
327
327
retryEventSource ().scheduleOnce (resourceID , delay );
328
328
},
329
329
() -> {
@@ -332,7 +332,7 @@ private void handleRetryOnException(
332
332
});
333
333
}
334
334
335
- private void cleanupOnSuccessfulExecution (ExecutionScope <R > executionScope ) {
335
+ private void cleanupOnSuccessfulExecution (ExecutionScope <P > executionScope ) {
336
336
log .debug (
337
337
"Cleanup for successful execution for resource: {}" , getName (executionScope .getResource ()));
338
338
if (isRetryConfigured ()) {
@@ -341,7 +341,7 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
341
341
retryEventSource ().cancelOnceSchedule (executionScope .getResourceID ());
342
342
}
343
343
344
- private ResourceState getOrInitRetryExecution (ExecutionScope <R > executionScope ) {
344
+ private ResourceState getOrInitRetryExecution (ExecutionScope <P > executionScope ) {
345
345
final var state = resourceStateManager .getOrCreate (executionScope .getResourceID ());
346
346
RetryExecution retryExecution = state .getRetry ();
347
347
if (retryExecution == null ) {
@@ -387,9 +387,9 @@ private void handleAlreadyMarkedEvents() {
387
387
}
388
388
389
389
private class ReconcilerExecutor implements Runnable {
390
- private final ExecutionScope <R > executionScope ;
390
+ private final ExecutionScope <P > executionScope ;
391
391
392
- private ReconcilerExecutor (ExecutionScope <R > executionScope ) {
392
+ private ReconcilerExecutor (ExecutionScope <P > executionScope ) {
393
393
this .executionScope = executionScope ;
394
394
}
395
395
@@ -401,7 +401,7 @@ public void run() {
401
401
try {
402
402
MDCUtils .addResourceInfo (executionScope .getResource ());
403
403
thread .setName ("ReconcilerExecutor-" + controllerName () + "-" + thread .getId ());
404
- PostExecutionControl <R > postExecutionControl =
404
+ PostExecutionControl <P > postExecutionControl =
405
405
reconciliationDispatcher .handleExecution (executionScope );
406
406
eventProcessingFinished (executionScope , postExecutionControl );
407
407
} finally {
0 commit comments