feat: rate limiting #1290

Merged
merged 26 commits into from
Jul 4, 2022
35 changes: 35 additions & 0 deletions docs/documentation/features.md
@@ -324,6 +324,41 @@ intersections:
will still happen, but won't reset the retry, will be still marked as the last attempt in the retry info. The point
(1) still holds, but in case of an error, no retry will happen.

## Rate Limiting

It is possible to rate limit reconciliation on a per-resource basis. The rate limit also takes
precedence over retry/re-schedule configurations: for example, even if a retry was scheduled for
the next second but this request would make the resource go over its rate limit, the next
reconciliation will be postponed according to the rate limiting rules. Note that the
reconciliation is never cancelled; it is simply executed as early as the rate limit
allows.
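
The postponement can be pictured with a small sketch. It assumes, as the `EventProcessor` change in this PR suggests, that the limiter returns an empty `Optional` when reconciliation may proceed and otherwise the minimal duration to wait. The names `PostponeSketch` and `nextDelayMillis` are hypothetical and not part of the SDK; the 50 ms floor mirrors `MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION` from the diff.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical helper, not part of the SDK: models how a rate-limited
// submission is postponed rather than cancelled.
final class PostponeSketch {

  // Floor for the reschedule delay, mirroring the 50 ms constant in the diff.
  static final long MINIMAL_RESCHEDULE_MILLIS = 50;

  // Returns 0 when reconciliation may run immediately, otherwise the delay
  // (in milliseconds) after which the reconciliation should be attempted again.
  static long nextDelayMillis(Optional<Duration> rateLimiterPermission) {
    return rateLimiterPermission
        .map(d -> Math.max(d.toMillis(), MINIMAL_RESCHEDULE_MILLIS))
        .orElse(0L);
  }
}
```

Under these assumptions, a retry that fires while the resource is over its limit is not dropped: it is scheduled again after the returned delay.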

Rate limiting is turned **off** by default, since the correct configuration depends on the
reconciler implementation, in particular on how long a typical reconciliation takes.
(The parallelism of reconciliation itself can be limited via
[`ConfigurationService`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L120-L120)
by configuring the `ExecutorService` appropriately.)

A default rate limiter implementation is provided; see
[`PeriodRateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java#L14-L14).
Users can override it by implementing their own
[`RateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java).
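
As a rough illustration of what a custom limiter might look like, the sketch below enforces a minimum interval between reconciliations of the same resource. It does not use the SDK types: `SketchRateLimiter` is a stand-in interface whose two methods mirror the `acquirePermission` and `clear` calls visible in `EventProcessor`, a plain `String` replaces `ResourceID`, and the extra `now` parameter is an assumption added only to make the example deterministic.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Stand-in for the SDK's RateLimiter interface (assumption): an empty result
// means "go ahead", a present Duration means "wait at least this long".
interface SketchRateLimiter {
  Optional<Duration> acquirePermission(String resourceId, Instant now);

  void clear(String resourceId);
}

// Hypothetical custom limiter: at most one reconciliation per resource
// within each minimum interval.
final class MinIntervalLimiter implements SketchRateLimiter {
  private final Duration minInterval;
  private final Map<String, Instant> lastGranted = new HashMap<>();

  MinIntervalLimiter(Duration minInterval) {
    this.minInterval = minInterval;
  }

  @Override
  public Optional<Duration> acquirePermission(String resourceId, Instant now) {
    Instant last = lastGranted.get(resourceId);
    if (last != null) {
      Duration elapsed = Duration.between(last, now);
      if (elapsed.compareTo(minInterval) < 0) {
        // Too early: report how long the caller still has to wait.
        return Optional.of(minInterval.minus(elapsed));
      }
    }
    lastGranted.put(resourceId, now);
    return Optional.empty();
  }

  @Override
  public void clear(String resourceId) {
    // Drop per-resource state, e.g. once the resource has been deleted.
    lastGranted.remove(resourceId);
  }
}
```

The sketch is not thread-safe; a real implementation would need to consider from which threads it is called.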

To configure the default rate limiter, use the `@ControllerConfiguration` annotation. The
following configuration limits each resource to at most two reconciliations within a 3-second
interval:

`@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 2, refreshPeriod = 3, refreshPeriodTimeUnit = TimeUnit.SECONDS))`

Thus, if a given resource was reconciled twice in one second, no further reconciliation for this
resource will happen before two seconds have elapsed. Note that, since the rate is limited on a
per-resource basis, other resources can still be reconciled at the same time, as long as they
stay within their own rate limits.
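
The arithmetic in this example can be checked with a simplified model. The class below is an illustrative fixed-window limiter, not the SDK's `PeriodRateLimiter`: it assumes the period starts at the first granted reconciliation, uses a `String` key in place of `ResourceID`, and takes the current time as a parameter so the behaviour is reproducible.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative per-resource fixed-window limiter (assumption-based model).
final class PeriodLimiterSketch {

  private static final class Window {
    final Instant start;
    int count;

    Window(Instant start) {
      this.start = start;
    }
  }

  private final Duration refreshPeriod;
  private final int limitForPeriod;
  private final Map<String, Window> windows = new HashMap<>();

  PeriodLimiterSketch(Duration refreshPeriod, int limitForPeriod) {
    this.refreshPeriod = refreshPeriod;
    this.limitForPeriod = limitForPeriod;
  }

  // Empty result: reconciliation may run now. Present: time left in the period.
  Optional<Duration> acquirePermission(String resourceId, Instant now) {
    Window w = windows.get(resourceId);
    if (w == null || !now.isBefore(w.start.plus(refreshPeriod))) {
      // No window yet, or the previous one has expired: start a new period.
      w = new Window(now);
      windows.put(resourceId, w);
    }
    if (w.count < limitForPeriod) {
      w.count++;
      return Optional.empty();
    }
    return Optional.of(Duration.between(now, w.start.plus(refreshPeriod)));
  }
}
```

With `limitForPeriod = 2` and a 3-second period, two reconciliations at 0 s and 1 s are granted; a third request at 1 s is told to wait 2 seconds, while a different resource is unaffected.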


## Handling Related Events with Event Sources

See also this [blog post](https://csviri.medium.com/java-operator-sdk-introduction-to-event-sources-a1aab5af4b7b).
@@ -27,6 +27,8 @@
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResourceConfig;
import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition;
import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters;
import io.javaoperatorsdk.operator.processing.event.source.filter.VoidGenericFilter;
@@ -150,6 +152,17 @@ public Optional<Duration> reconciliationMaxInterval() {
}
}

  @Override
  public RateLimiter getRateLimiter() {
    if (annotation.rateLimit() != null) {
      return new PeriodRateLimiter(Duration.of(annotation.rateLimit().refreshPeriod(),
          annotation.rateLimit().refreshPeriodTimeUnit().toChronoUnit()),
          annotation.rateLimit().limitForPeriod());
    } else {
      return io.javaoperatorsdk.operator.api.config.ControllerConfiguration.super.getRateLimiter();
    }
  }

@Override
@SuppressWarnings("unchecked")
public Optional<Predicate<P>> onAddFilter() {
@@ -8,13 +8,17 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;

public interface ControllerConfiguration<R extends HasMetadata> extends ResourceConfiguration<R> {

RateLimiter DEFAULT_RATE_LIMITER = new PeriodRateLimiter();

default String getName() {
return ReconcilerUtils.getDefaultReconcilerName(getAssociatedReconcilerClassName());
}
@@ -43,6 +47,10 @@ default RetryConfiguration getRetryConfiguration() {
return RetryConfiguration.DEFAULT;
}

default RateLimiter getRateLimiter() {
return DEFAULT_RATE_LIMITER;
}

/**
* Allow controllers to filter events before they are passed to the
* {@link io.javaoperatorsdk.operator.processing.event.EventHandler}.
@@ -13,6 +13,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResourceConfig;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;
@@ -35,6 +36,7 @@ public class ControllerConfigurationOverrider<R extends HasMetadata> {
private Predicate<R> onAddFilter;
private BiPredicate<R, R> onUpdateFilter;
private Predicate<R> genericFilter;
private RateLimiter rateLimiter;

private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
finalizer = original.getFinalizerName();
@@ -52,6 +54,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration<R> original) {
this.genericFilter = original.genericFilter().orElse(null);
dependentResources.forEach(drs -> namedDependentResourceSpecs.put(drs.getName(), drs));
this.original = original;
this.rateLimiter = original.getRateLimiter();
}

public ControllerConfigurationOverrider<R> withFinalizer(String finalizer) {
@@ -114,6 +117,11 @@ public ControllerConfigurationOverrider<R> withRetry(RetryConfiguration retry) {
return this;
}

public ControllerConfigurationOverrider<R> withRateLimiter(RateLimiter rateLimiter) {
this.rateLimiter = rateLimiter;
return this;
}

public ControllerConfigurationOverrider<R> withLabelSelector(String labelSelector) {
this.labelSelector = labelSelector;
return this;
@@ -196,6 +204,7 @@ public ControllerConfiguration<R> build() {
onAddFilter,
onUpdateFilter,
genericFilter,
rateLimiter,
newDependentSpecs);
}

@@ -10,6 +10,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.retry.Retry;

@@ -27,6 +28,7 @@ public class DefaultControllerConfiguration<R extends HasMetadata>
private final ResourceEventFilter<R> resourceEventFilter;
private final List<DependentResourceSpec> dependents;
private final Duration reconciliationMaxInterval;
private final RateLimiter rateLimiter;

// NOSONAR constructor is meant to provide all information
public DefaultControllerConfiguration(
@@ -44,6 +46,7 @@ public DefaultControllerConfiguration(
Predicate<R> onAddFilter,
BiPredicate<R, R> onUpdateFilter,
Predicate<R> genericFilter,
RateLimiter rateLimiter,
List<DependentResourceSpec> dependents) {
super(labelSelector, resourceClass, onAddFilter, onUpdateFilter, genericFilter, namespaces);
this.associatedControllerClassName = associatedControllerClassName;
@@ -57,7 +60,7 @@ public DefaultControllerConfiguration(
? ControllerConfiguration.super.getRetry()
: retry;
this.resourceEventFilter = resourceEventFilter;

this.rateLimiter = rateLimiter;
this.dependents = dependents != null ? dependents : Collections.emptyList();
}

@@ -105,4 +108,9 @@ public List<DependentResourceSpec> getDependentResources() {
public Optional<Duration> reconciliationMaxInterval() {
return Optional.ofNullable(reconciliationMaxInterval);
}

@Override
public RateLimiter getRateLimiter() {
return rateLimiter;
}
}
@@ -91,6 +91,9 @@
ReconciliationMaxInterval reconciliationMaxInterval() default @ReconciliationMaxInterval(
interval = 10);


RateLimit rateLimit() default @RateLimit;
Collaborator

This will need to be fixed as this doesn't make any sense if users can provide their own RateLimiter implementations…

Collaborator Author

The idea is that one can configure a basic rate limit, which is usable in most cases. If the default implementation does not fit, a configuration override can still be used.

Note that this is empty by default, so there is no default rate limiter.


/**
* Optional list of {@link Dependent} configurations which associate a resource type to a
* {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} implementation
@@ -0,0 +1,23 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RateLimit {

int limitForPeriod() default PeriodRateLimiter.NO_LIMIT_PERIOD;

int refreshPeriod() default PeriodRateLimiter.DEFAULT_REFRESH_PERIOD_SECONDS;

/**
* @return time unit for max delay between reconciliations
*/
TimeUnit refreshPeriodTimeUnit() default TimeUnit.SECONDS;
}
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.processing.event;

import java.time.Duration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -20,6 +21,7 @@
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.source.Cache;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
@@ -32,18 +34,20 @@
class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAware {

private static final Logger log = LoggerFactory.getLogger(EventProcessor.class);
private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50;

private volatile boolean running;
private final Set<ResourceID> underProcessing = new HashSet<>();
private final ReconciliationDispatcher<R> reconciliationDispatcher;
private final Retry retry;
private final Map<ResourceID, RetryExecution> retryState = new HashMap<>();
private final ExecutorService executor;
private final String controllerName;
private final Metrics metrics;
private volatile boolean running;
private final Cache<R> cache;
private final EventSourceManager<R> eventSourceManager;
private final EventMarker eventMarker = new EventMarker();
private final RateLimiter rateLimiter;

EventProcessor(EventSourceManager<R> eventSourceManager) {
this(
@@ -53,6 +57,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
new ReconciliationDispatcher<>(eventSourceManager.getController()),
eventSourceManager.getController().getConfiguration().getRetry(),
ConfigurationServiceProvider.instance().getMetrics(),
eventSourceManager.getController().getConfiguration().getRateLimiter(),
eventSourceManager);
}

@@ -61,6 +66,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
EventSourceManager<R> eventSourceManager,
String relatedControllerName,
Retry retry,
RateLimiter rateLimiter,
Metrics metrics) {
this(
eventSourceManager.getControllerResourceEventSource(),
@@ -69,6 +75,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
reconciliationDispatcher,
retry,
metrics,
rateLimiter,
eventSourceManager);
}

@@ -79,6 +86,7 @@ private EventProcessor(
ReconciliationDispatcher<R> reconciliationDispatcher,
Retry retry,
Metrics metrics,
RateLimiter rateLimiter,
EventSourceManager<R> eventSourceManager) {
this.running = false;
this.executor =
@@ -92,6 +100,7 @@ private EventProcessor(
this.cache = cache;
this.metrics = metrics != null ? metrics : Metrics.NOOP;
this.eventSourceManager = eventSourceManager;
this.rateLimiter = rateLimiter;
}

@Override
@@ -128,6 +137,11 @@ private void submitReconciliationExecution(ResourceID resourceID) {
Optional<R> latest = cache.get(resourceID);
latest.ifPresent(MDCUtils::addResourceInfo);
if (!controllerUnderExecution && latest.isPresent()) {
var rateLimiterPermission = rateLimiter.acquirePermission(resourceID);
if (rateLimiterPermission.isPresent()) {
handleRateLimitedSubmission(resourceID, rateLimiterPermission.get());
return;
}
setUnderExecutionProcessing(resourceID);
final var retryInfo = retryInfo(resourceID);
ExecutionScope<R> executionScope = new ExecutionScope<>(latest.get(), retryInfo);
@@ -193,6 +207,14 @@ private boolean isResourceMarkedForDeletion(ResourceEvent resourceEvent) {
return resourceEvent.getResource().map(HasMetadata::isMarkedForDeletion).orElse(false);
}

  private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimalDuration) {
    var minimalDurationMillis = minimalDuration.toMillis();
    log.debug("Rate limited resource: {}, rescheduled in {} millis", resourceID,
        minimalDurationMillis);
    retryEventSource().scheduleOnce(resourceID,
        Math.max(minimalDurationMillis, MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION));
  }

private RetryInfo retryInfo(ResourceID resourceID) {
return retryState.get(resourceID);
}
@@ -251,11 +273,10 @@ private void reScheduleExecutionIfInstructed(
postExecutionControl
.getReScheduleDelay()
.ifPresent(delay -> {
if (log.isDebugEnabled()) {
log.debug("ReScheduling event for resource: {} with delay: {}",
ResourceID.fromResource(customResource), delay);
}
retryEventSource().scheduleOnce(customResource, delay);
var resourceID = ResourceID.fromResource(customResource);
log.debug("ReScheduling event for resource: {} with delay: {}",
resourceID, delay);
retryEventSource().scheduleOnce(resourceID, delay);
});
}

@@ -289,7 +310,7 @@ private void handleRetryOnException(
delay,
resourceID);
metrics.failedReconciliation(resourceID, exception);
retryEventSource().scheduleOnce(executionScope.getResource(), delay);
retryEventSource().scheduleOnce(resourceID, delay);
},
() -> log.error("Exhausted retries for {}", executionScope));
}
@@ -315,6 +336,7 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope)
private void cleanupForDeletedEvent(ResourceID resourceID) {
log.debug("Cleaning up for delete event for: {}", resourceID);
eventMarker.cleanup(resourceID);
rateLimiter.clear(resourceID);
metrics.cleanupDoneFor(resourceID);
}
