Skip to content

Commit 8afc686

Browse files
committed
refactor: manage state associated with a resource in a single spot
Fixes #1314
1 parent 111506a commit 8afc686

File tree

8 files changed

+130
-54
lines changed

8 files changed

+130
-54
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,7 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

33
import java.time.Duration;
4-
import java.util.HashMap;
5-
import java.util.HashSet;
6-
import java.util.Map;
74
import java.util.Optional;
8-
import java.util.Set;
95
import java.util.concurrent.ExecutorService;
106
import java.util.concurrent.ScheduledThreadPoolExecutor;
117

@@ -22,6 +18,7 @@
2218
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2319
import io.javaoperatorsdk.operator.processing.MDCUtils;
2420
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
21+
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
2522
import io.javaoperatorsdk.operator.processing.event.source.Cache;
2623
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
2724
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
@@ -37,17 +34,17 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
3734
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50;
3835

3936
private volatile boolean running;
40-
private final Set<ResourceID> underProcessing = new HashSet<>();
4137
private final ReconciliationDispatcher<R> reconciliationDispatcher;
4238
private final Retry retry;
43-
private final Map<ResourceID, RetryExecution> retryState = new HashMap<>();
4439
private final ExecutorService executor;
4540
private final String controllerName;
4641
private final Metrics metrics;
4742
private final Cache<R> cache;
4843
private final EventSourceManager<R> eventSourceManager;
4944
private final EventMarker eventMarker = new EventMarker();
50-
private final RateLimiter rateLimiter;
45+
private final RateLimiter<? extends RateLimitState> rateLimiter;
46+
47+
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
5148

5249
EventProcessor(EventSourceManager<R> eventSourceManager) {
5350
this(
@@ -61,6 +58,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
6158
eventSourceManager);
6259
}
6360

61+
@SuppressWarnings("rawtypes")
6462
EventProcessor(
6563
ReconciliationDispatcher<R> reconciliationDispatcher,
6664
EventSourceManager<R> eventSourceManager,
@@ -79,6 +77,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
7977
eventSourceManager);
8078
}
8179

