Skip to content

Commit 496912f

Browse files
committed
Config support start default values
1 parent dec972b commit 496912f

File tree

13 files changed

+142
-54
lines changed

13 files changed

+142
-54
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
import io.fabric8.kubernetes.api.model.HasMetadata;
99
import io.javaoperatorsdk.operator.ReconcilerUtils;
1010
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
11+
import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter;
12+
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
1113
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
1214
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters;
1315
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
@@ -43,6 +45,10 @@ default RetryConfiguration getRetryConfiguration() {
4345
return RetryConfiguration.DEFAULT;
4446
}
4547

48+
default RateLimiter getRateLimiter() {
49+
return new PeriodRateLimiter();
50+
}
51+
4652
/**
4753
* Allow controllers to filter events before they are passed to the
4854
* {@link io.javaoperatorsdk.operator.processing.event.EventHandler}.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import io.fabric8.kubernetes.api.model.HasMetadata;
99
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
1010
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResourceConfig;
11+
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
1112
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
1213
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
1314
import io.javaoperatorsdk.operator.processing.retry.Retry;
@@ -27,6 +28,7 @@ public class ControllerConfigurationOverrider<R extends HasMetadata> {
2728
private final ControllerConfiguration<R> original;
2829
private Duration reconciliationMaxInterval;
2930
private final LinkedHashMap<String, DependentResourceSpec> namedDependentResourceSpecs;
31+
private RateLimiter rateLimiter;
3032

3133
private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
3234
finalizer = original.getFinalizerName();
@@ -41,6 +43,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
4143
namedDependentResourceSpecs = new LinkedHashMap<>(dependentResources.size());
4244
dependentResources.forEach(drs -> namedDependentResourceSpecs.put(drs.getName(), drs));
4345
this.original = original;
46+
this.rateLimiter = original.getRateLimiter();
4447
}
4548

4649
public ControllerConfigurationOverrider<R> withFinalizer(String finalizer) {
@@ -103,6 +106,11 @@ public ControllerConfigurationOverrider<R> withRetry(RetryConfiguration retry) {
103106
return this;
104107
}
105108

109+
public ControllerConfigurationOverrider<R> withRateLimiter(RateLimiter rateLimiter) {
110+
this.rateLimiter = rateLimiter;
111+
return this;
112+
}
113+
106114
public ControllerConfigurationOverrider<R> withLabelSelector(String labelSelector) {
107115
this.labelSelector = labelSelector;
108116
return this;
@@ -167,6 +175,7 @@ public ControllerConfiguration<R> build() {
167175
customResourcePredicate,
168176
original.getResourceClass(),
169177
reconciliationMaxInterval,
178+
rateLimiter,
170179
newDependentSpecs);
171180
}
172181

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import io.fabric8.kubernetes.api.model.HasMetadata;
1010
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
11+
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
1112
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
1213
import io.javaoperatorsdk.operator.processing.retry.Retry;
1314

@@ -25,6 +26,7 @@ public class DefaultControllerConfiguration<R extends HasMetadata>
2526
private final ResourceEventFilter<R> resourceEventFilter;
2627
private final List<DependentResourceSpec> dependents;
2728
private final Duration reconciliationMaxInterval;
29+
private final RateLimiter rateLimiter;
2830

2931
// NOSONAR constructor is meant to provide all information
3032
public DefaultControllerConfiguration(
@@ -39,6 +41,7 @@ public DefaultControllerConfiguration(
3941
ResourceEventFilter<R> resourceEventFilter,
4042
Class<R> resourceClass,
4143
Duration reconciliationMaxInterval,
44+
RateLimiter rateLimiter,
4245
List<DependentResourceSpec> dependents) {
4346
super(labelSelector, resourceClass, namespaces);
4447
this.associatedControllerClassName = associatedControllerClassName;
@@ -52,7 +55,7 @@ public DefaultControllerConfiguration(
5255
? ControllerConfiguration.super.getRetry()
5356
: retry;
5457
this.resourceEventFilter = resourceEventFilter;
55-
58+
this.rateLimiter = rateLimiter;
5659
this.dependents = dependents != null ? dependents : Collections.emptyList();
5760
}
5861

@@ -100,4 +103,9 @@ public List<DependentResourceSpec> getDependentResources() {
100103
public Optional<Duration> reconciliationMaxInterval() {
101104
return Optional.ofNullable(reconciliationMaxInterval);
102105
}
106+
107+
@Override
108+
public RateLimiter getRateLimiter() {
109+
return rateLimiter;
110+
}
103111
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
package io.javaoperatorsdk.operator.api.reconciler;
2+
3+
import java.lang.annotation.ElementType;
4+
import java.lang.annotation.Retention;
5+
import java.lang.annotation.RetentionPolicy;
6+
import java.lang.annotation.Target;
7+
import java.util.concurrent.TimeUnit;
8+
9+
@Retention(RetentionPolicy.RUNTIME)
10+
@Target({ElementType.TYPE})
11+
public @interface RateLimit {
12+
13+
int limitForPeriod();
14+
15+
int refreshPeriod();
16+
17+
/**
18+
* @return time unit for max delay between reconciliations
19+
*/
20+
TimeUnit refreshPeriodTimeUnit() default TimeUnit.SECONDS;
21+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAware {
3535

3636
private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
37+
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50;
3738

3839
private final Set<ResourceID> underProcessing = new HashSet<>();
3940
private final ReconciliationDispatcher<R> reconciliationDispatcher;
@@ -56,6 +57,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
5657
new ReconciliationDispatcher<>(eventSourceManager.getController()),
5758
eventSourceManager.getController().getConfiguration().getRetry(),
5859
ConfigurationServiceProvider.instance().getMetrics(),
60+
eventSourceManager.getController().getConfiguration().getRateLimiter(),
5961
eventSourceManager);
6062
}
6163

