From bdf1ede74c289a7659d1d31b75be2bc5d2e82750 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 7 Jul 2022 12:00:05 +0200 Subject: [PATCH 1/3] refactor: manage state associated with a resource in a single spot Fixes #1314 --- .../processing/event/EventProcessor.java | 40 +++++++++------- .../processing/event/ResourceState.java | 46 +++++++++++++++++++ .../event/ResourceStateManager.java | 24 ++++++++++ .../event/rate/LinearRateLimiter.java | 26 ++++++----- .../processing/event/rate/RateLimiter.java | 15 +++--- .../processing/event/rate/RateState.java | 4 +- .../processing/event/EventProcessorTest.java | 7 ++- .../event/rate/LinearRateLimiterTest.java | 29 ++++++------ 8 files changed, 138 insertions(+), 53 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 2580a1318e..8865473e13 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -1,11 +1,7 @@ package io.javaoperatorsdk.operator.processing.event; import java.time.Duration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; import java.util.Optional; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -22,6 +18,7 @@ import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.MDCUtils; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -37,17 +34,17 @@ class EventProcessor implements EventHandler, LifecycleAw private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50; private volatile boolean running; - private final Set underProcessing = new HashSet<>(); private final ReconciliationDispatcher reconciliationDispatcher; private final Retry retry; - private final Map retryState = new HashMap<>(); private final ExecutorService executor; private final String controllerName; private final Metrics metrics; private final Cache cache; private final EventSourceManager eventSourceManager; private final EventMarker eventMarker = new EventMarker(); - private final RateLimiter rateLimiter; + private final RateLimiter rateLimiter; + + private final ResourceStateManager resourceStateManager = new ResourceStateManager(); EventProcessor(EventSourceManager eventSourceManager) { this( @@ -61,6 +58,7 @@ class EventProcessor implements EventHandler, LifecycleAw eventSourceManager); } + @SuppressWarnings("rawtypes") EventProcessor( ReconciliationDispatcher reconciliationDispatcher, EventSourceManager eventSourceManager, @@ -79,6 +77,7 @@ class EventProcessor implements EventHandler, LifecycleAw eventSourceManager); } + @SuppressWarnings({"rawtypes", "unchecked"}) private EventProcessor( Cache cache, ExecutorService executor, @@ -137,7 +136,13 @@ private void submitReconciliationExecution(ResourceID resourceID) { Optional latest = cache.get(resourceID); latest.ifPresent(MDCUtils::addResourceInfo); if (!controllerUnderExecution && latest.isPresent()) { - var rateLimiterPermission = rateLimiter.acquirePermission(resourceID); + final var resourceState = resourceStateManager.getOrCreate(resourceID); + var rateLimit = resourceState.getRateLimit(); + if (rateLimit == null) { + rateLimit = rateLimiter.initState(); + resourceState.setRateLimit(rateLimit); + } + var rateLimiterPermission = rateLimiter.isLimited(rateLimit); if (rateLimiterPermission.isPresent()) { handleRateLimitedSubmission(resourceID, rateLimiterPermission.get()); return; @@ -216,7 +221,7 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal } private RetryInfo retryInfo(ResourceID resourceID) { - return retryState.get(resourceID); + return resourceStateManager.getOrCreate(resourceID).getRetry(); } synchronized void eventProcessingFinished( @@ -319,16 +324,17 @@ private void cleanupOnSuccessfulExecution(ExecutionScope executionScope) { log.debug( "Cleanup for successful execution for resource: {}", getName(executionScope.getResource())); if (isRetryConfigured()) { - retryState.remove(executionScope.getResourceID()); + resourceStateManager.getOrCreate(executionScope.getResourceID()).setRetry(null); } retryEventSource().cancelOnceSchedule(executionScope.getResourceID()); } private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) { - RetryExecution retryExecution = retryState.get(executionScope.getResourceID()); + final var state = resourceStateManager.getOrCreate(executionScope.getResourceID()); + RetryExecution retryExecution = state.getRetry(); if (retryExecution == null) { retryExecution = retry.initExecution(); - retryState.put(executionScope.getResourceID(), retryExecution); + state.setRetry(retryExecution); } return retryExecution; } @@ -336,20 +342,20 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) private void cleanupForDeletedEvent(ResourceID resourceID) { log.debug("Cleaning up for delete event for: {}", resourceID); eventMarker.cleanup(resourceID); - rateLimiter.clear(resourceID); + resourceStateManager.remove(resourceID); metrics.cleanupDoneFor(resourceID); } private boolean isControllerUnderExecution(ResourceID resourceID) { - return underProcessing.contains(resourceID); + return resourceStateManager.getOrCreate(resourceID).isUnderProcessing(); } private void setUnderExecutionProcessing(ResourceID resourceID) { - underProcessing.add(resourceID); + resourceStateManager.getOrCreate(resourceID).setUnderProcessing(true); } private void unsetUnderExecution(ResourceID resourceID) { - underProcessing.remove(resourceID); + resourceStateManager.getOrCreate(resourceID).setUnderProcessing(false); } private boolean isRetryConfigured() { @@ -405,6 +411,6 @@ public String toString() { } public synchronized boolean isUnderProcessing(ResourceID resourceID) { - return underProcessing.contains(resourceID); + return isControllerUnderExecution(resourceID); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java new file mode 100644 index 0000000000..351bbd6202 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java @@ -0,0 +1,46 @@ +package io.javaoperatorsdk.operator.processing.event; + +import io.javaoperatorsdk.operator.processing.event.EventMarker.EventingState; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; +import io.javaoperatorsdk.operator.processing.retry.RetryExecution; + +class ResourceState { + private final ResourceID id; + + private boolean underProcessing; + private RetryExecution retry; + private EventingState eventing; + private RateLimitState rateLimit; + + public ResourceState(ResourceID id) { + this.id = id; + } + + public ResourceID getId() { + return id; + } + + public RateLimitState getRateLimit() { + return rateLimit; + } + + public void setRateLimit(RateLimitState rateLimit) { + this.rateLimit = rateLimit; + } + + public RetryExecution getRetry() { + return retry; + } + + public void setRetry(RetryExecution retry) { + this.retry = retry; + } + + public boolean isUnderProcessing() { + return underProcessing; + } + + public void setUnderProcessing(boolean underProcessing) { + this.underProcessing = underProcessing; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java new file mode 100644 index 0000000000..e9dfc324fb --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java @@ -0,0 +1,24 @@ +package io.javaoperatorsdk.operator.processing.event; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +class ResourceStateManager { + // maybe we should have a way for users to specify a hint on the amount of CRs their reconciler + // will process to avoid under- or over-sizing the state maps and avoid too many resizing that + // take time and memory? + private final Map states = new ConcurrentHashMap<>(100); + + + public ResourceState getOrCreate(ResourceID resourceID) { + return states.computeIfAbsent(resourceID, ResourceState::new); + } + + public void remove(ResourceID resourceID) { + states.remove(resourceID); + } + + public boolean contains(ResourceID resourceID) { + return states.containsKey(resourceID); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiter.java index 4eb6758874..b492e11412 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiter.java @@ -2,8 +2,6 @@ import java.time.Duration; import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; import io.javaoperatorsdk.operator.api.config.AnnotationConfigurable; @@ -13,21 +11,26 @@ * A simple rate limiter that limits the number of permission for a time interval. */ public class LinearRateLimiter - implements RateLimiter, AnnotationConfigurable { + implements RateLimiter, AnnotationConfigurable { /** To turn off rate limiting set limit for period to a non-positive number */ public static final int NO_LIMIT_PERIOD = -1; + public static final int DEFAULT_REFRESH_PERIOD_SECONDS = 10; + public static final int DEFAULT_LIMIT_FOR_PERIOD = 3; + public static final Duration DEFAULT_REFRESH_PERIOD = + Duration.ofSeconds(DEFAULT_REFRESH_PERIOD_SECONDS); + private Duration refreshPeriod; private int limitForPeriod = NO_LIMIT_PERIOD; - private final Map limitData = new HashMap<>(); - public static LinearRateLimiter deactivatedRateLimiter() { return new LinearRateLimiter(); } - LinearRateLimiter() {} + public LinearRateLimiter() { + this(DEFAULT_REFRESH_PERIOD, DEFAULT_LIMIT_FOR_PERIOD); + } public LinearRateLimiter(Duration refreshPeriod, int limitForPeriod) { this.refreshPeriod = refreshPeriod; @@ -35,11 +38,12 @@ public LinearRateLimiter(Duration refreshPeriod, int limitForPeriod) { } @Override - public Optional acquirePermission(ResourceID resourceID) { - if (!isActivated()) { + public Optional isLimited(RateLimitState rateLimitState) { + if (!isActivated() || !(rateLimitState instanceof RateState)) { return Optional.empty(); } - var actualState = limitData.computeIfAbsent(resourceID, r -> RateState.initialState()); + + var actualState = (RateState) rateLimitState; if (actualState.getCount() < limitForPeriod) { actualState.increaseCount(); return Optional.empty(); @@ -54,8 +58,8 @@ public Optional acquirePermission(ResourceID resourceID) { } @Override - public void clear(ResourceID resourceID) { - limitData.remove(resourceID); + public RateState initState() { + return RateState.initialState(); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java index 7281315e98..b972c5587a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java @@ -3,20 +3,17 @@ import java.time.Duration; import java.util.Optional; -import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; -public interface RateLimiter { +public interface RateLimiter { + interface RateLimitState { + } /** - * @param resourceID id of the resource * @return empty if permission acquired or minimal duration until a permission could be acquired * again */ - Optional acquirePermission(ResourceID resourceID); - - /** - * Cleanup state. Called when resource is deleted. - */ - void clear(ResourceID resourceID); + Optional isLimited(RateLimitState rateLimitState); + S initState(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java index f1302b077d..e466782996 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java @@ -2,7 +2,9 @@ import java.time.LocalDateTime; -class RateState { +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; + +class RateState implements RateLimitState { private LocalDateTime lastRefreshTime; private int count; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index bcb7f3e739..36e286d9e9 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -19,6 +19,7 @@ import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -78,7 +79,7 @@ void setup() { eventProcessorWithRetry.start(); when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock); when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); - when(rateLimiterMock.acquirePermission(any())).thenReturn(Optional.empty()); + when(rateLimiterMock.isLimited(any())).thenReturn(Optional.empty()); } @Test @@ -351,7 +352,9 @@ void rateLimitsReconciliationSubmission() { var refreshPeriod = Duration.ofMillis(100); var event = prepareCREvent(); - when(rateLimiterMock.acquirePermission(event.getRelatedCustomResourceID())) + final var rateLimit = new RateLimitState() {}; + when(rateLimiterMock.initState()).thenReturn(rateLimit); + when(rateLimiterMock.isLimited(rateLimit)) .thenReturn(Optional.empty()) .thenReturn(Optional.of(refreshPeriod)); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiterTest.java index a9cd48bef4..e4aa0a7123 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiterTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiterTest.java @@ -2,37 +2,40 @@ import java.time.Duration; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import io.javaoperatorsdk.operator.TestUtils; -import io.javaoperatorsdk.operator.processing.event.ResourceID; - import static org.assertj.core.api.Assertions.assertThat; class LinearRateLimiterTest { public static final Duration REFRESH_PERIOD = Duration.ofMillis(300); - ResourceID resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); + private RateState state; + + @BeforeEach + void initState() { + state = RateState.initialState(); + } @Test void acquirePermissionForNewResource() { var rl = new LinearRateLimiter(REFRESH_PERIOD, 2); - var res = rl.acquirePermission(resourceID); + var res = rl.isLimited(state); assertThat(res).isEmpty(); - res = rl.acquirePermission(resourceID); + res = rl.isLimited(state); assertThat(res).isEmpty(); - res = rl.acquirePermission(resourceID); + res = rl.isLimited(state); assertThat(res).isNotEmpty(); } @Test void returnsMinimalDurationToAcquirePermission() { var rl = new LinearRateLimiter(REFRESH_PERIOD, 1); - var res = rl.acquirePermission(resourceID); + var res = rl.isLimited(state); assertThat(res).isEmpty(); - res = rl.acquirePermission(resourceID); + res = rl.isLimited(state); assertThat(res).isPresent(); assertThat(res.get()).isLessThan(REFRESH_PERIOD); @@ -41,15 +44,15 @@ void returnsMinimalDurationToAcquirePermission() { @Test void resetsPeriodAfterLimit() throws InterruptedException { var rl = new LinearRateLimiter(REFRESH_PERIOD, 1); - var res = rl.acquirePermission(resourceID); + var res = rl.isLimited(state); assertThat(res).isEmpty(); - res = rl.acquirePermission(resourceID); + res = rl.isLimited(state); assertThat(res).isPresent(); // sleep plus some slack Thread.sleep(REFRESH_PERIOD.toMillis() + REFRESH_PERIOD.toMillis() / 3); - res = rl.acquirePermission(resourceID); + res = rl.isLimited(state); assertThat(res).isEmpty(); } @@ -57,7 +60,7 @@ void resetsPeriodAfterLimit() throws InterruptedException { void rateLimitCanBeTurnedOff() { var rl = new LinearRateLimiter(REFRESH_PERIOD, LinearRateLimiter.NO_LIMIT_PERIOD); - var res = rl.acquirePermission(resourceID); + var res = rl.isLimited(state); assertThat(res).isEmpty(); } From 02507ad69f2c845ad15f13360e46724441540a60 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 7 Jul 2022 12:59:23 +0200 Subject: [PATCH 2/3] refactor: remove EventMarker altogether --- .../processing/event/EventMarker.java | 106 ---------------- .../processing/event/EventProcessor.java | 113 ++++++++---------- .../processing/event/ResourceState.java | 67 ++++++++++- .../event/ResourceStateManager.java | 8 ++ .../processing/event/EventMarkerTest.java | 78 ------------ .../event/ResourceStateManagerTest.java | 90 ++++++++++++++ 6 files changed, 215 insertions(+), 247 deletions(-) delete mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventMarker.java delete mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventMarkerTest.java create mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventMarker.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventMarker.java deleted file mode 100644 index 0b5a372ead..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventMarker.java +++ /dev/null @@ -1,106 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static io.javaoperatorsdk.operator.processing.event.EventMarker.EventingState.NO_EVENT_PRESENT; -import static io.javaoperatorsdk.operator.processing.event.EventMarker.EventingState.PROCESSED_MARK_FOR_DELETION; - -/** - * Manages the state of received events. Basically there can be only three distinct states relevant - * for event processing. Either an event is received, so we eventually process or no event for - * processing at the moment. The third case is if a DELETE event is received, this is a special case - * meaning that the custom resource is deleted. We don't want to do any processing anymore so other - * events are irrelevant for us from this point. Note that the dependant resources are either - * cleaned up by K8S garbage collection or by the controller implementation for cleanup. - */ -class EventMarker { - - public enum EventingState { - EVENT_PRESENT, NO_EVENT_PRESENT, - /** Resource has been marked for deletion, and cleanup already executed successfully */ - PROCESSED_MARK_FOR_DELETION, - /** Delete event present, from this point other events are not relevant */ - DELETE_EVENT_PRESENT, - } - - private final HashMap eventingState = new HashMap<>(); - - private EventingState getEventingState(ResourceID resourceID) { - EventingState actualState = eventingState.get(resourceID); - return actualState == null ? NO_EVENT_PRESENT : actualState; - } - - private void setEventingState(ResourceID resourceID, EventingState state) { - eventingState.put(resourceID, state); - } - - public void markEventReceived(Event event) { - markEventReceived(event.getRelatedCustomResourceID()); - } - - public void markEventReceived(ResourceID resourceID) { - if (deleteEventPresent(resourceID)) { - throw new IllegalStateException("Cannot receive event after a delete event received"); - } - setEventingState(resourceID, EventingState.EVENT_PRESENT); - } - - public void unMarkEventReceived(ResourceID resourceID) { - var actualState = getEventingState(resourceID); - switch (actualState) { - case EVENT_PRESENT: - setEventingState(resourceID, - NO_EVENT_PRESENT); - break; - case PROCESSED_MARK_FOR_DELETION: - throw new IllegalStateException("Cannot unmark processed marked for deletion."); - case DELETE_EVENT_PRESENT: - throw new IllegalStateException("Cannot unmark delete event."); - } - } - - public void markProcessedMarkForDeletion(ResourceID resourceID) { - setEventingState(resourceID, PROCESSED_MARK_FOR_DELETION); - } - - public boolean processedMarkForDeletionPresent(ResourceID resourceID) { - return getEventingState(resourceID) == PROCESSED_MARK_FOR_DELETION; - } - - public void markDeleteEventReceived(Event event) { - markDeleteEventReceived(event.getRelatedCustomResourceID()); - } - - public void markDeleteEventReceived(ResourceID resourceID) { - setEventingState(resourceID, EventingState.DELETE_EVENT_PRESENT); - } - - public boolean deleteEventPresent(ResourceID resourceID) { - return getEventingState(resourceID) == EventingState.DELETE_EVENT_PRESENT; - } - - public boolean eventPresent(ResourceID resourceID) { - var actualState = getEventingState(resourceID); - return actualState == EventingState.EVENT_PRESENT; - } - - public boolean noEventPresent(ResourceID resourceID) { - var actualState = getEventingState(resourceID); - return actualState == NO_EVENT_PRESENT; - } - - public void cleanup(ResourceID resourceID) { - eventingState.remove(resourceID); - } - - public List resourceIDsWithEventPresent() { - return eventingState.entrySet().stream() - .filter(e -> e.getValue() != NO_EVENT_PRESENT) - .map(Map.Entry::getKey) - .collect(Collectors.toList()); - } - -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 8865473e13..fcec37d852 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -14,7 +14,6 @@ import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.monitoring.Metrics; -import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.MDCUtils; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; @@ -41,7 +40,7 @@ class EventProcessor implements EventHandler, LifecycleAw private final Metrics metrics; private final Cache cache; private final EventSourceManager eventSourceManager; - private final EventMarker eventMarker = new EventMarker(); + // private final EventMarker eventMarker = new EventMarker(); private final RateLimiter rateLimiter; private final ResourceStateManager resourceStateManager = new ResourceStateManager(); @@ -108,60 +107,60 @@ public synchronized void handleEvent(Event event) { log.debug("Received event: {}", event); final var resourceID = event.getRelatedCustomResourceID(); + final var state = resourceStateManager.getOrCreate(event.getRelatedCustomResourceID()); MDCUtils.addResourceIDInfo(resourceID); metrics.receivedEvent(event); - handleEventMarking(event); + handleEventMarking(event, state); if (!this.running) { // events are received and marked, but will be processed when started, see start() method. log.debug("Skipping event: {} because the event processor is not started", event); return; } - handleMarkedEventForResource(resourceID); + handleMarkedEventForResource(state); } finally { MDCUtils.removeResourceIDInfo(); } } - private void handleMarkedEventForResource(ResourceID resourceID) { - if (eventMarker.deleteEventPresent(resourceID)) { - cleanupForDeletedEvent(resourceID); - } else if (!eventMarker.processedMarkForDeletionPresent(resourceID)) { - submitReconciliationExecution(resourceID); + private void handleMarkedEventForResource(ResourceState state) { + if (state.deleteEventPresent()) { + cleanupForDeletedEvent(state.getId()); + } else if (!state.processedMarkForDeletionPresent()) { + submitReconciliationExecution(state); } } - private void submitReconciliationExecution(ResourceID resourceID) { + private void submitReconciliationExecution(ResourceState state) { try { - boolean controllerUnderExecution = isControllerUnderExecution(resourceID); - Optional latest = cache.get(resourceID); + boolean controllerUnderExecution = isControllerUnderExecution(state); + Optional latest = cache.get(state.getId()); latest.ifPresent(MDCUtils::addResourceInfo); if (!controllerUnderExecution && latest.isPresent()) { - final var resourceState = resourceStateManager.getOrCreate(resourceID); - var rateLimit = resourceState.getRateLimit(); + var rateLimit = state.getRateLimit(); if (rateLimit == null) { rateLimit = rateLimiter.initState(); - resourceState.setRateLimit(rateLimit); + state.setRateLimit(rateLimit); } var rateLimiterPermission = rateLimiter.isLimited(rateLimit); if (rateLimiterPermission.isPresent()) { - handleRateLimitedSubmission(resourceID, rateLimiterPermission.get()); + handleRateLimitedSubmission(state.getId(), rateLimiterPermission.get()); return; } - setUnderExecutionProcessing(resourceID); - final var retryInfo = retryInfo(resourceID); + state.setUnderProcessing(true); + final var retryInfo = state.getRetry(); ExecutionScope executionScope = new ExecutionScope<>(latest.get(), retryInfo); - eventMarker.unMarkEventReceived(resourceID); - metrics.reconcileCustomResource(resourceID, retryInfo); + state.unMarkEventReceived(); + metrics.reconcileCustomResource(state.getId(), retryInfo); log.debug("Executing events for custom resource. Scope: {}", executionScope); executor.execute(new ControllerExecution(executionScope)); } else { log.debug( "Skipping executing controller for resource id: {}. Controller in execution: {}. Latest Resource present: {}", - resourceID, + state, controllerUnderExecution, latest.isPresent()); if (latest.isEmpty()) { - log.debug("no custom resource found in cache for ResourceID: {}", resourceID); + log.debug("no custom resource found in cache for ResourceID: {}", state); } } } finally { @@ -169,16 +168,15 @@ private void submitReconciliationExecution(ResourceID resourceID) { } } - private void handleEventMarking(Event event) { + private void handleEventMarking(Event event, ResourceState state) { final var relatedCustomResourceID = event.getRelatedCustomResourceID(); if (event instanceof ResourceEvent) { var resourceEvent = (ResourceEvent) event; if (resourceEvent.getAction() == ResourceAction.DELETED) { log.debug("Marking delete event received for: {}", relatedCustomResourceID); - eventMarker.markDeleteEventReceived(event); + state.markDeleteEventReceived(); } else { - if (eventMarker.processedMarkForDeletionPresent(relatedCustomResourceID) - && isResourceMarkedForDeletion(resourceEvent)) { + if (state.processedMarkForDeletionPresent() && isResourceMarkedForDeletion(resourceEvent)) { log.debug( "Skipping mark of event received, since already processed mark for deletion and resource marked for deletion: {}", relatedCustomResourceID); @@ -190,22 +188,21 @@ && isResourceMarkedForDeletion(resourceEvent)) { // removed, but also the informers websocket is disconnected and later reconnected. So // meanwhile the resource could be deleted and recreated. In this case we just mark a new // event as below. - markEventReceived(event); + markEventReceived(state); } - } else if (!eventMarker.deleteEventPresent(relatedCustomResourceID) || - !eventMarker.processedMarkForDeletionPresent(relatedCustomResourceID)) { - markEventReceived(event); + } else if (!state.deleteEventPresent() || !state.processedMarkForDeletionPresent()) { + markEventReceived(state); } else if (log.isDebugEnabled()) { log.debug( "Skipped marking event as received. Delete event present: {}, processed mark for deletion: {}", - eventMarker.deleteEventPresent(relatedCustomResourceID), - eventMarker.processedMarkForDeletionPresent(relatedCustomResourceID)); + state.deleteEventPresent(), + state.processedMarkForDeletionPresent()); } } - private void markEventReceived(Event event) { - log.debug("Marking event received for: {}", event.getRelatedCustomResourceID()); - eventMarker.markEventReceived(event); + private void markEventReceived(ResourceState state) { + log.debug("Marking event received for: {}", state.getId()); + state.markEventReceived(); } private boolean isResourceMarkedForDeletion(ResourceEvent resourceEvent) { @@ -220,16 +217,13 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal Math.max(minimalDurationMillis, MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION)); } - private RetryInfo retryInfo(ResourceID resourceID) { - return resourceStateManager.getOrCreate(resourceID).getRetry(); - } - synchronized void eventProcessingFinished( ExecutionScope executionScope, PostExecutionControl postExecutionControl) { if (!running) { return; } ResourceID resourceID = executionScope.getResourceID(); + final var state = resourceStateManager.getOrCreate(resourceID); log.debug( "Event processing finished. Scope: {}, PostExecutionControl: {}", executionScope, @@ -241,17 +235,17 @@ synchronized void eventProcessingFinished( // Either way we don't want to retry. if (isRetryConfigured() && postExecutionControl.exceptionDuringExecution() - && !eventMarker.deleteEventPresent(resourceID)) { + && !state.deleteEventPresent()) { handleRetryOnException( executionScope, postExecutionControl.getRuntimeException().orElseThrow()); return; } cleanupOnSuccessfulExecution(executionScope); metrics.finishedReconciliation(resourceID); - if (eventMarker.deleteEventPresent(resourceID)) { + if (state.deleteEventPresent()) { cleanupForDeletedEvent(executionScope.getResourceID()); } else if (postExecutionControl.isFinalizerRemoved()) { - eventMarker.markProcessedMarkForDeletion(resourceID); + state.markProcessedMarkForDeletion(); } else { postExecutionControl .getUpdatedCustomResource() @@ -264,8 +258,8 @@ synchronized void eventProcessingFinished( ResourceID.fromResource(r), r, executionScope.getResource()); } }); - if (eventMarker.eventPresent(resourceID)) { - submitReconciliationExecution(resourceID); + if (state.eventPresent()) { + submitReconciliationExecution(state); } else { reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource()); } @@ -296,17 +290,17 @@ TimerEventSource retryEventSource() { */ private void handleRetryOnException( ExecutionScope executionScope, Exception exception) { - RetryExecution execution = getOrInitRetryExecution(executionScope); - var resourceID = executionScope.getResourceID(); - boolean eventPresent = eventMarker.eventPresent(resourceID); - eventMarker.markEventReceived(resourceID); + final var state = getOrInitRetryExecution(executionScope); + var resourceID = state.getId(); + boolean eventPresent = state.eventPresent(); + state.markEventReceived(); if (eventPresent) { log.debug("New events exists for for resource id: {}", resourceID); - submitReconciliationExecution(resourceID); + submitReconciliationExecution(state); return; } - Optional nextDelay = execution.nextDelay(); + Optional nextDelay = state.getRetry().nextDelay(); nextDelay.ifPresentOrElse( delay -> { @@ -329,29 +323,24 @@ private void cleanupOnSuccessfulExecution(ExecutionScope executionScope) { retryEventSource().cancelOnceSchedule(executionScope.getResourceID()); } - private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) { + private ResourceState getOrInitRetryExecution(ExecutionScope executionScope) { final var state = resourceStateManager.getOrCreate(executionScope.getResourceID()); RetryExecution retryExecution = state.getRetry(); if (retryExecution == null) { retryExecution = retry.initExecution(); state.setRetry(retryExecution); } - return retryExecution; + return state; } private void cleanupForDeletedEvent(ResourceID resourceID) { log.debug("Cleaning up for delete event for: {}", resourceID); - eventMarker.cleanup(resourceID); resourceStateManager.remove(resourceID); metrics.cleanupDoneFor(resourceID); } - private boolean isControllerUnderExecution(ResourceID resourceID) { - return resourceStateManager.getOrCreate(resourceID).isUnderProcessing(); - } - - private void setUnderExecutionProcessing(ResourceID resourceID) { - resourceStateManager.getOrCreate(resourceID).setUnderProcessing(true); + private boolean isControllerUnderExecution(ResourceState state) { + return state.isUnderProcessing(); } private void unsetUnderExecution(ResourceID resourceID) { @@ -374,8 +363,8 @@ public void start() throws OperatorException { } private void handleAlreadyMarkedEvents() { - for (ResourceID resourceID : eventMarker.resourceIDsWithEventPresent()) { - handleMarkedEventForResource(resourceID); + for (var state : resourceStateManager.resourcesWithEventPresent()) { + handleMarkedEventForResource(state); } } @@ -411,6 +400,6 @@ public String toString() { } public synchronized boolean isUnderProcessing(ResourceID resourceID) { - return isControllerUnderExecution(resourceID); + return isControllerUnderExecution(resourceStateManager.getOrCreate(resourceID)); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java index 351bbd6202..32ae9826aa 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceState.java @@ -1,10 +1,31 @@ package io.javaoperatorsdk.operator.processing.event; -import io.javaoperatorsdk.operator.processing.event.EventMarker.EventingState; import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState; import io.javaoperatorsdk.operator.processing.retry.RetryExecution; class ResourceState { + + /** + * Manages the state of received events. Basically there can be only three distinct states + * relevant for event processing. Either an event is received, so we eventually process or no + * event for processing at the moment. The third case is if a DELETE event is received, this is a + * special case meaning that the custom resource is deleted. We don't want to do any processing + * anymore so other events are irrelevant for us from this point. Note that the dependant + * resources are either cleaned up by K8S garbage collection or by the controller implementation + * for cleanup. + */ + private enum EventingState { + EVENT_PRESENT, NO_EVENT_PRESENT, + /** + * Resource has been marked for deletion, and cleanup already executed successfully + */ + PROCESSED_MARK_FOR_DELETION, + /** + * Delete event present, from this point other events are not relevant + */ + DELETE_EVENT_PRESENT, + } + private final ResourceID id; private boolean underProcessing; @@ -14,6 +35,7 @@ class ResourceState { public ResourceState(ResourceID id) { this.id = id; + eventing = EventingState.NO_EVENT_PRESENT; } public ResourceID getId() { @@ -43,4 +65,47 @@ public boolean isUnderProcessing() { public void setUnderProcessing(boolean underProcessing) { this.underProcessing = underProcessing; } + + public void markDeleteEventReceived() { + eventing = EventingState.DELETE_EVENT_PRESENT; + } + + public boolean deleteEventPresent() { + return eventing == EventingState.DELETE_EVENT_PRESENT; + } + + public boolean processedMarkForDeletionPresent() { + return eventing == EventingState.PROCESSED_MARK_FOR_DELETION; + } + + public void markEventReceived() { + if (deleteEventPresent()) { + throw new IllegalStateException("Cannot receive event after a delete event received"); + } + eventing = EventingState.EVENT_PRESENT; + } + + public void markProcessedMarkForDeletion() { + eventing = EventingState.PROCESSED_MARK_FOR_DELETION; + } + + public boolean eventPresent() { + return eventing == EventingState.EVENT_PRESENT; + } + + public boolean noEventPresent() { + return eventing == EventingState.NO_EVENT_PRESENT; + } + + public void unMarkEventReceived() { + switch (eventing) { + case EVENT_PRESENT: + eventing = EventingState.NO_EVENT_PRESENT; + break; + case PROCESSED_MARK_FOR_DELETION: + throw new IllegalStateException("Cannot unmark processed marked for deletion."); + case DELETE_EVENT_PRESENT: + throw new IllegalStateException("Cannot unmark delete event."); + } + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java index e9dfc324fb..69e8126b59 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManager.java @@ -1,7 +1,9 @@ package io.javaoperatorsdk.operator.processing.event; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; class ResourceStateManager { // maybe we should have a way for users to specify a hint on the amount of CRs their reconciler @@ -21,4 +23,10 @@ public void remove(ResourceID resourceID) { public boolean contains(ResourceID resourceID) { return states.containsKey(resourceID); } + + public List resourcesWithEventPresent() { + return states.values().stream() + .filter(state -> !state.noEventPresent()) + .collect(Collectors.toList()); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventMarkerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventMarkerTest.java deleted file mode 100644 index 7f808874a8..0000000000 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventMarkerTest.java +++ /dev/null @@ -1,78 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import static org.assertj.core.api.Assertions.assertThat; - -class EventMarkerTest { - - private final EventMarker eventMarker = new EventMarker(); - private ResourceID sampleResourceID = new ResourceID("test-name"); - private ResourceID sampleResourceID2 = new ResourceID("test-name2"); - - @Test - public void returnsNoEventPresentIfNotMarkedYet() { - assertThat(eventMarker.noEventPresent(sampleResourceID)).isTrue(); - } - - @Test - public void marksEvent() { - eventMarker.markEventReceived(sampleResourceID); - - assertThat(eventMarker.eventPresent(sampleResourceID)).isTrue(); - assertThat(eventMarker.deleteEventPresent(sampleResourceID)).isFalse(); - } - - @Test - public void marksDeleteEvent() { - eventMarker.markDeleteEventReceived(sampleResourceID); - - assertThat(eventMarker.deleteEventPresent(sampleResourceID)) - .isTrue(); - assertThat(eventMarker.eventPresent(sampleResourceID)).isFalse(); - } - - @Test - public void afterDeleteEventMarkEventIsNotRelevant() { - eventMarker.markEventReceived(sampleResourceID); - - eventMarker.markDeleteEventReceived(sampleResourceID); - - assertThat(eventMarker.deleteEventPresent(sampleResourceID)) - .isTrue(); - assertThat(eventMarker.eventPresent(sampleResourceID)).isFalse(); - } - - @Test - public void cleansUp() { - eventMarker.markEventReceived(sampleResourceID); - eventMarker.markDeleteEventReceived(sampleResourceID); - - eventMarker.cleanup(sampleResourceID); - - assertThat(eventMarker.deleteEventPresent(sampleResourceID)).isFalse(); - assertThat(eventMarker.eventPresent(sampleResourceID)).isFalse(); - } - - @Test - public void cannotMarkEventAfterDeleteEventReceived() { - Assertions.assertThrows(IllegalStateException.class, () -> { - eventMarker.markDeleteEventReceived(sampleResourceID); - eventMarker.markEventReceived(sampleResourceID); - }); - } - - @Test - public void listsResourceIDSWithEventsPresent() { - eventMarker.markEventReceived(sampleResourceID); - eventMarker.markEventReceived(sampleResourceID2); - eventMarker.unMarkEventReceived(sampleResourceID); - - var res = eventMarker.resourceIDsWithEventPresent(); - - assertThat(res).hasSize(1); - assertThat(res).contains(sampleResourceID2); - } - -} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java new file mode 100644 index 0000000000..1cc925fd01 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ResourceStateManagerTest.java @@ -0,0 +1,90 @@ +package io.javaoperatorsdk.operator.processing.event; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThat; + +class ResourceStateManagerTest { + + private final ResourceStateManager manager = new ResourceStateManager(); + private final ResourceID sampleResourceID = new ResourceID("test-name"); + private final ResourceID sampleResourceID2 = new ResourceID("test-name2"); + private ResourceState state; + private ResourceState state2; + + @BeforeEach + void init() { + manager.remove(sampleResourceID); + manager.remove(sampleResourceID2); + + state = manager.getOrCreate(sampleResourceID); + state2 = manager.getOrCreate(sampleResourceID2); + } + + + @Test + public void returnsNoEventPresentIfNotMarkedYet() { + assertThat(state.noEventPresent()).isTrue(); + } + + @Test + public void marksEvent() { + state.markEventReceived(); + + assertThat(state.eventPresent()).isTrue(); + assertThat(state.deleteEventPresent()).isFalse(); + } + + @Test + public void marksDeleteEvent() { + state.markDeleteEventReceived(); + + assertThat(state.deleteEventPresent()).isTrue(); + assertThat(state.eventPresent()).isFalse(); + } + + @Test + public void afterDeleteEventMarkEventIsNotRelevant() { + state.markEventReceived(); + + state.markDeleteEventReceived(); + + assertThat(state.deleteEventPresent()).isTrue(); + assertThat(state.eventPresent()).isFalse(); + } + + @Test + public void cleansUp() { + state.markEventReceived(); + state.markDeleteEventReceived(); + + manager.remove(sampleResourceID); + + state = manager.getOrCreate(sampleResourceID); + assertThat(state.deleteEventPresent()).isFalse(); + assertThat(state.eventPresent()).isFalse(); + } + + @Test + public void cannotMarkEventAfterDeleteEventReceived() { + Assertions.assertThrows(IllegalStateException.class, () -> { + state.markDeleteEventReceived(); + state.markEventReceived(); + }); + } + + @Test + public void listsResourceIDSWithEventsPresent() { + state.markEventReceived(); + state2.markEventReceived(); + state.unMarkEventReceived(); + + var res = manager.resourcesWithEventPresent(); + + assertThat(res).hasSize(1); + assertThat(res.get(0).getId()).isEqualTo(sampleResourceID2); + } + +} From f912a2ec28c3312424e85f5553e554c1ffb0b2a1 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 11 Jul 2022 19:02:13 +0200 Subject: [PATCH 3/3] fix: adapt default state and clean up --- .../operator/processing/event/rate/LinearRateLimiter.java | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiter.java index b492e11412..02a919f547 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/LinearRateLimiter.java @@ -5,7 +5,6 @@ import java.util.Optional; import io.javaoperatorsdk.operator.api.config.AnnotationConfigurable; -import io.javaoperatorsdk.operator.processing.event.ResourceID; /** * A simple rate limiter that limits the number of permission for a time interval. @@ -16,20 +15,18 @@ public class LinearRateLimiter /** To turn off rate limiting set limit for period to a non-positive number */ public static final int NO_LIMIT_PERIOD = -1; public static final int DEFAULT_REFRESH_PERIOD_SECONDS = 10; - public static final int DEFAULT_LIMIT_FOR_PERIOD = 3; public static final Duration DEFAULT_REFRESH_PERIOD = Duration.ofSeconds(DEFAULT_REFRESH_PERIOD_SECONDS); - private Duration refreshPeriod; - private int limitForPeriod = NO_LIMIT_PERIOD; + private int limitForPeriod; public static LinearRateLimiter deactivatedRateLimiter() { return new LinearRateLimiter(); } public LinearRateLimiter() { - this(DEFAULT_REFRESH_PERIOD, DEFAULT_LIMIT_FOR_PERIOD); + this(DEFAULT_REFRESH_PERIOD, NO_LIMIT_PERIOD); } public LinearRateLimiter(Duration refreshPeriod, int limitForPeriod) {