Skip to content

Commit dec972b

Browse files
committed
controller integration
1 parent 4845438 commit dec972b

File tree

4 files changed

+44
-26
lines changed

4 files changed

+44
-26
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import java.time.Duration;
34
import java.util.HashMap;
45
import java.util.HashSet;
56
import java.util.Map;
@@ -20,6 +21,7 @@
2021
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
2122
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2223
import io.javaoperatorsdk.operator.processing.MDCUtils;
24+
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
2325
import io.javaoperatorsdk.operator.processing.event.source.Cache;
2426
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
2527
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
@@ -44,6 +46,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
4446
private final Cache<R> cache;
4547
private final EventSourceManager<R> eventSourceManager;
4648
private final EventMarker eventMarker = new EventMarker();
49+
private final RateLimiter rateLimiter;
4750

4851
EventProcessor(EventSourceManager<R> eventSourceManager) {
4952
this(
@@ -92,6 +95,8 @@ private EventProcessor(
9295
this.cache = cache;
9396
this.metrics = metrics != null ? metrics : Metrics.NOOP;
9497
this.eventSourceManager = eventSourceManager;
98+
// todo configure
99+
this.rateLimiter = new RateLimiter(Duration.ofSeconds(1), 5);
95100
}
96101

97102
@Override
@@ -128,6 +133,11 @@ private void submitReconciliationExecution(ResourceID resourceID) {
128133
Optional<R> latest = cache.get(resourceID);
129134
latest.ifPresent(MDCUtils::addResourceInfo);
130135
if (!controllerUnderExecution && latest.isPresent()) {
136+
var rateLimiterPermission = rateLimiter.acquirePermission(resourceID);
137+
if (rateLimiterPermission.isPresent()) {
138+
handleRateLimitedSubmission(resourceID, rateLimiterPermission.get());
139+
return;
140+
}
131141
setUnderExecutionProcessing(resourceID);
132142
final var retryInfo = retryInfo(resourceID);
133143
ExecutionScope<R> executionScope = new ExecutionScope<>(latest.get(), retryInfo);
@@ -193,6 +203,13 @@ private boolean isResourceMarkedForDeletion(ResourceEvent resourceEvent) {
193203
return resourceEvent.getResource().map(HasMetadata::isMarkedForDeletion).orElse(false);
194204
}
195205

206+
private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimalDuration) {
207+
var minimalDurationMillis = minimalDuration.toMillis();
208+
log.debug("Rate limited resource: {}, rescheduled in {} millis", resourceID,
209+
minimalDurationMillis);
210+
retryEventSource().scheduleOnce(resourceID, minimalDurationMillis);
211+
}
212+
196213
private RetryInfo retryInfo(ResourceID resourceID) {
197214
return retryState.get(resourceID);
198215
}
@@ -251,11 +268,10 @@ private void reScheduleExecutionIfInstructed(
251268
postExecutionControl
252269
.getReScheduleDelay()
253270
.ifPresent(delay -> {
254-
if (log.isDebugEnabled()) {
255-
log.debug("ReScheduling event for resource: {} with delay: {}",
256-
ResourceID.fromResource(customResource), delay);
257-
}
258-
retryEventSource().scheduleOnce(customResource, delay);
271+
var resourceID = ResourceID.fromResource(customResource);
272+
log.debug("ReScheduling event for resource: {} with delay: {}",
273+
resourceID, delay);
274+
retryEventSource().scheduleOnce(resourceID, delay);
259275
});
260276
}
261277

@@ -289,7 +305,7 @@ private void handleRetryOnException(
289305
delay,
290306
resourceID);
291307
metrics.failedReconciliation(resourceID, exception);
292-
retryEventSource().scheduleOnce(executionScope.getResource(), delay);
308+
retryEventSource().scheduleOnce(resourceID, delay);
293309
},
294310
() -> log.error("Exhausted retries for {}", executionScope));
295311
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,16 @@ public class TimerEventSource<R extends HasMetadata>
2525
private final Map<ResourceID, EventProducerTimeTask> onceTasks = new ConcurrentHashMap<>();
2626

2727

28-
public void scheduleOnce(R resource, long delay) {
28+
public void scheduleOnce(ResourceID resourceID, long delay) {
2929
if (!running.get()) {
3030
throw new IllegalStateException("The TimerEventSource is not running");
3131
}
32-
ResourceID resourceUid = ResourceID.fromResource(resource);
33-
if (onceTasks.containsKey(resourceUid)) {
34-
cancelOnceSchedule(resourceUid);
32+
33+
if (onceTasks.containsKey(resourceID)) {
34+
cancelOnceSchedule(resourceID);
3535
}
36-
EventProducerTimeTask task = new EventProducerTimeTask(resourceUid);
37-
onceTasks.put(resourceUid, task);
36+
EventProducerTimeTask task = new EventProducerTimeTask(resourceID);
37+
onceTasks.put(resourceID, task);
3838
timer.schedule(task, delay);
3939
}
4040

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,8 @@ void schedulesAnEventRetryOnException() {
105105
eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl);
106106

107107
verify(retryTimerEventSourceMock, times(1))
108-
.scheduleOnce(eq(customResource), eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL));
108+
.scheduleOnce(eq(ResourceID.fromResource(customResource)),
109+
eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL));
109110
}
110111

