Skip to content

feat: decouple configuration of rate limiting and retry from controller #1317

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 11 additions & 5 deletions docs/documentation/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -346,12 +346,18 @@ Users can override it by implementing their own
[`RateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java)
.

To configure the default rate limiter use `@ControllerConfiguration` annotation. The following
configuration limits
each resource to reconcile at most twice within a 3 second interval:
To configure the default rate limiter use the `@LimitingRateOverPeriod` annotation on your
`Reconciler` class. The following configuration limits each resource to reconcile at most twice
within a 3 second interval:

`@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 2,refreshPeriod = 3,refreshPeriodTimeUnit = TimeUnit.SECONDS))`
.
```java

@LimitingRateOverPeriod(maxReconciliations = 2, within = 3, unit = TimeUnit.SECONDS)
@ControllerConfiguration
public class MyReconciler implements Reconciler<MyCR> {

}
```

Thus, if a given resource was reconciled twice in one second, no further reconciliation for this
resource will happen before two seconds have elapsed. Note that, since rate is limited on a
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package io.javaoperatorsdk.operator.api.config;

import java.lang.annotation.Annotation;

public interface AnnotationConfigurable<C extends Annotation> {
void initFrom(C configuration);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.api.config;

import java.lang.annotation.Annotation;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.time.Duration;
import java.util.Arrays;
Expand Down Expand Up @@ -27,14 +29,14 @@
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResourceConfig;
import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition;
import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters;
import io.javaoperatorsdk.operator.processing.event.source.filter.VoidGenericFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnAddFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnUpdateFilter;
import io.javaoperatorsdk.operator.processing.retry.Retry;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;

Expand Down Expand Up @@ -154,12 +156,39 @@ public Optional<Duration> reconciliationMaxInterval() {

@Override
public RateLimiter getRateLimiter() {
if (annotation.rateLimit() != null) {
return new PeriodRateLimiter(Duration.of(annotation.rateLimit().refreshPeriod(),
annotation.rateLimit().refreshPeriodTimeUnit().toChronoUnit()),
annotation.rateLimit().limitForPeriod());
} else {
return io.javaoperatorsdk.operator.api.config.ControllerConfiguration.super.getRateLimiter();
final Class<? extends RateLimiter> rateLimiterClass = annotation.rateLimiter();
return instantiateAndConfigureIfNeeded(rateLimiterClass, RateLimiter.class);
}

@Override
public Retry getRetry() {
final Class<? extends Retry> retryClass = annotation.retry();
return instantiateAndConfigureIfNeeded(retryClass, Retry.class);
}

@SuppressWarnings("unchecked")
private <T> T instantiateAndConfigureIfNeeded(Class<? extends T> targetClass,
Class<T> expectedType) {
try {
final Constructor<? extends T> constructor = targetClass.getDeclaredConstructor();
constructor.setAccessible(true);
final var instance = constructor.newInstance();
if (instance instanceof AnnotationConfigurable) {
AnnotationConfigurable configurable = (AnnotationConfigurable) instance;
final Class<? extends Annotation> configurationClass =
(Class<? extends Annotation>) Utils.getFirstTypeArgumentFromSuperClassOrInterface(
targetClass, AnnotationConfigurable.class);
final var configAnnotation = reconciler.getClass().getAnnotation(configurationClass);
if (configAnnotation != null) {
configurable.initFrom(configAnnotation);
}
}
return instance;
} catch (InstantiationException | IllegalAccessException | InvocationTargetException
| NoSuchMethodException e) {
throw new OperatorException("Couldn't instantiate " + expectedType.getSimpleName() + " '"
+ targetClass.getName() + "' for '" + getName()
+ "' reconciler. You need to provide an accessible no-arg constructor.", e);
}
}

Expand Down Expand Up @@ -280,7 +309,7 @@ private String getName(Dependent dependent, Class<? extends DependentResource> d
return name;
}

@SuppressWarnings("rawtypes")
@SuppressWarnings({"rawtypes", "unchecked"})
private Object createKubernetesResourceConfig(Class<? extends DependentResource> dependentType) {

Object config;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.GradualRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;

public interface ControllerConfiguration<R extends HasMetadata> extends ResourceConfiguration<R> {

RateLimiter DEFAULT_RATE_LIMITER = new PeriodRateLimiter();
RateLimiter DEFAULT_RATE_LIMITER = LinearRateLimiter.deactivatedRateLimiter();

default String getName() {
return ReconcilerUtils.getDefaultReconcilerName(getAssociatedReconcilerClassName());
Expand All @@ -34,15 +35,20 @@ default boolean isGenerationAware() {
String getAssociatedReconcilerClassName();

default Retry getRetry() {
return GenericRetry.fromConfiguration(getRetryConfiguration()); // NOSONAR
final var configuration = getRetryConfiguration();
return !RetryConfiguration.DEFAULT.equals(configuration)
? GenericRetry.fromConfiguration(configuration)
: GenericRetry.DEFAULT; // NOSONAR
}

/**
* Use getRetry instead.
* Use {@link #getRetry()} instead.
*
* @return configuration for retry.
* @deprecated provide your own {@link Retry} implementation or use the {@link GradualRetry}
* annotation instead
*/
@Deprecated
@Deprecated(forRemoval = true)
default RetryConfiguration getRetryConfiguration() {
return RetryConfiguration.DEFAULT;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,14 +106,17 @@ public ControllerConfigurationOverrider<R> watchingAllNamespaces() {
return this;
}

public ControllerConfigurationOverrider<R> withRetry(Retry retry) {
this.retry = retry;
/**
* @deprecated Use {@link #withRetry(Retry)} instead
*/
@Deprecated(forRemoval = true)
public ControllerConfigurationOverrider<R> withRetry(RetryConfiguration retry) {
this.retry = GenericRetry.fromConfiguration(retry);
return this;
}

@Deprecated
public ControllerConfigurationOverrider<R> withRetry(RetryConfiguration retry) {
this.retry = GenericRetry.fromConfiguration(retry);
public ControllerConfigurationOverrider<R> withRetry(Retry retry) {
this.retry = retry;
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.retry.Retry;
Expand Down Expand Up @@ -60,7 +61,8 @@ public DefaultControllerConfiguration(
? ControllerConfiguration.super.getRetry()
: retry;
this.resourceEventFilter = resourceEventFilter;
this.rateLimiter = rateLimiter;
this.rateLimiter =
rateLimiter != null ? rateLimiter : LinearRateLimiter.deactivatedRateLimiter();
this.dependents = dependents != null ? dependents : Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
package io.javaoperatorsdk.operator.api.config;

public class DefaultRetryConfiguration implements RetryConfiguration {

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
package io.javaoperatorsdk.operator.api.config;

import io.javaoperatorsdk.operator.processing.retry.GradualRetry;

/**
* @deprecated specify your own {@link io.javaoperatorsdk.operator.processing.retry.Retry}
* implementation or use {@link GradualRetry} annotation instead
*/
@Deprecated(forRemoval = true)
public interface RetryConfiguration {

RetryConfiguration DEFAULT = new DefaultRetryConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,14 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent;
import io.javaoperatorsdk.operator.processing.event.rate.LinearRateLimiter;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.VoidGenericFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnAddFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnUpdateFilter;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import io.javaoperatorsdk.operator.processing.retry.Retry;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
Expand Down Expand Up @@ -92,13 +96,27 @@ ReconciliationMaxInterval reconciliationMaxInterval() default @ReconciliationMax
interval = 10);


RateLimit rateLimit() default @RateLimit;

/**
* Optional list of {@link Dependent} configurations which associate a resource type to a
* {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} implementation
*
* @return the list of {@link Dependent} configurations
*/
Dependent[] dependents() default {};

/**
* Optional {@link Retry} implementation for the associated controller to use.
*
* @return the class providing the {@link Retry} implementation to use, needs to provide an
* accessible no-arg constructor.
*/
Class<? extends Retry> retry() default GenericRetry.class;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe have some javadoc here? link how to configur it.


/**
* Optional {@link RateLimiter} implementation for the associated controller to use.
*
* @return the class providing the {@link RateLimiter} implementation to use, needs to provide an
* accessible no-arg constructor.
*/
Class<? extends RateLimiter> rateLimiter() default LinearRateLimiter.class;
}

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,38 +6,37 @@
import java.util.Map;
import java.util.Optional;

import io.javaoperatorsdk.operator.api.config.AnnotationConfigurable;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

/**
* A Simple rate limiter that limits the number of permission for a time interval.
* A simple rate limiter that limits the number of permission for a time interval.
*/
public class PeriodRateLimiter implements RateLimiter {
public class LinearRateLimiter
implements RateLimiter, AnnotationConfigurable<RateLimited> {

public static final int DEFAULT_REFRESH_PERIOD_SECONDS = 10;
public static final int DEFAULT_LIMIT_FOR_PERIOD = 3;
public static final Duration DEFAULT_REFRESH_PERIOD =
Duration.ofSeconds(DEFAULT_REFRESH_PERIOD_SECONDS);

/** To turn off rate limiting set limit fod period to a non-positive number */
/** To turn off rate limiting set limit for period to a non-positive number */
public static final int NO_LIMIT_PERIOD = -1;

private Duration refreshPeriod;
private int limitForPeriod;
private int limitForPeriod = NO_LIMIT_PERIOD;

private Map<ResourceID, RateState> limitData = new HashMap<>();
private final Map<ResourceID, RateState> limitData = new HashMap<>();

public PeriodRateLimiter() {
this(DEFAULT_REFRESH_PERIOD, DEFAULT_LIMIT_FOR_PERIOD);
public static LinearRateLimiter deactivatedRateLimiter() {
return new LinearRateLimiter();
}

public PeriodRateLimiter(Duration refreshPeriod, int limitForPeriod) {
LinearRateLimiter() {}

public LinearRateLimiter(Duration refreshPeriod, int limitForPeriod) {
this.refreshPeriod = refreshPeriod;
this.limitForPeriod = limitForPeriod;
}

@Override
public Optional<Duration> acquirePermission(ResourceID resourceID) {
if (limitForPeriod <= 0) {
if (!isActivated()) {
return Optional.empty();
}
var actualState = limitData.computeIfAbsent(resourceID, r -> RateState.initialState());
Expand All @@ -58,4 +57,23 @@ public Optional<Duration> acquirePermission(ResourceID resourceID) {
public void clear(ResourceID resourceID) {
limitData.remove(resourceID);
}

@Override
public void initFrom(RateLimited configuration) {
this.refreshPeriod = Duration.of(configuration.within(),
configuration.unit().toChronoUnit());
this.limitForPeriod = configuration.maxReconciliations();
}

public boolean isActivated() {
return limitForPeriod > 0;
}

public int getLimitForPeriod() {
return limitForPeriod;
}

public Duration getRefreshPeriod() {
return refreshPeriod;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package io.javaoperatorsdk.operator.processing.event.rate;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.concurrent.TimeUnit;

@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
public @interface RateLimited {

int maxReconciliations();

int within();

/**
* @return time unit for max delay between reconciliations
*/
TimeUnit unit() default TimeUnit.SECONDS;
}
Loading