14
14
import io .javaoperatorsdk .operator .api .config .ConfigurationServiceProvider ;
15
15
import io .javaoperatorsdk .operator .api .config .ExecutorServiceManager ;
16
16
import io .javaoperatorsdk .operator .api .monitoring .Metrics ;
17
- import io .javaoperatorsdk .operator .api .reconciler .RetryInfo ;
18
17
import io .javaoperatorsdk .operator .processing .LifecycleAware ;
19
18
import io .javaoperatorsdk .operator .processing .MDCUtils ;
20
19
import io .javaoperatorsdk .operator .processing .event .rate .RateLimiter ;
@@ -41,7 +40,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
41
40
private final Metrics metrics ;
42
41
private final Cache <R > cache ;
43
42
private final EventSourceManager <R > eventSourceManager ;
44
- private final EventMarker eventMarker = new EventMarker ();
43
+ // private final EventMarker eventMarker = new EventMarker();
45
44
private final RateLimiter <? extends RateLimitState > rateLimiter ;
46
45
47
46
private final ResourceStateManager resourceStateManager = new ResourceStateManager ();
@@ -108,77 +107,76 @@ public synchronized void handleEvent(Event event) {
108
107
log .debug ("Received event: {}" , event );
109
108
110
109
final var resourceID = event .getRelatedCustomResourceID ();
110
+ final var state = resourceStateManager .getOrCreate (event .getRelatedCustomResourceID ());
111
111
MDCUtils .addResourceIDInfo (resourceID );
112
112
metrics .receivedEvent (event );
113
- handleEventMarking (event );
113
+ handleEventMarking (event , state );
114
114
if (!this .running ) {
115
115
// events are received and marked, but will be processed when started, see start() method.
116
116
log .debug ("Skipping event: {} because the event processor is not started" , event );
117
117
return ;
118
118
}
119
- handleMarkedEventForResource (resourceID );
119
+ handleMarkedEventForResource (state );
120
120
} finally {
121
121
MDCUtils .removeResourceIDInfo ();
122
122
}
123
123
}
124
124
125
- private void handleMarkedEventForResource (ResourceID resourceID ) {
126
- if (eventMarker .deleteEventPresent (resourceID )) {
127
- cleanupForDeletedEvent (resourceID );
128
- } else if (!eventMarker .processedMarkForDeletionPresent (resourceID )) {
129
- submitReconciliationExecution (resourceID );
125
+ private void handleMarkedEventForResource (ResourceState state ) {
126
+ if (state .deleteEventPresent ()) {
127
+ cleanupForDeletedEvent (state . getId () );
128
+ } else if (!state .processedMarkForDeletionPresent ()) {
129
+ submitReconciliationExecution (state );
130
130
}
131
131
}
132
132
133
- private void submitReconciliationExecution (ResourceID resourceID ) {
133
+ private void submitReconciliationExecution (ResourceState state ) {
134
134
try {
135
- boolean controllerUnderExecution = isControllerUnderExecution (resourceID );
136
- Optional <R > latest = cache .get (resourceID );
135
+ boolean controllerUnderExecution = isControllerUnderExecution (state );
136
+ Optional <R > latest = cache .get (state . getId () );
137
137
latest .ifPresent (MDCUtils ::addResourceInfo );
138
138
if (!controllerUnderExecution && latest .isPresent ()) {
139
- final var resourceState = resourceStateManager .getOrCreate (resourceID );
140
- var rateLimit = resourceState .getRateLimit ();
139
+ var rateLimit = state .getRateLimit ();
141
140
if (rateLimit == null ) {
142
141
rateLimit = rateLimiter .initState ();
143
- resourceState .setRateLimit (rateLimit );
142
+ state .setRateLimit (rateLimit );
144
143
}
145
144
var rateLimiterPermission = rateLimiter .isLimited (rateLimit );
146
145
if (rateLimiterPermission .isPresent ()) {
147
- handleRateLimitedSubmission (resourceID , rateLimiterPermission .get ());
146
+ handleRateLimitedSubmission (state . getId () , rateLimiterPermission .get ());
148
147
return ;
149
148
}
150
- setUnderExecutionProcessing ( resourceID );
151
- final var retryInfo = retryInfo ( resourceID );
149
+ state . setUnderProcessing ( true );
150
+ final var retryInfo = state . getRetry ( );
152
151
ExecutionScope <R > executionScope = new ExecutionScope <>(latest .get (), retryInfo );
153
- eventMarker .unMarkEventReceived (resourceID );
154
- metrics .reconcileCustomResource (resourceID , retryInfo );
152
+ state .unMarkEventReceived ();
153
+ metrics .reconcileCustomResource (state . getId () , retryInfo );
155
154
log .debug ("Executing events for custom resource. Scope: {}" , executionScope );
156
155
executor .execute (new ControllerExecution (executionScope ));
157
156
} else {
158
157
log .debug (
159
158
"Skipping executing controller for resource id: {}. Controller in execution: {}. Latest Resource present: {}" ,
160
- resourceID ,
159
+ state ,
161
160
controllerUnderExecution ,
162
161
latest .isPresent ());
163
162
if (latest .isEmpty ()) {
164
- log .debug ("no custom resource found in cache for ResourceID: {}" , resourceID );
163
+ log .debug ("no custom resource found in cache for ResourceID: {}" , state );
165
164
}
166
165
}
167
166
} finally {
168
167
MDCUtils .removeResourceInfo ();
169
168
}
170
169
}
171
170
172
- private void handleEventMarking (Event event ) {
171
+ private void handleEventMarking (Event event , ResourceState state ) {
173
172
final var relatedCustomResourceID = event .getRelatedCustomResourceID ();
174
173
if (event instanceof ResourceEvent ) {
175
174
var resourceEvent = (ResourceEvent ) event ;
176
175
if (resourceEvent .getAction () == ResourceAction .DELETED ) {
177
176
log .debug ("Marking delete event received for: {}" , relatedCustomResourceID );
178
- eventMarker .markDeleteEventReceived (event );
177
+ state .markDeleteEventReceived ();
179
178
} else {
180
- if (eventMarker .processedMarkForDeletionPresent (relatedCustomResourceID )
181
- && isResourceMarkedForDeletion (resourceEvent )) {
179
+ if (state .processedMarkForDeletionPresent () && isResourceMarkedForDeletion (resourceEvent )) {
182
180
log .debug (
183
181
"Skipping mark of event received, since already processed mark for deletion and resource marked for deletion: {}" ,
184
182
relatedCustomResourceID );
@@ -190,22 +188,21 @@ && isResourceMarkedForDeletion(resourceEvent)) {
190
188
// removed, but also the informers websocket is disconnected and later reconnected. So
191
189
// meanwhile the resource could be deleted and recreated. In this case we just mark a new
192
190
// event as below.
193
- markEventReceived (event );
191
+ markEventReceived (state );
194
192
}
195
- } else if (!eventMarker .deleteEventPresent (relatedCustomResourceID ) ||
196
- !eventMarker .processedMarkForDeletionPresent (relatedCustomResourceID )) {
197
- markEventReceived (event );
193
+ } else if (!state .deleteEventPresent () || !state .processedMarkForDeletionPresent ()) {
194
+ markEventReceived (state );
198
195
} else if (log .isDebugEnabled ()) {
199
196
log .debug (
200
197
"Skipped marking event as received. Delete event present: {}, processed mark for deletion: {}" ,
201
- eventMarker .deleteEventPresent (relatedCustomResourceID ),
202
- eventMarker .processedMarkForDeletionPresent (relatedCustomResourceID ));
198
+ state .deleteEventPresent (),
199
+ state .processedMarkForDeletionPresent ());
203
200
}
204
201
}
205
202
206
- private void markEventReceived (Event event ) {
207
- log .debug ("Marking event received for: {}" , event . getRelatedCustomResourceID ());
208
- eventMarker .markEventReceived (event );
203
+ private void markEventReceived (ResourceState state ) {
204
+ log .debug ("Marking event received for: {}" , state . getId ());
205
+ state .markEventReceived ();
209
206
}
210
207
211
208
private boolean isResourceMarkedForDeletion (ResourceEvent resourceEvent ) {
@@ -220,16 +217,13 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal
220
217
Math .max (minimalDurationMillis , MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION ));
221
218
}
222
219
223
- private RetryInfo retryInfo (ResourceID resourceID ) {
224
- return resourceStateManager .getOrCreate (resourceID ).getRetry ();
225
- }
226
-
227
220
synchronized void eventProcessingFinished (
228
221
ExecutionScope <R > executionScope , PostExecutionControl <R > postExecutionControl ) {
229
222
if (!running ) {
230
223
return ;
231
224
}
232
225
ResourceID resourceID = executionScope .getResourceID ();
226
+ final var state = resourceStateManager .getOrCreate (resourceID );
233
227
log .debug (
234
228
"Event processing finished. Scope: {}, PostExecutionControl: {}" ,
235
229
executionScope ,
@@ -241,17 +235,17 @@ synchronized void eventProcessingFinished(
241
235
// Either way we don't want to retry.
242
236
if (isRetryConfigured ()
243
237
&& postExecutionControl .exceptionDuringExecution ()
244
- && !eventMarker .deleteEventPresent (resourceID )) {
238
+ && !state .deleteEventPresent ()) {
245
239
handleRetryOnException (
246
240
executionScope , postExecutionControl .getRuntimeException ().orElseThrow ());
247
241
return ;
248
242
}
249
243
cleanupOnSuccessfulExecution (executionScope );
250
244
metrics .finishedReconciliation (resourceID );
251
- if (eventMarker .deleteEventPresent (resourceID )) {
245
+ if (state .deleteEventPresent ()) {
252
246
cleanupForDeletedEvent (executionScope .getResourceID ());
253
247
} else if (postExecutionControl .isFinalizerRemoved ()) {
254
- eventMarker .markProcessedMarkForDeletion (resourceID );
248
+ state .markProcessedMarkForDeletion ();
255
249
} else {
256
250
postExecutionControl
257
251
.getUpdatedCustomResource ()
@@ -264,8 +258,8 @@ synchronized void eventProcessingFinished(
264
258
ResourceID .fromResource (r ), r , executionScope .getResource ());
265
259
}
266
260
});
267
- if (eventMarker .eventPresent (resourceID )) {
268
- submitReconciliationExecution (resourceID );
261
+ if (state .eventPresent ()) {
262
+ submitReconciliationExecution (state );
269
263
} else {
270
264
reScheduleExecutionIfInstructed (postExecutionControl , executionScope .getResource ());
271
265
}
@@ -296,17 +290,17 @@ TimerEventSource<R> retryEventSource() {
296
290
*/
297
291
private void handleRetryOnException (
298
292
ExecutionScope <R > executionScope , Exception exception ) {
299
- RetryExecution execution = getOrInitRetryExecution (executionScope );
300
- var resourceID = executionScope . getResourceID ();
301
- boolean eventPresent = eventMarker .eventPresent (resourceID );
302
- eventMarker .markEventReceived (resourceID );
293
+ final var state = getOrInitRetryExecution (executionScope );
294
+ var resourceID = state . getId ();
295
+ boolean eventPresent = state .eventPresent ();
296
+ state .markEventReceived ();
303
297
304
298
if (eventPresent ) {
305
299
log .debug ("New events exists for for resource id: {}" , resourceID );
306
- submitReconciliationExecution (resourceID );
300
+ submitReconciliationExecution (state );
307
301
return ;
308
302
}
309
- Optional <Long > nextDelay = execution .nextDelay ();
303
+ Optional <Long > nextDelay = state . getRetry () .nextDelay ();
310
304
311
305
nextDelay .ifPresentOrElse (
312
306
delay -> {
@@ -329,29 +323,24 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
329
323
retryEventSource ().cancelOnceSchedule (executionScope .getResourceID ());
330
324
}
331
325
332
- private RetryExecution getOrInitRetryExecution (ExecutionScope <R > executionScope ) {
326
+ private ResourceState getOrInitRetryExecution (ExecutionScope <R > executionScope ) {
333
327
final var state = resourceStateManager .getOrCreate (executionScope .getResourceID ());
334
328
RetryExecution retryExecution = state .getRetry ();
335
329
if (retryExecution == null ) {
336
330
retryExecution = retry .initExecution ();
337
331
state .setRetry (retryExecution );
338
332
}
339
- return retryExecution ;
333
+ return state ;
340
334
}
341
335
342
336
private void cleanupForDeletedEvent (ResourceID resourceID ) {
343
337
log .debug ("Cleaning up for delete event for: {}" , resourceID );
344
- eventMarker .cleanup (resourceID );
345
338
resourceStateManager .remove (resourceID );
346
339
metrics .cleanupDoneFor (resourceID );
347
340
}
348
341
349
- private boolean isControllerUnderExecution (ResourceID resourceID ) {
350
- return resourceStateManager .getOrCreate (resourceID ).isUnderProcessing ();
351
- }
352
-
353
- private void setUnderExecutionProcessing (ResourceID resourceID ) {
354
- resourceStateManager .getOrCreate (resourceID ).setUnderProcessing (true );
342
+ private boolean isControllerUnderExecution (ResourceState state ) {
343
+ return state .isUnderProcessing ();
355
344
}
356
345
357
346
private void unsetUnderExecution (ResourceID resourceID ) {
@@ -374,8 +363,8 @@ public void start() throws OperatorException {
374
363
}
375
364
376
365
private void handleAlreadyMarkedEvents () {
377
- for (ResourceID resourceID : eventMarker . resourceIDsWithEventPresent ()) {
378
- handleMarkedEventForResource (resourceID );
366
+ for (var state : resourceStateManager . resourcesWithEventPresent ()) {
367
+ handleMarkedEventForResource (state );
379
368
}
380
369
}
381
370
@@ -411,6 +400,6 @@ public String toString() {
411
400
}
412
401
413
402
public synchronized boolean isUnderProcessing (ResourceID resourceID ) {
414
- return isControllerUnderExecution (resourceID );
403
+ return isControllerUnderExecution (resourceStateManager . getOrCreate ( resourceID ) );
415
404
}
416
405
}
0 commit comments