7
7
import java .util .Set ;
8
8
import java .util .concurrent .ExecutorService ;
9
9
import java .util .concurrent .ScheduledThreadPoolExecutor ;
10
- import java .util .concurrent .locks .ReentrantLock ;
11
10
12
11
import org .slf4j .Logger ;
13
12
import org .slf4j .LoggerFactory ;
@@ -40,7 +39,6 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
40
39
private final Map <ResourceID , RetryExecution > retryState = new HashMap <>();
41
40
private final ExecutorService executor ;
42
41
private final String controllerName ;
43
- private final ReentrantLock lock = new ReentrantLock ();
44
42
private final Metrics metrics ;
45
43
private volatile boolean running ;
46
44
private final Cache <R > cache ;
@@ -97,8 +95,7 @@ private EventProcessor(
97
95
}
98
96
99
97
@ Override
100
- public void handleEvent (Event event ) {
101
- lock .lock ();
98
+ public synchronized void handleEvent (Event event ) {
102
99
try {
103
100
log .debug ("Received event: {}" , event );
104
101
@@ -113,7 +110,6 @@ public void handleEvent(Event event) {
113
110
}
114
111
handleMarkedEventForResource (resourceID );
115
112
} finally {
116
- lock .unlock ();
117
113
MDCUtils .removeResourceIDInfo ();
118
114
}
119
115
}
@@ -201,57 +197,53 @@ private RetryInfo retryInfo(ResourceID resourceID) {
201
197
return retryState .get (resourceID );
202
198
}
203
199
204
- void eventProcessingFinished (
200
+ synchronized void eventProcessingFinished (
205
201
ExecutionScope <R > executionScope , PostExecutionControl <R > postExecutionControl ) {
206
- lock .lock ();
207
- try {
208
- if (!running ) {
209
- return ;
210
- }
211
- ResourceID resourceID = executionScope .getResourceID ();
212
- log .debug (
213
- "Event processing finished. Scope: {}, PostExecutionControl: {}" ,
214
- executionScope ,
215
- postExecutionControl );
216
- unsetUnderExecution (resourceID );
217
-
218
- // If a delete event present at this phase, it was received during reconciliation.
219
- // So we either removed the finalizer during reconciliation or we don't use finalizers.
220
- // Either way we don't want to retry.
221
- if (isRetryConfigured ()
222
- && postExecutionControl .exceptionDuringExecution ()
223
- && !eventMarker .deleteEventPresent (resourceID )) {
224
- handleRetryOnException (
225
- executionScope , postExecutionControl .getRuntimeException ().orElseThrow ());
226
- return ;
227
- }
228
- cleanupOnSuccessfulExecution (executionScope );
229
- metrics .finishedReconciliation (resourceID );
230
- if (eventMarker .deleteEventPresent (resourceID )) {
231
- cleanupForDeletedEvent (executionScope .getResourceID ());
232
- } else if (postExecutionControl .isFinalizerRemoved ()) {
233
- eventMarker .markProcessedMarkForDeletion (resourceID );
202
+ if (!running ) {
203
+ return ;
204
+ }
205
+ ResourceID resourceID = executionScope .getResourceID ();
206
+ log .debug (
207
+ "Event processing finished. Scope: {}, PostExecutionControl: {}" ,
208
+ executionScope ,
209
+ postExecutionControl );
210
+ unsetUnderExecution (resourceID );
211
+
212
+ // If a delete event present at this phase, it was received during reconciliation.
213
+ // So we either removed the finalizer during reconciliation or we don't use finalizers.
214
+ // Either way we don't want to retry.
215
+ if (isRetryConfigured ()
216
+ && postExecutionControl .exceptionDuringExecution ()
217
+ && !eventMarker .deleteEventPresent (resourceID )) {
218
+ handleRetryOnException (
219
+ executionScope , postExecutionControl .getRuntimeException ().orElseThrow ());
220
+ return ;
221
+ }
222
+ cleanupOnSuccessfulExecution (executionScope );
223
+ metrics .finishedReconciliation (resourceID );
224
+ if (eventMarker .deleteEventPresent (resourceID )) {
225
+ cleanupForDeletedEvent (executionScope .getResourceID ());
226
+ } else if (postExecutionControl .isFinalizerRemoved ()) {
227
+ eventMarker .markProcessedMarkForDeletion (resourceID );
228
+ } else {
229
+ postExecutionControl
230
+ .getUpdatedCustomResource ()
231
+ .ifPresent (
232
+ r -> {
233
+ if (!postExecutionControl .updateIsStatusPatch ()) {
234
+ eventSourceManager
235
+ .getControllerResourceEventSource ()
236
+ .handleRecentResourceUpdate (
237
+ ResourceID .fromResource (r ), r , executionScope .getResource ());
238
+ }
239
+ });
240
+ if (eventMarker .eventPresent (resourceID )) {
241
+ submitReconciliationExecution (resourceID );
234
242
} else {
235
- postExecutionControl
236
- .getUpdatedCustomResource ()
237
- .ifPresent (
238
- r -> {
239
- if (!postExecutionControl .updateIsStatusPatch ()) {
240
- eventSourceManager
241
- .getControllerResourceEventSource ()
242
- .handleRecentResourceUpdate (
243
- ResourceID .fromResource (r ), r , executionScope .getResource ());
244
- }
245
- });
246
- if (eventMarker .eventPresent (resourceID )) {
247
- submitReconciliationExecution (resourceID );
248
- } else {
249
- reScheduleExecutionIfInstructed (postExecutionControl , executionScope .getResource ());
250
- }
243
+ reScheduleExecutionIfInstructed (postExecutionControl , executionScope .getResource ());
251
244
}
252
- } finally {
253
- lock .unlock ();
254
245
}
246
+
255
247
}
256
248
257
249
private void reScheduleExecutionIfInstructed (
@@ -343,24 +335,14 @@ private boolean isRetryConfigured() {
343
335
}
344
336
345
337
@ Override
346
- public void stop () {
347
- lock .lock ();
348
- try {
349
- this .running = false ;
350
- } finally {
351
- lock .unlock ();
352
- }
338
+ public synchronized void stop () {
339
+ this .running = false ;
353
340
}
354
341
355
342
@ Override
356
343
public void start () throws OperatorException {
357
- lock .lock ();
358
- try {
359
- this .running = true ;
360
- handleAlreadyMarkedEvents ();
361
- } finally {
362
- lock .unlock ();
363
- }
344
+ this .running = true ;
345
+ handleAlreadyMarkedEvents ();
364
346
}
365
347
366
348
private void handleAlreadyMarkedEvents () {
0 commit comments