111112
@Test
@@ -136,7 +137,8 @@ void executesTheControllerInstantlyAfterErrorIfNewEventsReceived() {
136137
List<ExecutionScope> allValues = executionScopeArgumentCaptor.getAllValues();
137138
assertThat(allValues).hasSize(2);
138139
verify(retryTimerEventSourceMock, never())
139-
.scheduleOnce(eq(customResource), eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL));
140+
.scheduleOnce(eq(ResourceID.fromResource(customResource)),
141+
eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL));
140142
}
141143

142144
@Test

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSourceTest.java

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -37,30 +37,30 @@ public void setup() {
3737

3838
@Test
3939
public void schedulesOnce() {
40-
TestCustomResource customResource = TestUtils.testCustomResource();
40+
var resourceID = ResourceID.fromResource(TestUtils.testCustomResource());
4141

42-
source.scheduleOnce(customResource, PERIOD);
42+
source.scheduleOnce(resourceID, PERIOD);
4343

4444
untilAsserted(() -> assertThat(eventHandler.events).hasSize(1));
4545
untilAsserted(PERIOD * 2, 0, () -> assertThat(eventHandler.events).hasSize(1));
4646
}
4747

4848
@Test
4949
public void canCancelOnce() {
50-
TestCustomResource customResource = TestUtils.testCustomResource();
50+
var resourceID = ResourceID.fromResource(TestUtils.testCustomResource());
5151

52-
source.scheduleOnce(customResource, PERIOD);
53-
source.cancelOnceSchedule(ResourceID.fromResource(customResource));
52+
source.scheduleOnce(resourceID, PERIOD);
53+
source.cancelOnceSchedule(resourceID);
5454

5555
untilAsserted(() -> assertThat(eventHandler.events).isEmpty());
5656
}
5757

5858
@Test
5959
public void canRescheduleOnceEvent() {
60-
TestCustomResource customResource = TestUtils.testCustomResource();
60+
var resourceID = ResourceID.fromResource(TestUtils.testCustomResource());
6161

62-
source.scheduleOnce(customResource, PERIOD);
63-
source.scheduleOnce(customResource, 2 * PERIOD);
62+
source.scheduleOnce(resourceID, PERIOD);
63+
source.scheduleOnce(resourceID, 2 * PERIOD);
6464

6565
untilAsserted(PERIOD * 2, PERIOD, () -> assertThat(eventHandler.events).hasSize(1));
6666
}
@@ -69,24 +69,24 @@ public void canRescheduleOnceEvent() {
6969
public void deRegistersOnceEventSources() {
7070
TestCustomResource customResource = TestUtils.testCustomResource();
7171

72-
source.scheduleOnce(customResource, PERIOD);
72+
source.scheduleOnce(ResourceID.fromResource(customResource), PERIOD);
7373
source.onResourceDeleted(customResource);
7474

7575
untilAsserted(() -> assertThat(eventHandler.events).isEmpty());
7676
}
7777

7878
@Test
7979
public void eventNotRegisteredIfStopped() throws IOException {
80-
TestCustomResource customResource = TestUtils.testCustomResource();
80+
var resourceID = ResourceID.fromResource(TestUtils.testCustomResource());
8181

8282
source.stop();
8383
assertThatExceptionOfType(IllegalStateException.class).isThrownBy(
84-
() -> source.scheduleOnce(customResource, PERIOD));
84+
() -> source.scheduleOnce(resourceID, PERIOD));
8585
}
8686

8787
@Test
8888
public void eventNotFiredIfStopped() throws IOException {
89-
source.scheduleOnce(TestUtils.testCustomResource(), PERIOD);
89+
source.scheduleOnce(ResourceID.fromResource(TestUtils.testCustomResource()), PERIOD);
9090
source.stop();
9191

9292
untilAsserted(() -> assertThat(eventHandler.events).isEmpty());

0 commit comments

Comments
 (0)