@@ -64,6 +66,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
6466
EventSourceManager<R> eventSourceManager,
6567
String relatedControllerName,
6668
Retry retry,
69+
RateLimiter rateLimiter,
6770
Metrics metrics) {
6871
this(
6972
eventSourceManager.getControllerResourceEventSource(),
@@ -72,6 +75,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
7275
reconciliationDispatcher,
7376
retry,
7477
metrics,
78+
rateLimiter,
7579
eventSourceManager);
7680
}
7781

@@ -82,6 +86,7 @@ private EventProcessor(
8286
ReconciliationDispatcher<R> reconciliationDispatcher,
8387
Retry retry,
8488
Metrics metrics,
89+
RateLimiter rateLimiter,
8590
EventSourceManager<R> eventSourceManager) {
8691
this.running = false;
8792
this.executor =
@@ -95,8 +100,7 @@ private EventProcessor(
95100
this.cache = cache;
96101
this.metrics = metrics != null ? metrics : Metrics.NOOP;
97102
this.eventSourceManager = eventSourceManager;
98-
// todo configure
99-
this.rateLimiter = new RateLimiter(Duration.ofSeconds(1), 5);
103+
this.rateLimiter = rateLimiter;
100104
}
101105

102106
@Override
@@ -207,7 +211,8 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal
207211
var minimalDurationMillis = minimalDuration.toMillis();
208212
log.debug("Rate limited resource: {}, rescheduled in {} millis", resourceID,
209213
minimalDurationMillis);
210-
retryEventSource().scheduleOnce(resourceID, minimalDurationMillis);
214+
retryEventSource().scheduleOnce(resourceID,
215+
Math.max(minimalDurationMillis, MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION));
211216
}
212217

213218
private RetryInfo retryInfo(ResourceID resourceID) {
@@ -331,6 +336,7 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope)
331336
private void cleanupForDeletedEvent(ResourceID resourceID) {
332337
log.debug("Cleaning up for delete event for: {}", resourceID);
333338
eventMarker.cleanup(resourceID);
339+
rateLimiter.clear(resourceID);
334340
metrics.cleanupDoneFor(resourceID);
335341
}
336342

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
package io.javaoperatorsdk.operator.processing.event.rate;
2+
3+
import java.time.Duration;
4+
import java.time.LocalDateTime;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
import java.util.Optional;
8+
9+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
10+
11+
/**
12+
* A Simple rate limiter that limits the number of permission for a time interval.
13+
*/
14+
public class PeriodRateLimiter implements RateLimiter {
15+
16+
public static final Duration DEFAULT_REFRESH_PERIOD = Duration.ofSeconds(3);
17+
public static final int DEFAULT_LIMIT_FOR_PERIOD = 3;
18+
/** To turn off rate limiting set limit fod period to a non-positive number */
19+
public static final int NO_LIMIT_PERIOD = -1;
20+
21+
private Duration refreshPeriod;
22+
private int limitForPeriod;
23+
24+
private Map<ResourceID, RateState> limitData = new HashMap<>();
25+
26+
public PeriodRateLimiter() {
27+
this(DEFAULT_REFRESH_PERIOD, DEFAULT_LIMIT_FOR_PERIOD);
28+
}
29+
30+
public PeriodRateLimiter(Duration refreshPeriod, int limitForPeriod) {
31+
this.refreshPeriod = refreshPeriod;
32+
this.limitForPeriod = limitForPeriod;
33+
}
34+
35+
/**
36+
* @param resourceID id of the resource
37+
* @return empty if permission acquired or minimal duration until a permission could be acquired
38+
* again
39+
*/
40+
@Override
41+
public Optional<Duration> acquirePermission(ResourceID resourceID) {
42+
if (limitForPeriod <= 0) {
43+
return Optional.empty();
44+
}
45+
var actualState = limitData.computeIfAbsent(resourceID, r -> RateState.initialState());
46+
if (actualState.getCount() < limitForPeriod) {
47+
actualState.increaseCount();
48+
return Optional.empty();
49+
} else if (actualState.getLastRefreshTime()
50+
.isBefore(LocalDateTime.now().minus(refreshPeriod))) {
51+
actualState.reset();
52+
actualState.increaseCount();
53+
return Optional.empty();
54+
} else {
55+
return Optional.of(Duration.between(actualState.getLastRefreshTime(), LocalDateTime.now()));
56+
}
57+
}
58+
59+
@Override
60+
public void clear(ResourceID resourceID) {
61+
limitData.remove(resourceID);
62+
}
63+
}
Lines changed: 3 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,14 @@
11
package io.javaoperatorsdk.operator.processing.event.rate;
22

