Skip to content

Commit 1510028

Browse files
committed
Merge branch 'rate-limiting' of github.com:java-operator-sdk/java-operator-sdk into rate-limiting
2 parents 3fafe30 + bf9ba2b commit 1510028

File tree

4 files changed

+58
-33
lines changed

4 files changed

+58
-33
lines changed

docs/documentation/features.md

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -326,28 +326,37 @@ intersections:
326326

327327
## Rate Limiting
328328

329-
It is possible to rate limit reconciliation for a resource. Thus rate limiting is per resource,
330-
and it takes precedence over retry and re-schedule configurations. So for example event a retry is scheduled in
331-
1 seconds but this does not meet the rate limit, the next reconciliation will be postponed according rate limiting rules;
332-
however never cancelled, just executed as early as possible according rate limit configuration.
333-
334-
Rate limiting is by default turned off, since correct configuration depends on the reconciler implementation, and
335-
how long an execution takes.
336-
(The parallelism of reconciliation itself can be limited [`ConfigurationService`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L120-L120)
337-
by setting appropriate ExecutorService.)
338-
339-
A default implementation of rate limiter is provided, see: [`PeriodRateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java#L14-L14).
340-
Users can override it with a custom implementation of
329+
It is possible to rate limit reconciliation on a per-resource basis. The rate limit also takes
330+
precedence over retry/re-schedule configurations: for example, even if a retry was scheduled for
331+
the next second but this request would make the resource go over its rate limit, the next
332+
reconciliation will be postponed according to the rate limiting rules. Note that the
333+
reconciliation is never cancelled, it will just be executed as early as possible based on rate
334+
limitations.
335+
336+
Rate limiting is by default turned **off**, since correct configuration depends on the reconciler
337+
implementation, in particular, on how long a typical reconciliation takes.
338+
(The parallelism of reconciliation itself can be
339+
limited [`ConfigurationService`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L120-L120)
340+
by configuring the `ExecutorService` appropriately.)
341+
342+
A default rate limiter implementation is provided, see:
343+
[`PeriodRateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java#L14-L14)
344+
.
345+
Users can override it by implementing their own
341346
[`RateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java)
342-
interface.
347+
.
343348

344-
To configure the default rate limiter use `@ControllerConfiguration` annotation. The following configuration limits
345-
the reconciliation to 2 in 3 seconds:
349+
To configure the default rate limiter use `@ControllerConfiguration` annotation. The following
350+
configuration limits
351+
each resource to reconcile at most twice within a 3 second interval:
346352

347-
`@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 2,refreshPeriod = 3,refreshPeriodTimeUnit = TimeUnit.SECONDS))`.
353+
`@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 2,refreshPeriod = 3,refreshPeriodTimeUnit = TimeUnit.SECONDS))`
354+
.
348355

349-
That means if the reconciler executed twice in one second, it will wait at least additional two seconds before it is
350-
reconciled again.
356+
Thus, if a given resource was reconciled twice in one second, no further reconciliation for this
357+
resource will happen before two seconds have elapsed. Note that, since rate is limited on a
358+
per-resource basis, other resources can still be reconciled at the same time, as long, of course,
359+
that they stay within their own rate limits.
351360

352361

353362
## Handling Related Events with Event Sources

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
public interface ControllerConfiguration<R extends HasMetadata> extends ResourceConfiguration<R> {
1919

20+
RateLimiter DEFAULT_RATE_LIMITER = new PeriodRateLimiter();
21+
2022
default String getName() {
2123
return ReconcilerUtils.getDefaultReconcilerName(getAssociatedReconcilerClassName());
2224
}
@@ -46,7 +48,7 @@ default RetryConfiguration getRetryConfiguration() {
4648
}
4749

4850
default RateLimiter getRateLimiter() {
49-
return new PeriodRateLimiter();
51+
return DEFAULT_RATE_LIMITER;
5052
}
5153

5254
/**

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ public class TimerEventSource<R extends HasMetadata>
2424
private final AtomicBoolean running = new AtomicBoolean();
2525
private final Map<ResourceID, EventProducerTimeTask> onceTasks = new ConcurrentHashMap<>();
2626

27+
public void scheduleOnce(R resource, long delay) {
28+
scheduleOnce(ResourceID.fromResource(resource), delay);
29+
}
2730

2831
public void scheduleOnce(ResourceID resourceID, long delay) {
2932
if (!running.get()) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java

Lines changed: 25 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,19 @@
3232
import static org.awaitility.Awaitility.await;
3333
import static org.mockito.ArgumentMatchers.eq;
3434
import static org.mockito.ArgumentMatchers.isNull;
35-
import static org.mockito.Mockito.*;
36-
35+
import static org.mockito.Mockito.after;
36+
import static org.mockito.Mockito.any;
37+
import static org.mockito.Mockito.anyLong;
38+
import static org.mockito.Mockito.doAnswer;
39+
import static org.mockito.Mockito.mock;
40+
import static org.mockito.Mockito.never;
41+
import static org.mockito.Mockito.spy;
42+
import static org.mockito.Mockito.timeout;
43+
import static org.mockito.Mockito.times;
44+
import static org.mockito.Mockito.verify;
45+
import static org.mockito.Mockito.when;
46+
47+
@SuppressWarnings({"rawtypes", "unchecked"})
3748
class EventProcessorTest {
3849

3950
private static final Logger log = LoggerFactory.getLogger(EventProcessorTest.class);
@@ -42,16 +53,16 @@ class EventProcessorTest {
4253
public static final int SEPARATE_EXECUTION_TIMEOUT = 450;
4354
public static final String TEST_NAMESPACE = "default-event-handler-test";
4455

45-
private ReconciliationDispatcher reconciliationDispatcherMock =
56+
private final ReconciliationDispatcher reconciliationDispatcherMock =
4657
mock(ReconciliationDispatcher.class);
47-
private EventSourceManager eventSourceManagerMock = mock(EventSourceManager.class);
48-
private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class);
49-
private ControllerResourceEventSource controllerResourceEventSourceMock =
58+
private final EventSourceManager eventSourceManagerMock = mock(EventSourceManager.class);
59+
private final TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class);
60+
private final ControllerResourceEventSource controllerResourceEventSourceMock =
5061
mock(ControllerResourceEventSource.class);
51-
private Metrics metricsMock = mock(Metrics.class);
62+
private final Metrics metricsMock = mock(Metrics.class);
5263
private EventProcessor eventProcessor;
5364
private EventProcessor eventProcessorWithRetry;
54-
private RateLimiter rateLimiterMock = mock(RateLimiter.class);
65+
private final RateLimiter rateLimiterMock = mock(RateLimiter.class);
5566

5667
@BeforeEach
5768
void setup() {
@@ -89,7 +100,7 @@ void skipProcessingIfLatestCustomResourceNotInCache() {
89100
}
90101

91102
@Test
92-
void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedException {
103+
void ifExecutionInProgressWaitsUntilItsFinished() {
93104
ResourceID resourceUid = eventAlreadyUnderProcessing();
94105

95106
eventProcessor.handleEvent(nonCREvent(resourceUid));
@@ -204,7 +215,7 @@ void scheduleTimedEventIfInstructedByPostExecutionControl() {
204215
eventProcessor.handleEvent(prepareCREvent());
205216

206217
verify(retryTimerEventSourceMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1))
207-
.scheduleOnce(any(), eq(testDelay));
218+
.scheduleOnce((ResourceID) any(), eq(testDelay));
208219
}
209220

210221
@Test
@@ -220,7 +231,7 @@ void reScheduleOnlyIfNotExecutedEventsReceivedMeanwhile() throws InterruptedExce
220231

221232
verify(retryTimerEventSourceMock,
222233
after((long) (FAKE_CONTROLLER_EXECUTION_DURATION * 1.5)).times(0))
223-
.scheduleOnce(any(), eq(testDelay));
234+
.scheduleOnce((ResourceID) any(), eq(testDelay));
224235
}
225236

226237
@Test
@@ -335,7 +346,7 @@ void newResourceAfterMissedDeleteEvent() {
335346
}
336347

337348
@Test
338-
void rateLimitsReconciliationSubmission() throws InterruptedException {
349+
void rateLimitsReconciliationSubmission() {
339350
// the refresh period value does not matter here
340351
var refreshPeriod = Duration.ofMillis(100);
341352
var event = prepareCREvent();
@@ -347,10 +358,10 @@ void rateLimitsReconciliationSubmission() throws InterruptedException {
347358
eventProcessor.handleEvent(event);
348359
verify(reconciliationDispatcherMock, after(FAKE_CONTROLLER_EXECUTION_DURATION).times(1))
349360
.handleExecution(any());
350-
verify(retryTimerEventSourceMock, times(0)).scheduleOnce(any(), anyLong());
361+
verify(retryTimerEventSourceMock, times(0)).scheduleOnce((ResourceID) any(), anyLong());
351362

352363
eventProcessor.handleEvent(event);
353-
verify(retryTimerEventSourceMock, times(1)).scheduleOnce(any(), anyLong());
364+
verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong());
354365
}
355366

356367
private ResourceID eventAlreadyUnderProcessing() {

0 commit comments

Comments
 (0)