80+
@SuppressWarnings({"rawtypes", "unchecked"})
8281
private EventProcessor(
8382
Cache<R> cache,
8483
ExecutorService executor,
@@ -137,7 +136,13 @@ private void submitReconciliationExecution(ResourceID resourceID) {
137136
Optional<R> latest = cache.get(resourceID);
138137
latest.ifPresent(MDCUtils::addResourceInfo);
139138
if (!controllerUnderExecution && latest.isPresent()) {
140-
var rateLimiterPermission = rateLimiter.acquirePermission(resourceID);
139+
final var resourceState = resourceStateManager.getOrCreate(resourceID);
140+
var rateLimit = resourceState.getRateLimit();
141+
if (rateLimit == null) {
142+
rateLimit = rateLimiter.initState();
143+
resourceState.setRateLimit(rateLimit);
144+
}
145+
var rateLimiterPermission = rateLimiter.isLimited(rateLimit);
141146
if (rateLimiterPermission.isPresent()) {
142147
handleRateLimitedSubmission(resourceID, rateLimiterPermission.get());
143148
return;
@@ -216,7 +221,7 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal
216221
}
217222

218223
private RetryInfo retryInfo(ResourceID resourceID) {
219-
return retryState.get(resourceID);
224+
return resourceStateManager.getOrCreate(resourceID).getRetry();
220225
}
221226

222227
synchronized void eventProcessingFinished(
@@ -319,37 +324,38 @@ private void cleanupOnSuccessfulExecution(ExecutionScope<R> executionScope) {
319324
log.debug(
320325
"Cleanup for successful execution for resource: {}", getName(executionScope.getResource()));
321326
if (isRetryConfigured()) {
322-
retryState.remove(executionScope.getResourceID());
327+
resourceStateManager.getOrCreate(executionScope.getResourceID()).setRetry(null);
323328
}
324329
retryEventSource().cancelOnceSchedule(executionScope.getResourceID());
325330
}
326331

327332
private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope) {
328-
RetryExecution retryExecution = retryState.get(executionScope.getResourceID());
333+
final var state = resourceStateManager.getOrCreate(executionScope.getResourceID());
334+
RetryExecution retryExecution = state.getRetry();
329335
if (retryExecution == null) {
330336
retryExecution = retry.initExecution();
331-
retryState.put(executionScope.getResourceID(), retryExecution);
337+
state.setRetry(retryExecution);
332338
}
333339
return retryExecution;
334340
}
335341

336342
private void cleanupForDeletedEvent(ResourceID resourceID) {
337343
log.debug("Cleaning up for delete event for: {}", resourceID);
338344
eventMarker.cleanup(resourceID);
339-
rateLimiter.clear(resourceID);
345+
resourceStateManager.remove(resourceID);
340346
metrics.cleanupDoneFor(resourceID);
341347
}
342348

343349
private boolean isControllerUnderExecution(ResourceID resourceID) {
344-
return underProcessing.contains(resourceID);
350+
return resourceStateManager.getOrCreate(resourceID).isUnderProcessing();
345351
}
346352

347353
private void setUnderExecutionProcessing(ResourceID resourceID) {
348-
underProcessing.add(resourceID);
354+
resourceStateManager.getOrCreate(resourceID).setUnderProcessing(true);
349355
}
350356

351357
private void unsetUnderExecution(ResourceID resourceID) {
352-
underProcessing.remove(resourceID);
358+
resourceStateManager.getOrCreate(resourceID).setUnderProcessing(false);
353359
}
354360

355361
private boolean isRetryConfigured() {
@@ -405,6 +411,6 @@ public String toString() {
405411
}
406412

407413
public synchronized boolean isUnderProcessing(ResourceID resourceID) {
408-
return underProcessing.contains(resourceID);
414+
return isControllerUnderExecution(resourceID);
409415
}
410416
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package io.javaoperatorsdk.operator.processing.event;
2+
3+
import io.javaoperatorsdk.operator.processing.event.EventMarker.EventingState;
4+
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
5+
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
6+
7+
class ResourceState {
8+
private final ResourceID id;
9+
10+
private boolean underProcessing;
11+
private RetryExecution retry;
12+
private EventingState eventing;
13+
private RateLimitState rateLimit;
14+
15+
public ResourceState(ResourceID id) {
16+
this.id = id;
17+
}
18+
19+
public ResourceID getId() {
20+
return id;
21+
}
22+
23+
public RateLimitState getRateLimit() {
24+
return rateLimit;
25+
}
26+
27+
public void setRateLimit(RateLimitState rateLimit) {
28+
this.rateLimit = rateLimit;
29+
}
30+
31+
public RetryExecution getRetry() {
32+
return retry;
33+
}
34+
35+
public void setRetry(RetryExecution retry) {
36+
this.retry = retry;
37+
}
38+
39+
public boolean isUnderProcessing() {
40+
return underProcessing;
41+
}
42+
43+
public void setUnderProcessing(boolean underProcessing) {
44+
this.underProcessing = underProcessing;
45+
}
46+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package io.javaoperatorsdk.operator.processing.event;
2+
3+
import java.util.Map;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
6+
class ResourceStateManager {
7+
// maybe we should have a way for users to specify a hint on the amount of CRs their reconciler
8+
// will process to avoid under- or over-sizing the state maps and avoid too many resizing that
9+
// take time and memory?
10+
private final Map<ResourceID, ResourceState> states = new ConcurrentHashMap<>(100);
11+
12+
13+
public ResourceState getOrCreate(ResourceID resourceID) {
14+
return states.computeIfAbsent(resourceID, ResourceState::new);
15+
}
16+
17+
public void remove(ResourceID resourceID) {
18+
states.remove(resourceID);
19+
}
20+
21+
public boolean contains(ResourceID resourceID) {
22+
return states.containsKey(resourceID);
23+
}
24+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,16 +2,12 @@
22

33
import java.time.Duration;
44
import java.time.LocalDateTime;
5-
import java.util.HashMap;
6-
import java.util.Map;
75
import java.util.Optional;
86

9-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
10-
117
/**
128
* A Simple rate limiter that limits the number of permission for a time interval.
139
*/
14-
public class PeriodRateLimiter implements RateLimiter {
10+
public class PeriodRateLimiter implements RateLimiter<RateState> {
1511

1612
public static final int DEFAULT_REFRESH_PERIOD_SECONDS = 10;
1713
public static final int DEFAULT_LIMIT_FOR_PERIOD = 3;
@@ -24,8 +20,6 @@ public class PeriodRateLimiter implements RateLimiter {
2420
private Duration refreshPeriod;
2521
private int limitForPeriod;
2622

27-
private Map<ResourceID, RateState> limitData = new HashMap<>();
28-
2923
public PeriodRateLimiter() {
3024
this(DEFAULT_REFRESH_PERIOD, DEFAULT_LIMIT_FOR_PERIOD);
3125
}
@@ -36,11 +30,12 @@ public PeriodRateLimiter(Duration refreshPeriod, int limitForPeriod) {
3630
}
3731

3832
@Override
39-
public Optional<Duration> acquirePermission(ResourceID resourceID) {
40-
if (limitForPeriod <= 0) {
33+
public Optional<Duration> isLimited(RateLimitState rateLimitState) {
34+
if (limitForPeriod <= 0 || !(rateLimitState instanceof RateState)) {
4135
return Optional.empty();
4236
}
43-
var actualState = limitData.computeIfAbsent(resourceID, r -> RateState.initialState());
37+
38+
var actualState = (RateState) rateLimitState;
4439
if (actualState.getCount() < limitForPeriod) {
4540
actualState.increaseCount();
4641
return Optional.empty();
@@ -55,7 +50,7 @@ public Optional<Duration> acquirePermission(ResourceID resourceID) {
5550
}
5651

5752
@Override
58-
public void clear(ResourceID resourceID) {
59-
limitData.remove(resourceID);
53+
public RateState initState() {
54+
return RateState.initialState();
6055
}
6156
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,17 @@
33
import java.time.Duration;
44
import java.util.Optional;
55

6-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
6+
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
77

8-
public interface RateLimiter {
8+
public interface RateLimiter<S extends RateLimitState> {
9+
interface RateLimitState {
10+
}
911

1012
/**
11-
* @param resourceID id of the resource
1213
* @return empty if permission acquired or minimal duration until a permission could be acquired
1314
* again
1415
*/
15-
Optional<Duration> acquirePermission(ResourceID resourceID);
16-
17-
/**
18-
* Cleanup state. Called when resource is deleted.
19-
*/
20-
void clear(ResourceID resourceID);
16+
Optional<Duration> isLimited(RateLimitState rateLimitState);
2117

18+
S initState();
2219
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@
22

33
import java.time.LocalDateTime;
44

5-
class RateState {
5+
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
6+
7+
class RateState implements RateLimitState {
68

79
private LocalDateTime lastRefreshTime;
810
private int count;

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
2020
import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter;
2121
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
22+
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter.RateLimitState;
2223
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
2324
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
2425
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
@@ -78,7 +79,7 @@ void setup() {
7879
eventProcessorWithRetry.start();
7980
when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock);
8081
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
81-
when(rateLimiterMock.acquirePermission(any())).thenReturn(Optional.empty());
82+
when(rateLimiterMock.isLimited(any())).thenReturn(Optional.empty());
8283
}
8384

8485
@Test
@@ -351,7 +352,9 @@ void rateLimitsReconciliationSubmission() {
351352
var refreshPeriod = Duration.ofMillis(100);
352353
var event = prepareCREvent();
353354

354-
when(rateLimiterMock.acquirePermission(event.getRelatedCustomResourceID()))
355+
final var rateLimit = new RateLimitState() {};
356+
when(rateLimiterMock.initState()).thenReturn(rateLimit);
357+
when(rateLimiterMock.isLimited(rateLimit))
355358
.thenReturn(Optional.empty())
356359
.thenReturn(Optional.of(refreshPeriod));
357360

0 commit comments

Comments
 (0)