33
import java.time.Duration;
4-
import java.time.LocalDateTime;
5-
import java.util.HashMap;
6-
import java.util.Map;
74
import java.util.Optional;
85

96
import io.javaoperatorsdk.operator.processing.event.ResourceID;
107

11-
/**
12-
* A Simple rate limiter per resource.
13-
*/
14-
public class RateLimiter {
8+
public interface RateLimiter {
159

16-
private Duration refreshPeriod;
17-
private int limitForPeriod;
10+
Optional<Duration> acquirePermission(ResourceID resourceID);
1811

19-
private Map<ResourceID, RateState> limitData = new HashMap<>();
12+
void clear(ResourceID resourceID);
2013

21-
public RateLimiter(Duration refreshPeriod, int limitForPeriod) {
22-
this.refreshPeriod = refreshPeriod;
23-
this.limitForPeriod = limitForPeriod;
24-
}
25-
26-
/**
27-
*
28-
* @param resourceID id of the resource
29-
* @return empty if permission acquired or minimal duration until a permission could be acquired
30-
* again
31-
*/
32-
public Optional<Duration> acquirePermission(ResourceID resourceID) {
33-
var actualState = limitData.computeIfAbsent(resourceID, r -> RateState.initialState());
34-
if (actualState.getCount() < limitForPeriod) {
35-
actualState.increaseCount();
36-
return Optional.empty();
37-
} else if (actualState.getLastRefreshTime()
38-
.isBefore(LocalDateTime.now().minus(refreshPeriod))) {
39-
actualState.reset();
40-
actualState.increaseCount();
41-
return Optional.empty();
42-
} else {
43-
return Optional.of(Duration.between(actualState.getLastRefreshTime(), LocalDateTime.now()));
44-
}
45-
}
46-
47-
public void clear(ResourceID resourceID) {
48-
limitData.remove(resourceID);
49-
}
5014
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ private static class TestControllerConfiguration<R extends HasMetadata>
6363
public TestControllerConfiguration(Reconciler<R> controller, Class<R> crClass) {
6464
super(null, getControllerName(controller),
6565
CustomResource.getCRDName(crClass), null, false, null, null, null, null, crClass,
66-
null, null);
66+
null, null, null);
6767
this.controller = controller;
6868
}
6969

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.fabric8.kubernetes.api.model.HasMetadata;
1818
import io.javaoperatorsdk.operator.api.config.RetryConfiguration;
1919
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
20+
import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter;
2021
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
2122
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
2223
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
@@ -56,11 +57,11 @@ void setup() {
5657
.thenReturn(controllerResourceEventSourceMock);
5758
eventProcessor =
5859
spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null,
59-
null));
60+
new PeriodRateLimiter(), null));
6061
eventProcessor.start();
6162
eventProcessorWithRetry =
6263
spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test",
63-
GenericRetry.defaultLimitedExponentialRetry(), null));
64+
GenericRetry.defaultLimitedExponentialRetry(), new PeriodRateLimiter(), null));
6465
eventProcessorWithRetry.start();
6566
when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock);
6667
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
@@ -243,6 +244,7 @@ void startProcessedMarkedEventReceivedBefore() {
243244
var crID = new ResourceID("test-cr", TEST_NAMESPACE);
244245
eventProcessor =
245246
spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null,
247+
new PeriodRateLimiter(),
246248
metricsMock));
247249
when(controllerResourceEventSourceMock.get(eq(crID)))
248250
.thenReturn(Optional.of(testCustomResource()));

0 commit comments

Comments
 (0)