|
18 | 18 | import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
|
19 | 19 | import io.javaoperatorsdk.operator.api.monitoring.Metrics;
|
20 | 20 | import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter;
|
| 21 | +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; |
21 | 22 | import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
|
22 | 23 | import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
|
23 | 24 | import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
|
@@ -50,21 +51,23 @@ class EventProcessorTest {
|
50 | 51 | private Metrics metricsMock = mock(Metrics.class);
|
51 | 52 | private EventProcessor eventProcessor;
|
52 | 53 | private EventProcessor eventProcessorWithRetry;
|
| 54 | + private RateLimiter rateLimiterMock = mock(RateLimiter.class); |
53 | 55 |
|
54 | 56 | @BeforeEach
|
55 | 57 | void setup() {
|
56 | 58 | when(eventSourceManagerMock.getControllerResourceEventSource())
|
57 | 59 | .thenReturn(controllerResourceEventSourceMock);
|
58 | 60 | eventProcessor =
|
59 | 61 | spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null,
|
60 |
| - new PeriodRateLimiter(), null)); |
| 62 | + rateLimiterMock, null)); |
61 | 63 | eventProcessor.start();
|
62 | 64 | eventProcessorWithRetry =
|
63 | 65 | spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test",
|
64 |
| - GenericRetry.defaultLimitedExponentialRetry(), new PeriodRateLimiter(), null)); |
| 66 | + GenericRetry.defaultLimitedExponentialRetry(), rateLimiterMock, null)); |
65 | 67 | eventProcessorWithRetry.start();
|
66 | 68 | when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock);
|
67 | 69 | when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
|
| 70 | + when(rateLimiterMock.acquirePermission(any())).thenReturn(Optional.empty()); |
68 | 71 | }
|
69 | 72 |
|
70 | 73 | @Test
|
@@ -331,6 +334,25 @@ void newResourceAfterMissedDeleteEvent() {
|
331 | 334 | verify(reconciliationDispatcherMock, timeout(50).times(1)).handleExecution(any());
|
332 | 335 | }
|
333 | 336 |
|
| 337 | + @Test |
| 338 | + void rateLimitsReconciliationSubmission() throws InterruptedException { |
| 339 | + // the refresh period value does not matter here |
| 340 | + var refreshPeriod = Duration.ofMillis(100); |
| 341 | + var event = prepareCREvent(); |
| 342 | + |
| 343 | + when(rateLimiterMock.acquirePermission(event.getRelatedCustomResourceID())) |
| 344 | + .thenReturn(Optional.empty()) |
| 345 | + .thenReturn(Optional.of(refreshPeriod)); |
| 346 | + |
| 347 | + eventProcessor.handleEvent(event); |
| 348 | + verify(reconciliationDispatcherMock, after(FAKE_CONTROLLER_EXECUTION_DURATION).times(1)) |
| 349 | + .handleExecution(any()); |
| 350 | + verify(retryTimerEventSourceMock, times(0)).scheduleOnce(any(), anyLong()); |
| 351 | + |
| 352 | + eventProcessor.handleEvent(event); |
| 353 | + verify(retryTimerEventSourceMock, times(1)).scheduleOnce(any(), anyLong()); |
| 354 | + } |
| 355 | + |
334 | 356 | private ResourceID eventAlreadyUnderProcessing() {
|
335 | 357 | when(reconciliationDispatcherMock.handleExecution(any()))
|
336 | 358 | .then(
|
|
0 commit comments