From 2c46b717cc02b582e7b1e8d6f579451c411bf5d7 Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 17 Jun 2022 14:57:21 +0200 Subject: [PATCH] fix: use synchronized instead reentrantlock in event processor --- .../processing/event/EventProcessor.java | 114 ++++++++---------- 1 file changed, 48 insertions(+), 66 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 849366f978..b87f43f43d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -7,7 +7,6 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.locks.ReentrantLock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,7 +39,6 @@ class EventProcessor implements EventHandler, LifecycleAw private final Map retryState = new HashMap<>(); private final ExecutorService executor; private final String controllerName; - private final ReentrantLock lock = new ReentrantLock(); private final Metrics metrics; private volatile boolean running; private final Cache cache; @@ -97,8 +95,7 @@ private EventProcessor( } @Override - public void handleEvent(Event event) { - lock.lock(); + public synchronized void handleEvent(Event event) { try { log.debug("Received event: {}", event); @@ -113,7 +110,6 @@ public void handleEvent(Event event) { } handleMarkedEventForResource(resourceID); } finally { - lock.unlock(); MDCUtils.removeResourceIDInfo(); } } @@ -201,57 +197,53 @@ private RetryInfo retryInfo(ResourceID resourceID) { return retryState.get(resourceID); } - void eventProcessingFinished( + synchronized void eventProcessingFinished( ExecutionScope executionScope, PostExecutionControl postExecutionControl) { - lock.lock(); - try { - if (!running) { - return; - } - ResourceID resourceID = executionScope.getResourceID(); - log.debug( - "Event processing finished. Scope: {}, PostExecutionControl: {}", - executionScope, - postExecutionControl); - unsetUnderExecution(resourceID); - - // If a delete event present at this phase, it was received during reconciliation. - // So we either removed the finalizer during reconciliation or we don't use finalizers. - // Either way we don't want to retry. - if (isRetryConfigured() - && postExecutionControl.exceptionDuringExecution() - && !eventMarker.deleteEventPresent(resourceID)) { - handleRetryOnException( - executionScope, postExecutionControl.getRuntimeException().orElseThrow()); - return; - } - cleanupOnSuccessfulExecution(executionScope); - metrics.finishedReconciliation(resourceID); - if (eventMarker.deleteEventPresent(resourceID)) { - cleanupForDeletedEvent(executionScope.getResourceID()); - } else if (postExecutionControl.isFinalizerRemoved()) { - eventMarker.markProcessedMarkForDeletion(resourceID); + if (!running) { + return; + } + ResourceID resourceID = executionScope.getResourceID(); + log.debug( + "Event processing finished. Scope: {}, PostExecutionControl: {}", + executionScope, + postExecutionControl); + unsetUnderExecution(resourceID); + + // If a delete event present at this phase, it was received during reconciliation. + // So we either removed the finalizer during reconciliation or we don't use finalizers. + // Either way we don't want to retry. + if (isRetryConfigured() + && postExecutionControl.exceptionDuringExecution() + && !eventMarker.deleteEventPresent(resourceID)) { + handleRetryOnException( + executionScope, postExecutionControl.getRuntimeException().orElseThrow()); + return; + } + cleanupOnSuccessfulExecution(executionScope); + metrics.finishedReconciliation(resourceID); + if (eventMarker.deleteEventPresent(resourceID)) { + cleanupForDeletedEvent(executionScope.getResourceID()); + } else if (postExecutionControl.isFinalizerRemoved()) { + eventMarker.markProcessedMarkForDeletion(resourceID); + } else { + postExecutionControl + .getUpdatedCustomResource() + .ifPresent( + r -> { + if (!postExecutionControl.updateIsStatusPatch()) { + eventSourceManager + .getControllerResourceEventSource() + .handleRecentResourceUpdate( + ResourceID.fromResource(r), r, executionScope.getResource()); + } + }); + if (eventMarker.eventPresent(resourceID)) { + submitReconciliationExecution(resourceID); } else { - postExecutionControl - .getUpdatedCustomResource() - .ifPresent( - r -> { - if (!postExecutionControl.updateIsStatusPatch()) { - eventSourceManager - .getControllerResourceEventSource() - .handleRecentResourceUpdate( - ResourceID.fromResource(r), r, executionScope.getResource()); - } - }); - if (eventMarker.eventPresent(resourceID)) { - submitReconciliationExecution(resourceID); - } else { - reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource()); - } + reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource()); } - } finally { - lock.unlock(); } + } private void reScheduleExecutionIfInstructed( @@ -343,24 +335,14 @@ private boolean isRetryConfigured() { } @Override - public void stop() { - lock.lock(); - try { - this.running = false; - } finally { - lock.unlock(); - } + public synchronized void stop() { + this.running = false; } @Override public void start() throws OperatorException { - lock.lock(); - try { - this.running = true; - handleAlreadyMarkedEvents(); - } finally { - lock.unlock(); - } + this.running = true; + handleAlreadyMarkedEvents(); } private void handleAlreadyMarkedEvents() {