From caf12d5f8961f118483dd1ee49d82ee3fc42674e Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 31 May 2022 15:46:02 +0200 Subject: [PATCH 01/26] refactor: rename JUnit extensions to be more explicit (#1254) Fixes #1215 --- .../operator/junit/LocallyRunOperatorExtension.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java index e2f4234453..f6d98a7b45 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java @@ -239,7 +239,7 @@ public Builder withPortForward(String namespace, String labelKey, String labelVa public Builder withAdditionalCustomResourceDefinition( - Class customResource) { + Class customResource) { additionalCustomResourceDefinitions.add(customResource); return this; } From 4065802690503576fab7030552e700c7c309228e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 9 Jun 2022 15:14:46 +0200 Subject: [PATCH 02/26] feat: workflow Integration with API (dependent annotations, context) (#1257) --- .../java/io/javaoperatorsdk/operator/api/config/UtilsTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/UtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/UtilsTest.java index 87e60b8aa6..fa43a350ee 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/UtilsTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/UtilsTest.java @@ -9,6 +9,8 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; +import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult; +import io.javaoperatorsdk.operator.processing.dependent.EmptyTestDependentResource; import io.javaoperatorsdk.operator.processing.dependent.EmptyTestDependentResource; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; From 9a8fc20b209e675691643c5bfa853519d44a4d39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 5 May 2022 09:23:42 +0200 Subject: [PATCH 03/26] feat: garbage collected interface (#1164) --- .../io/javaoperatorsdk/operator/processing/Controller.java | 4 ++++ .../dependent/kubernetes/CRUDKubernetesDependentResource.java | 1 + .../dependent/kubernetes/KubernetesDependentResource.java | 1 + 3 files changed, 6 insertions(+) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index acbf2f2632..00b7753998 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -35,6 +35,10 @@ import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext; import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow; import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowCleanupResult; +import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; +import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DependentResourceConfigurator; +import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.KubernetesClientAware; +import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceException; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/CRUDKubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/CRUDKubernetesDependentResource.java index 9440898d2f..7f8834bad3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/CRUDKubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/CRUDKubernetesDependentResource.java @@ -1,6 +1,7 @@ package io.javaoperatorsdk.operator.processing.dependent.kubernetes; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import io.javaoperatorsdk.operator.api.reconciler.Ignore; import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import io.javaoperatorsdk.operator.processing.dependent.Creator; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java index ff5cc308bf..d215e7b12f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java @@ -19,6 +19,7 @@ import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.api.reconciler.Ignore; import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; +import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DependentResourceConfigurator; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.KubernetesClientAware; import io.javaoperatorsdk.operator.processing.dependent.AbstractEventSourceHolderDependentResource; From 0c74a2d8c4b7baa3d5a6199213b833337a80aefc Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 16 Jun 2022 14:29:26 +0200 Subject: [PATCH 04/26] feat: rate limiting --- .../processing/event/rate/RateLimiter.java | 50 +++++++++++++++++ .../processing/event/rate/RateState.java | 35 ++++++++++++ .../event/rate/RateLimiterTest.java | 53 +++++++++++++++++++ 3 files changed, 138 insertions(+) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java create mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java new file mode 100644 index 0000000000..652d3b2c0d --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java @@ -0,0 +1,50 @@ +package io.javaoperatorsdk.operator.processing.event.rate; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +/** + * A Simple rate limiter per resource. + */ +public class RateLimiter { + + private Duration refreshPeriod; + private int limitForPeriod; + + private Map limitData = new HashMap<>(); + + public RateLimiter(Duration refreshPeriod, int limitForPeriod) { + this.refreshPeriod = refreshPeriod; + this.limitForPeriod = limitForPeriod; + } + + /** + * + * @param resourceID id of the resource + * @return empty if permission acquired or minimal duration until a permission could be acquired + * again + */ + public Optional acquirePermission(ResourceID resourceID) { + var actualState = limitData.computeIfAbsent(resourceID, r -> RateState.initialState()); + if (actualState.getCount() < limitForPeriod) { + actualState.increaseCount(); + return Optional.empty(); + } else if (actualState.getLastRefreshTime() + .isBefore(LocalDateTime.now().minus(refreshPeriod))) { + actualState.reset(); + actualState.increaseCount(); + return Optional.empty(); + } else { + return Optional.of(Duration.between(actualState.getLastRefreshTime(), LocalDateTime.now())); + } + } + + public void clear(ResourceID resourceID) { + limitData.remove(resourceID); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java new file mode 100644 index 0000000000..f1302b077d --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateState.java @@ -0,0 +1,35 @@ +package io.javaoperatorsdk.operator.processing.event.rate; + +import java.time.LocalDateTime; + +class RateState { + + private LocalDateTime lastRefreshTime; + private int count; + + public static RateState initialState() { + return new RateState(LocalDateTime.now(), 0); + } + + RateState(LocalDateTime lastRefreshTime, int count) { + this.lastRefreshTime = lastRefreshTime; + this.count = count; + } + + public void increaseCount() { + count = count + 1; + } + + public void reset() { + lastRefreshTime = LocalDateTime.now(); + count = 0; + } + + public LocalDateTime getLastRefreshTime() { + return lastRefreshTime; + } + + public int getCount() { + return count; + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java new file mode 100644 index 0000000000..29d1daddf8 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java @@ -0,0 +1,53 @@ +package io.javaoperatorsdk.operator.processing.event.rate; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; + +import io.javaoperatorsdk.operator.TestUtils; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +class RateLimiterTest { + + public static final Duration REFRESH_PERIOD = Duration.ofSeconds(1); + ResourceID resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); + + @Test + void acquirePermissionForNewResource() { + var rl = new RateLimiter(REFRESH_PERIOD, 1); + var res = rl.acquirePermission(resourceID); + + assertThat(res).isEmpty(); + } + + @Test + void returnsMinimalDurationToAcquirePermission() { + var rl = new RateLimiter(REFRESH_PERIOD, 1); + var res = rl.acquirePermission(resourceID); + assertThat(res).isEmpty(); + + res = rl.acquirePermission(resourceID); + + assertThat(res).isPresent(); + assertThat(res.get()).isLessThan(REFRESH_PERIOD); + } + + @Test + void resetsPeriodAfterLimit() throws InterruptedException { + var rl = new RateLimiter(REFRESH_PERIOD, 1); + var res = rl.acquirePermission(resourceID); + assertThat(res).isEmpty(); + res = rl.acquirePermission(resourceID); + assertThat(res).isPresent(); + + // sleep plus some slack + Thread.sleep(REFRESH_PERIOD.toMillis() + REFRESH_PERIOD.toMillis() / 2); + + res = rl.acquirePermission(resourceID); + assertThat(res).isEmpty(); + } + +} From 46ccd246a64bd24bd594d66178b827e6135bfd29 Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 16 Jun 2022 14:42:36 +0200 Subject: [PATCH 05/26] test improvement --- .../processing/event/rate/RateLimiterTest.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java index 29d1daddf8..e1e20c75e2 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java @@ -12,15 +12,19 @@ class RateLimiterTest { - public static final Duration REFRESH_PERIOD = Duration.ofSeconds(1); + public static final Duration REFRESH_PERIOD = Duration.ofMillis(300); ResourceID resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); @Test void acquirePermissionForNewResource() { - var rl = new RateLimiter(REFRESH_PERIOD, 1); + var rl = new RateLimiter(REFRESH_PERIOD, 2); var res = rl.acquirePermission(resourceID); - assertThat(res).isEmpty(); + res = rl.acquirePermission(resourceID); + assertThat(res).isEmpty(); + + res = rl.acquirePermission(resourceID); + assertThat(res).isNotEmpty(); } @Test @@ -44,7 +48,7 @@ void resetsPeriodAfterLimit() throws InterruptedException { assertThat(res).isPresent(); // sleep plus some slack - Thread.sleep(REFRESH_PERIOD.toMillis() + REFRESH_PERIOD.toMillis() / 2); + Thread.sleep(REFRESH_PERIOD.toMillis() + REFRESH_PERIOD.toMillis() / 3); res = rl.acquirePermission(resourceID); assertThat(res).isEmpty(); From 114c0be793023a90c2b21081eac04b3df14cf38f Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 16 Jun 2022 15:03:19 +0200 Subject: [PATCH 06/26] controller integration --- .../processing/event/EventProcessor.java | 28 +++++++++++++++---- .../event/source/timer/TimerEventSource.java | 12 ++++---- .../processing/event/EventProcessorTest.java | 6 ++-- .../source/timer/TimerEventSourceTest.java | 24 ++++++++-------- 4 files changed, 44 insertions(+), 26 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index b87f43f43d..ffeb49efae 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.processing.event; +import java.time.Duration; import java.util.HashMap; import java.util.HashSet; import java.util.Map; @@ -20,6 +21,7 @@ import io.javaoperatorsdk.operator.api.reconciler.RetryInfo; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.MDCUtils; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -44,6 +46,7 @@ class EventProcessor implements EventHandler, LifecycleAw private final Cache cache; private final EventSourceManager eventSourceManager; private final EventMarker eventMarker = new EventMarker(); + private final RateLimiter rateLimiter; EventProcessor(EventSourceManager eventSourceManager) { this( @@ -92,6 +95,8 @@ private EventProcessor( this.cache = cache; this.metrics = metrics != null ? metrics : Metrics.NOOP; this.eventSourceManager = eventSourceManager; + // todo configure + this.rateLimiter = new RateLimiter(Duration.ofSeconds(1), 5); } @Override @@ -128,6 +133,11 @@ private void submitReconciliationExecution(ResourceID resourceID) { Optional latest = cache.get(resourceID); latest.ifPresent(MDCUtils::addResourceInfo); if (!controllerUnderExecution && latest.isPresent()) { + var rateLimiterPermission = rateLimiter.acquirePermission(resourceID); + if (rateLimiterPermission.isPresent()) { + handleRateLimitedSubmission(resourceID, rateLimiterPermission.get()); + return; + } setUnderExecutionProcessing(resourceID); final var retryInfo = retryInfo(resourceID); ExecutionScope executionScope = new ExecutionScope<>(latest.get(), retryInfo); @@ -193,6 +203,13 @@ private boolean isResourceMarkedForDeletion(ResourceEvent resourceEvent) { return resourceEvent.getResource().map(HasMetadata::isMarkedForDeletion).orElse(false); } + private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimalDuration) { + var minimalDurationMillis = minimalDuration.toMillis(); + log.debug("Rate limited resource: {}, rescheduled in {} millis", resourceID, + minimalDurationMillis); + retryEventSource().scheduleOnce(resourceID, minimalDurationMillis); + } + private RetryInfo retryInfo(ResourceID resourceID) { return retryState.get(resourceID); } @@ -251,11 +268,10 @@ private void reScheduleExecutionIfInstructed( postExecutionControl .getReScheduleDelay() .ifPresent(delay -> { - if (log.isDebugEnabled()) { - log.debug("ReScheduling event for resource: {} with delay: {}", - ResourceID.fromResource(customResource), delay); - } - retryEventSource().scheduleOnce(customResource, delay); + var resourceID = ResourceID.fromResource(customResource); + log.debug("ReScheduling event for resource: {} with delay: {}", + resourceID, delay); + retryEventSource().scheduleOnce(resourceID, delay); }); } @@ -289,7 +305,7 @@ private void handleRetryOnException( delay, resourceID); metrics.failedReconciliation(resourceID, exception); - retryEventSource().scheduleOnce(executionScope.getResource(), delay); + retryEventSource().scheduleOnce(resourceID, delay); }, () -> log.error("Exhausted retries for {}", executionScope)); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java index 28acfdc3ac..030d6e0a87 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java @@ -25,16 +25,16 @@ public class TimerEventSource private final Map onceTasks = new ConcurrentHashMap<>(); - public void scheduleOnce(R resource, long delay) { + public void scheduleOnce(ResourceID resourceID, long delay) { if (!running.get()) { throw new IllegalStateException("The TimerEventSource is not running"); } - ResourceID resourceUid = ResourceID.fromResource(resource); - if (onceTasks.containsKey(resourceUid)) { - cancelOnceSchedule(resourceUid); + + if (onceTasks.containsKey(resourceID)) { + cancelOnceSchedule(resourceID); } - EventProducerTimeTask task = new EventProducerTimeTask(resourceUid); - onceTasks.put(resourceUid, task); + EventProducerTimeTask task = new EventProducerTimeTask(resourceID); + onceTasks.put(resourceID, task); timer.schedule(task, delay); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index dc1c32aeda..31caa87742 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -105,7 +105,8 @@ void schedulesAnEventRetryOnException() { eventProcessorWithRetry.eventProcessingFinished(executionScope, postExecutionControl); verify(retryTimerEventSourceMock, times(1)) - .scheduleOnce(eq(customResource), eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL)); + .scheduleOnce(eq(ResourceID.fromResource(customResource)), + eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL)); } @Test @@ -136,7 +137,8 @@ void executesTheControllerInstantlyAfterErrorIfNewEventsReceived() { List allValues = executionScopeArgumentCaptor.getAllValues(); assertThat(allValues).hasSize(2); verify(retryTimerEventSourceMock, never()) - .scheduleOnce(eq(customResource), eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL)); + .scheduleOnce(eq(ResourceID.fromResource(customResource)), + eq(RetryConfiguration.DEFAULT_INITIAL_INTERVAL)); } @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSourceTest.java index 2dc42c0b16..3fe3a5db58 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSourceTest.java @@ -37,9 +37,9 @@ public void setup() { @Test public void schedulesOnce() { - TestCustomResource customResource = TestUtils.testCustomResource(); + var resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); - source.scheduleOnce(customResource, PERIOD); + source.scheduleOnce(resourceID, PERIOD); untilAsserted(() -> assertThat(eventHandler.events).hasSize(1)); untilAsserted(PERIOD * 2, 0, () -> assertThat(eventHandler.events).hasSize(1)); @@ -47,20 +47,20 @@ public void schedulesOnce() { @Test public void canCancelOnce() { - TestCustomResource customResource = TestUtils.testCustomResource(); + var resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); - source.scheduleOnce(customResource, PERIOD); - source.cancelOnceSchedule(ResourceID.fromResource(customResource)); + source.scheduleOnce(resourceID, PERIOD); + source.cancelOnceSchedule(resourceID); untilAsserted(() -> assertThat(eventHandler.events).isEmpty()); } @Test public void canRescheduleOnceEvent() { - TestCustomResource customResource = TestUtils.testCustomResource(); + var resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); - source.scheduleOnce(customResource, PERIOD); - source.scheduleOnce(customResource, 2 * PERIOD); + source.scheduleOnce(resourceID, PERIOD); + source.scheduleOnce(resourceID, 2 * PERIOD); untilAsserted(PERIOD * 2, PERIOD, () -> assertThat(eventHandler.events).hasSize(1)); } @@ -69,7 +69,7 @@ public void canRescheduleOnceEvent() { public void deRegistersOnceEventSources() { TestCustomResource customResource = TestUtils.testCustomResource(); - source.scheduleOnce(customResource, PERIOD); + source.scheduleOnce(ResourceID.fromResource(customResource), PERIOD); source.onResourceDeleted(customResource); untilAsserted(() -> assertThat(eventHandler.events).isEmpty()); @@ -77,16 +77,16 @@ public void deRegistersOnceEventSources() { @Test public void eventNotRegisteredIfStopped() throws IOException { - TestCustomResource customResource = TestUtils.testCustomResource(); + var resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); source.stop(); assertThatExceptionOfType(IllegalStateException.class).isThrownBy( - () -> source.scheduleOnce(customResource, PERIOD)); + () -> source.scheduleOnce(resourceID, PERIOD)); } @Test public void eventNotFiredIfStopped() throws IOException { - source.scheduleOnce(TestUtils.testCustomResource(), PERIOD); + source.scheduleOnce(ResourceID.fromResource(TestUtils.testCustomResource()), PERIOD); source.stop(); untilAsserted(() -> assertThat(eventHandler.events).isEmpty()); From e5277dfa3e8d6e7a1ab1a1ac10e26826a433514a Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 16 Jun 2022 17:30:58 +0200 Subject: [PATCH 07/26] Config support start default values --- .../api/config/ControllerConfiguration.java | 6 ++ .../ControllerConfigurationOverrider.java | 9 +++ .../DefaultControllerConfiguration.java | 10 ++- .../operator/api/reconciler/RateLimit.java | 21 +++++++ .../processing/event/EventProcessor.java | 12 +++- .../event/rate/PeriodRateLimiter.java | 63 +++++++++++++++++++ .../processing/event/rate/RateLimiter.java | 42 +------------ .../processing/event/EventProcessorTest.java | 6 +- ...erTest.java => PeriodRateLimiterTest.java} | 18 ++++-- .../source/CustomResourceSelectorTest.java | 3 +- .../event/source/ResourceEventFilterTest.java | 2 +- .../ControllerResourceEventSourceTest.java | 2 +- 12 files changed, 141 insertions(+), 53 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java rename operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/{RateLimiterTest.java => PeriodRateLimiterTest.java} (77%) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java index 60d47d2b47..d78b45bf4a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java @@ -8,6 +8,8 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; @@ -43,6 +45,10 @@ default RetryConfiguration getRetryConfiguration() { return RetryConfiguration.DEFAULT; } + default RateLimiter getRateLimiter() { + return new PeriodRateLimiter(); + } + /** * Allow controllers to filter events before they are passed to the * {@link io.javaoperatorsdk.operator.processing.event.EventHandler}. diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java index dc579bb67f..440a076428 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java @@ -13,6 +13,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResourceConfig; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; import io.javaoperatorsdk.operator.processing.retry.Retry; @@ -35,6 +36,7 @@ public class ControllerConfigurationOverrider { private Predicate onAddFilter; private BiPredicate onUpdateFilter; private Predicate genericFilter; + private RateLimiter rateLimiter; private ControllerConfigurationOverrider(ControllerConfiguration original) { finalizer = original.getFinalizerName(); @@ -52,6 +54,7 @@ private ControllerConfigurationOverrider(ControllerConfiguration original) { this.genericFilter = original.genericFilter().orElse(null); dependentResources.forEach(drs -> namedDependentResourceSpecs.put(drs.getName(), drs)); this.original = original; + this.rateLimiter = original.getRateLimiter(); } public ControllerConfigurationOverrider withFinalizer(String finalizer) { @@ -114,6 +117,11 @@ public ControllerConfigurationOverrider withRetry(RetryConfiguration retry) { return this; } + public ControllerConfigurationOverrider withRateLimiter(RateLimiter rateLimiter) { + this.rateLimiter = rateLimiter; + return this; + } + public ControllerConfigurationOverrider withLabelSelector(String labelSelector) { this.labelSelector = labelSelector; return this; @@ -196,6 +204,7 @@ public ControllerConfiguration build() { onAddFilter, onUpdateFilter, genericFilter, + rateLimiter, newDependentSpecs); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java index b68c5e8f4f..9cc6fd1608 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java @@ -10,6 +10,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.retry.Retry; @@ -27,6 +28,7 @@ public class DefaultControllerConfiguration private final ResourceEventFilter resourceEventFilter; private final List dependents; private final Duration reconciliationMaxInterval; + private final RateLimiter rateLimiter; // NOSONAR constructor is meant to provide all information public DefaultControllerConfiguration( @@ -44,6 +46,7 @@ public DefaultControllerConfiguration( Predicate onAddFilter, BiPredicate onUpdateFilter, Predicate genericFilter, + RateLimiter rateLimiter, List dependents) { super(labelSelector, resourceClass, onAddFilter, onUpdateFilter, genericFilter, namespaces); this.associatedControllerClassName = associatedControllerClassName; @@ -57,7 +60,7 @@ public DefaultControllerConfiguration( ? ControllerConfiguration.super.getRetry() : retry; this.resourceEventFilter = resourceEventFilter; - + this.rateLimiter = rateLimiter; this.dependents = dependents != null ? dependents : Collections.emptyList(); } @@ -105,4 +108,9 @@ public List getDependentResources() { public Optional reconciliationMaxInterval() { return Optional.ofNullable(reconciliationMaxInterval); } + + @Override + public RateLimiter getRateLimiter() { + return rateLimiter; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java new file mode 100644 index 0000000000..3ac69dbe06 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java @@ -0,0 +1,21 @@ +package io.javaoperatorsdk.operator.api.reconciler; + +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; +import java.util.concurrent.TimeUnit; + +@Retention(RetentionPolicy.RUNTIME) +@Target({ElementType.TYPE}) +public @interface RateLimit { + + int limitForPeriod(); + + int refreshPeriod(); + + /** + * @return time unit for max delay between reconciliations + */ + TimeUnit refreshPeriodTimeUnit() default TimeUnit.SECONDS; +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index ffeb49efae..2047256b6a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -34,6 +34,7 @@ class EventProcessor implements EventHandler, LifecycleAware { private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); + private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50; private final Set underProcessing = new HashSet<>(); private final ReconciliationDispatcher reconciliationDispatcher; @@ -56,6 +57,7 @@ class EventProcessor implements EventHandler, LifecycleAw new ReconciliationDispatcher<>(eventSourceManager.getController()), eventSourceManager.getController().getConfiguration().getRetry(), ConfigurationServiceProvider.instance().getMetrics(), + eventSourceManager.getController().getConfiguration().getRateLimiter(), eventSourceManager); } @@ -64,6 +66,7 @@ class EventProcessor implements EventHandler, LifecycleAw EventSourceManager eventSourceManager, String relatedControllerName, Retry retry, + RateLimiter rateLimiter, Metrics metrics) { this( eventSourceManager.getControllerResourceEventSource(), @@ -72,6 +75,7 @@ class EventProcessor implements EventHandler, LifecycleAw reconciliationDispatcher, retry, metrics, + rateLimiter, eventSourceManager); } @@ -82,6 +86,7 @@ private EventProcessor( ReconciliationDispatcher reconciliationDispatcher, Retry retry, Metrics metrics, + RateLimiter rateLimiter, EventSourceManager eventSourceManager) { this.running = false; this.executor = @@ -95,8 +100,7 @@ private EventProcessor( this.cache = cache; this.metrics = metrics != null ? metrics : Metrics.NOOP; this.eventSourceManager = eventSourceManager; - // todo configure - this.rateLimiter = new RateLimiter(Duration.ofSeconds(1), 5); + this.rateLimiter = rateLimiter; } @Override @@ -207,7 +211,8 @@ private void handleRateLimitedSubmission(ResourceID resourceID, Duration minimal var minimalDurationMillis = minimalDuration.toMillis(); log.debug("Rate limited resource: {}, rescheduled in {} millis", resourceID, minimalDurationMillis); - retryEventSource().scheduleOnce(resourceID, minimalDurationMillis); + retryEventSource().scheduleOnce(resourceID, + Math.max(minimalDurationMillis, MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION)); } private RetryInfo retryInfo(ResourceID resourceID) { @@ -331,6 +336,7 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) private void cleanupForDeletedEvent(ResourceID resourceID) { log.debug("Cleaning up for delete event for: {}", resourceID); eventMarker.cleanup(resourceID); + rateLimiter.clear(resourceID); metrics.cleanupDoneFor(resourceID); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java new file mode 100644 index 0000000000..9f718fbd3c --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java @@ -0,0 +1,63 @@ +package io.javaoperatorsdk.operator.processing.event.rate; + +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +/** + * A Simple rate limiter that limits the number of permission for a time interval. + */ +public class PeriodRateLimiter implements RateLimiter { + + public static final Duration DEFAULT_REFRESH_PERIOD = Duration.ofSeconds(3); + public static final int DEFAULT_LIMIT_FOR_PERIOD = 3; + /** To turn off rate limiting set limit fod period to a non-positive number */ + public static final int NO_LIMIT_PERIOD = -1; + + private Duration refreshPeriod; + private int limitForPeriod; + + private Map limitData = new HashMap<>(); + + public PeriodRateLimiter() { + this(DEFAULT_REFRESH_PERIOD, DEFAULT_LIMIT_FOR_PERIOD); + } + + public PeriodRateLimiter(Duration refreshPeriod, int limitForPeriod) { + this.refreshPeriod = refreshPeriod; + this.limitForPeriod = limitForPeriod; + } + + /** + * @param resourceID id of the resource + * @return empty if permission acquired or minimal duration until a permission could be acquired + * again + */ + @Override + public Optional acquirePermission(ResourceID resourceID) { + if (limitForPeriod <= 0) { + return Optional.empty(); + } + var actualState = limitData.computeIfAbsent(resourceID, r -> RateState.initialState()); + if (actualState.getCount() < limitForPeriod) { + actualState.increaseCount(); + return Optional.empty(); + } else if (actualState.getLastRefreshTime() + .isBefore(LocalDateTime.now().minus(refreshPeriod))) { + actualState.reset(); + actualState.increaseCount(); + return Optional.empty(); + } else { + return Optional.of(Duration.between(actualState.getLastRefreshTime(), LocalDateTime.now())); + } + } + + @Override + public void clear(ResourceID resourceID) { + limitData.remove(resourceID); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java index 652d3b2c0d..ad29d802cf 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java @@ -1,50 +1,14 @@ package io.javaoperatorsdk.operator.processing.event.rate; import java.time.Duration; -import java.time.LocalDateTime; -import java.util.HashMap; -import java.util.Map; import java.util.Optional; import io.javaoperatorsdk.operator.processing.event.ResourceID; -/** - * A Simple rate limiter per resource. - */ -public class RateLimiter { +public interface RateLimiter { - private Duration refreshPeriod; - private int limitForPeriod; + Optional acquirePermission(ResourceID resourceID); - private Map limitData = new HashMap<>(); + void clear(ResourceID resourceID); - public RateLimiter(Duration refreshPeriod, int limitForPeriod) { - this.refreshPeriod = refreshPeriod; - this.limitForPeriod = limitForPeriod; - } - - /** - * - * @param resourceID id of the resource - * @return empty if permission acquired or minimal duration until a permission could be acquired - * again - */ - public Optional acquirePermission(ResourceID resourceID) { - var actualState = limitData.computeIfAbsent(resourceID, r -> RateState.initialState()); - if (actualState.getCount() < limitForPeriod) { - actualState.increaseCount(); - return Optional.empty(); - } else if (actualState.getLastRefreshTime() - .isBefore(LocalDateTime.now().minus(refreshPeriod))) { - actualState.reset(); - actualState.increaseCount(); - return Optional.empty(); - } else { - return Optional.of(Duration.between(actualState.getLastRefreshTime(), LocalDateTime.now())); - } - } - - public void clear(ResourceID resourceID) { - limitData.remove(resourceID); - } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 31caa87742..087bdb2b22 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -17,6 +17,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.RetryConfiguration; import io.javaoperatorsdk.operator.api.monitoring.Metrics; +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -56,11 +57,11 @@ void setup() { .thenReturn(controllerResourceEventSourceMock); eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, - null)); + new PeriodRateLimiter(), null)); eventProcessor.start(); eventProcessorWithRetry = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", - GenericRetry.defaultLimitedExponentialRetry(), null)); + GenericRetry.defaultLimitedExponentialRetry(), new PeriodRateLimiter(), null)); eventProcessorWithRetry.start(); when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock); when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); @@ -243,6 +244,7 @@ void startProcessedMarkedEventReceivedBefore() { var crID = new ResourceID("test-cr", TEST_NAMESPACE); eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, + new PeriodRateLimiter(), metricsMock)); when(controllerResourceEventSourceMock.get(eq(crID))) .thenReturn(Optional.of(testCustomResource())); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiterTest.java similarity index 77% rename from operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java rename to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiterTest.java index e1e20c75e2..90ad8447cf 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiterTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiterTest.java @@ -8,16 +8,15 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.*; -class RateLimiterTest { +class PeriodRateLimiterTest { public static final Duration REFRESH_PERIOD = Duration.ofMillis(300); ResourceID resourceID = ResourceID.fromResource(TestUtils.testCustomResource()); @Test void acquirePermissionForNewResource() { - var rl = new RateLimiter(REFRESH_PERIOD, 2); + var rl = new PeriodRateLimiter(REFRESH_PERIOD, 2); var res = rl.acquirePermission(resourceID); assertThat(res).isEmpty(); res = rl.acquirePermission(resourceID); @@ -29,7 +28,7 @@ void acquirePermissionForNewResource() { @Test void returnsMinimalDurationToAcquirePermission() { - var rl = new RateLimiter(REFRESH_PERIOD, 1); + var rl = new PeriodRateLimiter(REFRESH_PERIOD, 1); var res = rl.acquirePermission(resourceID); assertThat(res).isEmpty(); @@ -41,7 +40,7 @@ void returnsMinimalDurationToAcquirePermission() { @Test void resetsPeriodAfterLimit() throws InterruptedException { - var rl = new RateLimiter(REFRESH_PERIOD, 1); + var rl = new PeriodRateLimiter(REFRESH_PERIOD, 1); var res = rl.acquirePermission(resourceID); assertThat(res).isEmpty(); res = rl.acquirePermission(resourceID); @@ -54,4 +53,13 @@ void resetsPeriodAfterLimit() throws InterruptedException { assertThat(res).isEmpty(); } + @Test + void rateLimitCanBeTurnedOff() { + var rl = new PeriodRateLimiter(REFRESH_PERIOD, PeriodRateLimiter.NO_LIMIT_PERIOD); + + var res = rl.acquirePermission(resourceID); + + assertThat(res).isEmpty(); + } + } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java index bee750a324..19ee0bf8bd 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java @@ -25,6 +25,7 @@ import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static org.assertj.core.api.Assertions.assertThat; @@ -137,7 +138,7 @@ public MyConfiguration() { super(MyController.class.getCanonicalName(), "mycontroller", null, Constants.NO_VALUE_SET, false, null, null, null, null, TestCustomResource.class, null, - null, null, null, null); + null, null, null, null,null); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java index 29af7b8428..1d55d0101b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java @@ -145,7 +145,7 @@ public ControllerConfig(String finalizer, boolean generationAware, eventFilter, customResourceClass, null, - null, null, null, null); + null, null, null, null,null); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java index 062298754d..4140df444f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java @@ -187,7 +187,7 @@ public TestConfiguration(boolean generationAware, Predicate null, TestCustomResource.class, null, - onAddFilter, onUpdateFilter, genericFilter, null); + onAddFilter, onUpdateFilter, genericFilter,null, null); } } } From 0b0388a7700b950fab4765cb9ebf68e8c7203520 Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 16 Jun 2022 17:32:37 +0200 Subject: [PATCH 08/26] format --- .../processing/event/source/CustomResourceSelectorTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java index 19ee0bf8bd..90eb12eb21 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java @@ -137,8 +137,8 @@ public static class MyConfiguration extends DefaultControllerConfiguration Date: Fri, 17 Jun 2022 08:47:53 +0200 Subject: [PATCH 09/26] annotation controller config --- .../AnnotationControllerConfiguration.java | 25 ++++++++++++++----- .../reconciler/ControllerConfiguration.java | 5 ++++ .../{RateLimit.java => RateLimiter.java} | 2 +- 3 files changed, 25 insertions(+), 7 deletions(-) rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/{RateLimit.java => RateLimiter.java} (94%) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java index 3cee86802c..c124949996 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java @@ -27,6 +27,8 @@ import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResourceConfig; import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition; +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters; import io.javaoperatorsdk.operator.processing.event.source.filter.VoidGenericFilter; @@ -150,16 +152,27 @@ public Optional reconciliationMaxInterval() { } } + @Override + public RateLimiter getRateLimiter() { + if (annotation.rateLimiter() != null) { + return new PeriodRateLimiter(Duration.of(annotation.rateLimiter().refreshPeriod(), + annotation.rateLimiter().refreshPeriodTimeUnit().toChronoUnit()), + annotation.rateLimiter().limitForPeriod()); + } else { + return io.javaoperatorsdk.operator.api.config.ControllerConfiguration.super.getRateLimiter(); + } + } + @Override @SuppressWarnings("unchecked") public Optional> onAddFilter() { return (Optional>) createFilter(annotation.onAddFilter(), FilterType.onAdd, - annotation.getClass().getSimpleName()); + annotation.getClass().getSimpleName()); } private enum FilterType { onAdd(VoidOnAddFilter.class), onUpdate(VoidOnUpdateFilter.class), onDelete( - VoidOnDeleteFilter.class), generic(VoidGenericFilter.class); + VoidOnDeleteFilter.class), generic(VoidGenericFilter.class); final Class defaultValue; @@ -176,11 +189,11 @@ private Optional createFilter(Class filter, FilterType filterType, Str var instance = (T) filter.getDeclaredConstructor().newInstance(); return Optional.of(instance); } catch (InstantiationException | IllegalAccessException | InvocationTargetException - | NoSuchMethodException e) { + | NoSuchMethodException e) { throw new OperatorException( - "Couldn't create " + filterType + " filter from " + filter.getName() + " class in " - + origin + " for reconciler " + getName(), - e); + "Couldn't create " + filterType + " filter from " + filter.getName() + " class in " + + origin + " for reconciler " + getName(), + e); } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java index 4c6621bb57..2211cdec51 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java @@ -9,6 +9,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent; +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.VoidGenericFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnAddFilter; @@ -91,6 +92,10 @@ ReconciliationMaxInterval reconciliationMaxInterval() default @ReconciliationMaxInterval( interval = 10); + + RateLimiter rateLimiter() default @RateLimiter(limitForPeriod = PeriodRateLimiter.NO_LIMIT_PERIOD, + refreshPeriod = 1); + /** * Optional list of {@link Dependent} configurations which associate a resource type to a * {@link io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource} implementation diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java similarity index 94% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java index 3ac69dbe06..f2601f3274 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java @@ -8,7 +8,7 @@ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) -public @interface RateLimit { +public @interface RateLimiter { int limitForPeriod(); From e26a789ae5c0a7990a07b2f8bd406991a535e98a Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 17 Jun 2022 09:07:55 +0200 Subject: [PATCH 10/26] wip --- .../operator/processing/event/rate/PeriodRateLimiter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java index 9f718fbd3c..983bf3bc40 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java @@ -13,7 +13,7 @@ */ public class PeriodRateLimiter implements RateLimiter { - public static final Duration DEFAULT_REFRESH_PERIOD = Duration.ofSeconds(3); + public static final Duration DEFAULT_REFRESH_PERIOD = Duration.ofSeconds(2); public static final int DEFAULT_LIMIT_FOR_PERIOD = 3; /** To turn off rate limiting set limit fod period to a non-positive number */ public static final int NO_LIMIT_PERIOD = -1; From 28addc8a3e21e6bd17cd7cd3858ad420dd0b1aad Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 17 Jun 2022 09:51:50 +0200 Subject: [PATCH 11/26] test fix --- .../operator/api/reconciler/RateLimiter.java | 6 ++++-- .../processing/event/rate/PeriodRateLimiter.java | 5 ++++- .../event/source/CustomResourceSelectorTest.java | 9 +++------ 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java index f2601f3274..5ce9b7a54f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java @@ -6,13 +6,15 @@ import java.lang.annotation.Target; import java.util.concurrent.TimeUnit; +import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; + @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) public @interface RateLimiter { - int limitForPeriod(); + int limitForPeriod() default PeriodRateLimiter.NO_LIMIT_PERIOD; - int refreshPeriod(); + int refreshPeriod() default PeriodRateLimiter.DEFAULT_REFRESH_PERIOD_SECONDS; /** * @return time unit for max delay between reconciliations diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java index 983bf3bc40..c4d6f3e470 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java @@ -13,7 +13,10 @@ */ public class PeriodRateLimiter implements RateLimiter { - public static final Duration DEFAULT_REFRESH_PERIOD = Duration.ofSeconds(2); + public static final int DEFAULT_REFRESH_PERIOD_SECONDS = 2; + public static final Duration DEFAULT_REFRESH_PERIOD = + Duration.ofSeconds(DEFAULT_REFRESH_PERIOD_SECONDS); + public static final int DEFAULT_LIMIT_FOR_PERIOD = 3; /** To turn off rate limiting set limit fod period to a non-positive number */ public static final int NO_LIMIT_PERIOD = -1; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java index 90eb12eb21..225464c95c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java @@ -20,11 +20,7 @@ import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration; import io.javaoperatorsdk.operator.api.config.Version; -import io.javaoperatorsdk.operator.api.reconciler.Constants; -import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.Reconciler; -import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.*; import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; @@ -142,7 +138,8 @@ public MyConfiguration() { } } - @ControllerConfiguration(namespaces = NAMESPACE) + @ControllerConfiguration(namespaces = NAMESPACE, + rateLimiter = @RateLimiter(limitForPeriod = PeriodRateLimiter.DEFAULT_LIMIT_FOR_PERIOD)) public static class MyController implements Reconciler { private final Consumer consumer; From 95a7fd50fe36a1827bc7722eb90aae50511c5a77 Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 17 Jun 2022 10:48:15 +0200 Subject: [PATCH 12/26] unit test --- .../processing/event/EventProcessorTest.java | 26 +++++++++++++++++-- .../source/CustomResourceSelectorTest.java | 6 +++-- 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 087bdb2b22..ae46fe5583 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -18,6 +18,7 @@ import io.javaoperatorsdk.operator.api.config.RetryConfiguration; import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; +import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -50,6 +51,7 @@ class EventProcessorTest { private Metrics metricsMock = mock(Metrics.class); private EventProcessor eventProcessor; private EventProcessor eventProcessorWithRetry; + private RateLimiter rateLimiterMock = mock(RateLimiter.class); @BeforeEach void setup() { @@ -57,14 +59,15 @@ void setup() { .thenReturn(controllerResourceEventSourceMock); eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, - new PeriodRateLimiter(), null)); + rateLimiterMock, null)); eventProcessor.start(); eventProcessorWithRetry = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", - GenericRetry.defaultLimitedExponentialRetry(), new PeriodRateLimiter(), null)); + GenericRetry.defaultLimitedExponentialRetry(), rateLimiterMock, null)); eventProcessorWithRetry.start(); when(eventProcessor.retryEventSource()).thenReturn(retryTimerEventSourceMock); when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock); + when(rateLimiterMock.acquirePermission(any())).thenReturn(Optional.empty()); } @Test @@ -331,6 +334,25 @@ void newResourceAfterMissedDeleteEvent() { verify(reconciliationDispatcherMock, timeout(50).times(1)).handleExecution(any()); } + @Test + void rateLimitsReconciliationSubmission() throws InterruptedException { + // the refresh period value does not matter here + var refreshPeriod = Duration.ofMillis(100); + var event = prepareCREvent(); + + when(rateLimiterMock.acquirePermission(event.getRelatedCustomResourceID())) + .thenReturn(Optional.empty()) + .thenReturn(Optional.of(refreshPeriod)); + + eventProcessor.handleEvent(event); + verify(reconciliationDispatcherMock, after(FAKE_CONTROLLER_EXECUTION_DURATION).times(1)) + .handleExecution(any()); + verify(retryTimerEventSourceMock, times(0)).scheduleOnce(any(), anyLong()); + + eventProcessor.handleEvent(event); + verify(retryTimerEventSourceMock, times(1)).scheduleOnce(any(), anyLong()); + } + private ResourceID eventAlreadyUnderProcessing() { when(reconciliationDispatcherMock.handleExecution(any())) .then( diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java index 225464c95c..e0f7da1965 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java @@ -52,7 +52,10 @@ void setUpResources() { configurationService = spy(ConfigurationService.class); when(configurationService.checkCRDAndValidateLocalModel()).thenReturn(false); when(configurationService.getVersion()).thenReturn(new Version("1", "1", new Date())); + // make sure not the same config instance is used for the controller, so rate limiter is not + // shared when(configurationService.getConfigurationFor(any(MyController.class))) + .thenReturn(new MyConfiguration()) .thenReturn(new MyConfiguration()); } @@ -138,8 +141,7 @@ public MyConfiguration() { } } - @ControllerConfiguration(namespaces = NAMESPACE, - rateLimiter = @RateLimiter(limitForPeriod = PeriodRateLimiter.DEFAULT_LIMIT_FOR_PERIOD)) + @ControllerConfiguration(namespaces = NAMESPACE) public static class MyController implements Reconciler { private final Consumer consumer; From 52394dc5dc957bba9af6a96abe57da463d3f8ba0 Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 17 Jun 2022 14:37:06 +0200 Subject: [PATCH 13/26] integration test added --- .../javaoperatorsdk/operator/RateLimitIT.java | 62 +++++++++++++++++++ .../ratelimit/RateLimitCustomResource.java | 16 +++++ .../RateLimitCustomResourceSpec.java | 15 +++++ .../RateLimitCustomResourceStatus.java | 7 +++ .../sample/ratelimit/RateLimitReconciler.java | 30 +++++++++ 5 files changed, 130 insertions(+) create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/RateLimitIT.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceSpec.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceStatus.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/RateLimitIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/RateLimitIT.java new file mode 100644 index 0000000000..25509155de --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/RateLimitIT.java @@ -0,0 +1,62 @@ +package io.javaoperatorsdk.operator; + +import java.time.Duration; +import java.util.stream.IntStream; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.ratelimit.RateLimitCustomResource; +import io.javaoperatorsdk.operator.sample.ratelimit.RateLimitCustomResourceSpec; +import io.javaoperatorsdk.operator.sample.ratelimit.RateLimitReconciler; + +import static io.javaoperatorsdk.operator.sample.ratelimit.RateLimitReconciler.REFRESH_PERIOD; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class RateLimitIT { + + private final static Logger log = LoggerFactory.getLogger(RateLimitIT.class); + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder() + .withReconciler(new RateLimitReconciler()) + .build(); + + @Test + void rateLimitsExecution() { + var res = operator.create(RateLimitCustomResource.class, createResource()); + IntStream.rangeClosed(1, 5).forEach(i -> { + log.debug("replacing resource version: {}", i); + var resource = createResource(); + resource.getSpec().setNumber(i); + operator.replace(RateLimitCustomResource.class, resource); + }); + await().pollInterval(Duration.ofMillis(100)) + .pollDelay(Duration.ofMillis(REFRESH_PERIOD / 2)) + .untilAsserted(() -> assertThat( + operator.getReconcilerOfType(RateLimitReconciler.class).getNumberOfExecutions()) + .isEqualTo(1)); + + await().pollDelay(Duration.ofMillis(REFRESH_PERIOD)) + .untilAsserted(() -> assertThat( + operator.getReconcilerOfType(RateLimitReconciler.class).getNumberOfExecutions()) + .isEqualTo(2)); + } + + public RateLimitCustomResource createResource() { + RateLimitCustomResource res = new RateLimitCustomResource(); + res.setMetadata(new ObjectMetaBuilder() + .withName("test") + .build()); + res.setSpec(new RateLimitCustomResourceSpec()); + res.getSpec().setNumber(0); + return res; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResource.java new file mode 100644 index 0000000000..60456e6a4d --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResource.java @@ -0,0 +1,16 @@ +package io.javaoperatorsdk.operator.sample.ratelimit; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("rlc") +public class RateLimitCustomResource + extends CustomResource + implements Namespaced { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceSpec.java new file mode 100644 index 0000000000..7dbee7f75a --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceSpec.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.ratelimit; + +public class RateLimitCustomResourceSpec { + + private int number; + + public int getNumber() { + return number; + } + + public RateLimitCustomResourceSpec setNumber(int number) { + this.number = number; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceStatus.java new file mode 100644 index 0000000000..087408fc16 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitCustomResourceStatus.java @@ -0,0 +1,7 @@ +package io.javaoperatorsdk.operator.sample.ratelimit; + +import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus; + +public class RateLimitCustomResourceStatus extends ObservedGenerationAwareStatus { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java new file mode 100644 index 0000000000..0538ea0916 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java @@ -0,0 +1,30 @@ +package io.javaoperatorsdk.operator.sample.ratelimit; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import io.javaoperatorsdk.operator.api.reconciler.*; + +@ControllerConfiguration(rateLimiter = @RateLimiter(limitForPeriod = 1, + refreshPeriod = RateLimitReconciler.REFRESH_PERIOD, + refreshPeriodTimeUnit = TimeUnit.MILLISECONDS)) +public class RateLimitReconciler + implements Reconciler { + + public static final int REFRESH_PERIOD = 3000; + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public UpdateControl reconcile( + RateLimitCustomResource resource, + Context context) { + + numberOfExecutions.addAndGet(1); + return UpdateControl.noUpdate(); + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } +} From cd217089aa7e0512e18db7c08fd8b5365c46e00b Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 17 Jun 2022 15:59:33 +0200 Subject: [PATCH 14/26] default rate limit --- docs/documentation/features.md | 3 +++ .../operator/api/reconciler/ControllerConfiguration.java | 4 +--- .../javaoperatorsdk/operator/api/reconciler/RateLimiter.java | 2 +- .../operator/processing/event/rate/PeriodRateLimiter.java | 2 +- 4 files changed, 6 insertions(+), 5 deletions(-) diff --git a/docs/documentation/features.md b/docs/documentation/features.md index 7a604dc14e..bceb4dd3e5 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -324,6 +324,9 @@ intersections: will still happen, but won't reset the retry, will be still marked as the last attempt in the retry info. The point (1) still holds, but in case of an error, no retry will happen. +## Rate Limiting Reconciliation + + ## Handling Related Events with Event Sources See also this [blog post](https://csviri.medium.com/java-operator-sdk-introduction-to-event-sources-a1aab5af4b7b). diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java index 2211cdec51..bd6b82ebf6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java @@ -9,7 +9,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent; -import io.javaoperatorsdk.operator.processing.event.rate.PeriodRateLimiter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.VoidGenericFilter; import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnAddFilter; @@ -93,8 +92,7 @@ ReconciliationMaxInterval reconciliationMaxInterval() default @ReconciliationMax interval = 10); - RateLimiter rateLimiter() default @RateLimiter(limitForPeriod = PeriodRateLimiter.NO_LIMIT_PERIOD, - refreshPeriod = 1); + RateLimiter rateLimiter() default @RateLimiter; /** * Optional list of {@link Dependent} configurations which associate a resource type to a diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java index 5ce9b7a54f..32f443114f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java @@ -12,7 +12,7 @@ @Target({ElementType.TYPE}) public @interface RateLimiter { - int limitForPeriod() default PeriodRateLimiter.NO_LIMIT_PERIOD; + int limitForPeriod() default PeriodRateLimiter.DEFAULT_LIMIT_FOR_PERIOD; int refreshPeriod() default PeriodRateLimiter.DEFAULT_REFRESH_PERIOD_SECONDS; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java index c4d6f3e470..de75f0884e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java @@ -14,10 +14,10 @@ public class PeriodRateLimiter implements RateLimiter { public static final int DEFAULT_REFRESH_PERIOD_SECONDS = 2; + public static final int DEFAULT_LIMIT_FOR_PERIOD = 3; public static final Duration DEFAULT_REFRESH_PERIOD = Duration.ofSeconds(DEFAULT_REFRESH_PERIOD_SECONDS); - public static final int DEFAULT_LIMIT_FOR_PERIOD = 3; /** To turn off rate limiting set limit fod period to a non-positive number */ public static final int NO_LIMIT_PERIOD = -1; From 4c03511f6153ba635a9c0804809be98422b10bc5 Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 17 Jun 2022 16:10:38 +0200 Subject: [PATCH 15/26] rate limiter turned off by default --- .../javaoperatorsdk/operator/api/reconciler/RateLimiter.java | 2 +- .../operator/processing/event/rate/PeriodRateLimiter.java | 5 ----- .../operator/processing/event/rate/RateLimiter.java | 5 +++++ 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java index 32f443114f..5ce9b7a54f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java @@ -12,7 +12,7 @@ @Target({ElementType.TYPE}) public @interface RateLimiter { - int limitForPeriod() default PeriodRateLimiter.DEFAULT_LIMIT_FOR_PERIOD; + int limitForPeriod() default PeriodRateLimiter.NO_LIMIT_PERIOD; int refreshPeriod() default PeriodRateLimiter.DEFAULT_REFRESH_PERIOD_SECONDS; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java index de75f0884e..f4b9c4e967 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java @@ -35,11 +35,6 @@ public PeriodRateLimiter(Duration refreshPeriod, int limitForPeriod) { this.limitForPeriod = limitForPeriod; } - /** - * @param resourceID id of the resource - * @return empty if permission acquired or minimal duration until a permission could be acquired - * again - */ @Override public Optional acquirePermission(ResourceID resourceID) { if (limitForPeriod <= 0) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java index ad29d802cf..e76043e8c5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java @@ -7,6 +7,11 @@ public interface RateLimiter { + /** + * @param resourceID id of the resource + * @return empty if permission acquired or minimal duration until a permission could be acquired + * again + */ Optional acquirePermission(ResourceID resourceID); void clear(ResourceID resourceID); From 4884a9bea694e812bbe2ae3124f1a89baaa9d0ae Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 17 Jun 2022 16:29:24 +0200 Subject: [PATCH 16/26] docs --- docs/documentation/features.md | 25 ++++++++++++++++++- .../AnnotationControllerConfiguration.java | 8 +++--- .../reconciler/ControllerConfiguration.java | 2 +- .../{RateLimiter.java => RateLimit.java} | 2 +- .../processing/event/rate/RateLimiter.java | 3 +++ .../sample/ratelimit/RateLimitReconciler.java | 2 +- .../operator/sample/WebPageReconciler.java | 13 +++------- 7 files changed, 38 insertions(+), 17 deletions(-) rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/{RateLimiter.java => RateLimit.java} (95%) diff --git a/docs/documentation/features.md b/docs/documentation/features.md index bceb4dd3e5..d2f57a8bdb 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -324,7 +324,30 @@ intersections: will still happen, but won't reset the retry, will be still marked as the last attempt in the retry info. The point (1) still holds, but in case of an error, no retry will happen. -## Rate Limiting Reconciliation +## Rate Limiting + +It is possible to rate limit reconciliation for a resource. Thus rate limiting is per resource, +and it takes precedence over retry and re-schedule configurations. So for example event a retry is scheduled in +1 seconds but this does not meet the rate limit, the next reconciliation will be postponed according rate limiting rules; +however never cancelled, just executed as early as possible according rate limit configuration. + +Rate limiting is by default turned off, since correct configuration depends on the reconciler implementation, and +how long an execution takes. +(The parallelism of reconciliation itself can be limited [`ConfigurationService`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L120-L120) +by setting appropriate ExecutorService.) + +A default implementation of rate limiter is provided, see: [`PeriodRateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java#L14-L14). +Users can override it with a custom implementation of +[`RateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java) +interface. + +To configure the default rate limiter use `@ControllerConfiguration` annotation. The following configuration limits +the reconciliation to 2 in 3 seconds: + +`@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 2,refreshPeriod = 3,refreshPeriodTimeUnit = TimeUnit.SECONDS))`. + +That means if the reconciler executed twice in one second, it will wait at least additional two seconds before it is +reconciled again. ## Handling Related Events with Event Sources diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java index c124949996..c2a63c1707 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java @@ -154,10 +154,10 @@ public Optional reconciliationMaxInterval() { @Override public RateLimiter getRateLimiter() { - if (annotation.rateLimiter() != null) { - return new PeriodRateLimiter(Duration.of(annotation.rateLimiter().refreshPeriod(), - annotation.rateLimiter().refreshPeriodTimeUnit().toChronoUnit()), - annotation.rateLimiter().limitForPeriod()); + if (annotation.rateLimit() != null) { + return new PeriodRateLimiter(Duration.of(annotation.rateLimit().refreshPeriod(), + annotation.rateLimit().refreshPeriodTimeUnit().toChronoUnit()), + annotation.rateLimit().limitForPeriod()); } else { return io.javaoperatorsdk.operator.api.config.ControllerConfiguration.super.getRateLimiter(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java index bd6b82ebf6..3d86c74779 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java @@ -92,7 +92,7 @@ ReconciliationMaxInterval reconciliationMaxInterval() default @ReconciliationMax interval = 10); - RateLimiter rateLimiter() default @RateLimiter; + RateLimit rateLimit() default @RateLimit; /** * Optional list of {@link Dependent} configurations which associate a resource type to a diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java similarity index 95% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java index 5ce9b7a54f..6c4feb2b2a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/RateLimit.java @@ -10,7 +10,7 @@ @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) -public @interface RateLimiter { +public @interface RateLimit { int limitForPeriod() default PeriodRateLimiter.NO_LIMIT_PERIOD; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java index e76043e8c5..7281315e98 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java @@ -14,6 +14,9 @@ public interface RateLimiter { */ Optional acquirePermission(ResourceID resourceID); + /** + * Cleanup state. Called when resource is deleted. + */ void clear(ResourceID resourceID); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java index 0538ea0916..b4a77a8fc0 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/ratelimit/RateLimitReconciler.java @@ -5,7 +5,7 @@ import io.javaoperatorsdk.operator.api.reconciler.*; -@ControllerConfiguration(rateLimiter = @RateLimiter(limitForPeriod = 1, +@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 1, refreshPeriod = RateLimitReconciler.REFRESH_PERIOD, refreshPeriodTimeUnit = TimeUnit.MILLISECONDS)) public class RateLimitReconciler diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java index 19b130da88..2aa1674564 100644 --- a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageReconciler.java @@ -3,6 +3,7 @@ import java.util.HashMap; import java.util.Map; import java.util.Objects; +import java.util.concurrent.TimeUnit; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -18,14 +19,7 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler; -import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusUpdateControl; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; -import io.javaoperatorsdk.operator.api.reconciler.Reconciler; -import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.*; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; @@ -33,7 +27,8 @@ import static io.javaoperatorsdk.operator.sample.WebPageManagedDependentsReconciler.SELECTOR; /** Shows how to implement reconciler using the low level api directly. */ -@ControllerConfiguration +@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 2, refreshPeriod = 3, + refreshPeriodTimeUnit = TimeUnit.SECONDS)) public class WebPageReconciler implements Reconciler, ErrorStatusHandler, EventSourceInitializer { From 16f2d57ab46693123194f3512ebe518b05d9167e Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 17 Jun 2022 16:58:59 +0200 Subject: [PATCH 17/26] default change --- .../operator/processing/event/rate/PeriodRateLimiter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java index f4b9c4e967..caa6075ee7 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java @@ -13,7 +13,7 @@ */ public class PeriodRateLimiter implements RateLimiter { - public static final int DEFAULT_REFRESH_PERIOD_SECONDS = 2; + public static final int DEFAULT_REFRESH_PERIOD_SECONDS = 10; public static final int DEFAULT_LIMIT_FOR_PERIOD = 3; public static final Duration DEFAULT_REFRESH_PERIOD = Duration.ofSeconds(DEFAULT_REFRESH_PERIOD_SECONDS); From 50ada53d9e087d070996c9ce8aa96baa82b109b8 Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 20 Jun 2022 07:42:51 +0200 Subject: [PATCH 18/26] format --- .../io/javaoperatorsdk/operator/processing/Controller.java | 4 ---- .../dependent/kubernetes/CRUDKubernetesDependentResource.java | 1 - .../dependent/kubernetes/KubernetesDependentResource.java | 1 - 3 files changed, 6 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 00b7753998..acbf2f2632 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -35,10 +35,6 @@ import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext; import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow; import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowCleanupResult; -import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; -import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DependentResourceConfigurator; -import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.KubernetesClientAware; -import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceException; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/CRUDKubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/CRUDKubernetesDependentResource.java index 7f8834bad3..9440898d2f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/CRUDKubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/CRUDKubernetesDependentResource.java @@ -1,7 +1,6 @@ package io.javaoperatorsdk.operator.processing.dependent.kubernetes; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import io.javaoperatorsdk.operator.api.reconciler.Ignore; import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import io.javaoperatorsdk.operator.processing.dependent.Creator; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java index d215e7b12f..ff5cc308bf 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java @@ -19,7 +19,6 @@ import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.api.reconciler.Ignore; import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; -import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DependentResourceConfigurator; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.KubernetesClientAware; import io.javaoperatorsdk.operator.processing.dependent.AbstractEventSourceHolderDependentResource; From f9e30f08857ea63530240cff7baf33b7841900eb Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 28 Jun 2022 09:34:58 +0200 Subject: [PATCH 19/26] rebase on next --- .../java/io/javaoperatorsdk/operator/api/config/UtilsTest.java | 2 -- .../operator/junit/LocallyRunOperatorExtension.java | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/UtilsTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/UtilsTest.java index fa43a350ee..87e60b8aa6 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/UtilsTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/UtilsTest.java @@ -9,8 +9,6 @@ import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; -import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult; -import io.javaoperatorsdk.operator.processing.dependent.EmptyTestDependentResource; import io.javaoperatorsdk.operator.processing.dependent.EmptyTestDependentResource; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java index f6d98a7b45..e2f4234453 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java @@ -239,7 +239,7 @@ public Builder withPortForward(String namespace, String labelKey, String labelVa public Builder withAdditionalCustomResourceDefinition( - Class customResource) { + Class customResource) { additionalCustomResourceDefinitions.add(customResource); return this; } From 398f343880bc25d1a01a77f6a2e8a80aa7735197 Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 4 Jul 2022 14:04:12 +0200 Subject: [PATCH 20/26] ordering params --- .../operator/processing/event/EventProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 2047256b6a..2580a1318e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -36,6 +36,7 @@ class EventProcessor implements EventHandler, LifecycleAw private static final Logger log = LoggerFactory.getLogger(EventProcessor.class); private static final long MINIMAL_RATE_LIMIT_RESCHEDULE_DURATION = 50; + private volatile boolean running; private final Set underProcessing = new HashSet<>(); private final ReconciliationDispatcher reconciliationDispatcher; private final Retry retry; @@ -43,7 +44,6 @@ class EventProcessor implements EventHandler, LifecycleAw private final ExecutorService executor; private final String controllerName; private final Metrics metrics; - private volatile boolean running; private final Cache cache; private final EventSourceManager eventSourceManager; private final EventMarker eventMarker = new EventMarker(); From c5f1a37311b24f00f7adfcddfcac2d9fe3b448a0 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 4 Jul 2022 11:20:32 +0200 Subject: [PATCH 21/26] docs: improve/clarify --- docs/documentation/features.md | 45 ++++++++++++++++++++-------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/docs/documentation/features.md b/docs/documentation/features.md index d2f57a8bdb..91778ee275 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -326,28 +326,37 @@ intersections: ## Rate Limiting -It is possible to rate limit reconciliation for a resource. Thus rate limiting is per resource, -and it takes precedence over retry and re-schedule configurations. So for example event a retry is scheduled in -1 seconds but this does not meet the rate limit, the next reconciliation will be postponed according rate limiting rules; -however never cancelled, just executed as early as possible according rate limit configuration. - -Rate limiting is by default turned off, since correct configuration depends on the reconciler implementation, and -how long an execution takes. -(The parallelism of reconciliation itself can be limited [`ConfigurationService`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L120-L120) -by setting appropriate ExecutorService.) - -A default implementation of rate limiter is provided, see: [`PeriodRateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java#L14-L14). -Users can override it with a custom implementation of +It is possible to rate limit reconciliation on a per-resource basis. The rate limit also takes +precedence over retry/re-schedule configurations: for example, even if a retry was scheduled for +the next second but this request would make the resource go over its rate limit, the next +reconciliation will be postponed according to the rate limiting rules. Note that the +reconciliation is never cancelled, it will just be executed as early as possible based on rate +limitations. + +Rate limiting is by default turned **off**, since correct configuration depends on the reconciler +implementation, in particular, on how long a typical reconciliation takes. +(The parallelism of reconciliation itself can be +limited [`ConfigurationService`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L120-L120) +by configuring the `ExecutorService` appropriately.) + +A default rate limiter implementation is provided, see: +[`PeriodRateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/PeriodRateLimiter.java#L14-L14) +. +Users can override it by implementing their own [`RateLimiter`](https://github.com/java-operator-sdk/java-operator-sdk/blob/ce4d996ee073ebef5715737995fc3d33f4751275/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/rate/RateLimiter.java) -interface. +. -To configure the default rate limiter use `@ControllerConfiguration` annotation. The following configuration limits -the reconciliation to 2 in 3 seconds: +To configure the default rate limiter use `@ControllerConfiguration` annotation. The following +configuration limits +each resource to reconcile at most twice within a 3 second interval: -`@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 2,refreshPeriod = 3,refreshPeriodTimeUnit = TimeUnit.SECONDS))`. +`@ControllerConfiguration(rateLimit = @RateLimit(limitForPeriod = 2,refreshPeriod = 3,refreshPeriodTimeUnit = TimeUnit.SECONDS))` +. -That means if the reconciler executed twice in one second, it will wait at least additional two seconds before it is -reconciled again. +Thus, if a given resource was reconciled twice in one second, no further reconciliation for this +resource will happen before two seconds have elapsed. Note that, since rate is limited on a +per-resource basis, other resources can still be reconciled at the same time, as long, of course, +that they stay within their own rate limits. ## Handling Related Events with Event Sources From aee5d59f2c57e0f9588f3b93b0106edc40184c65 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 4 Jul 2022 11:59:00 +0200 Subject: [PATCH 22/26] fix: do not create a new RateLimiter every time by default --- .../operator/api/config/ControllerConfiguration.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java index d78b45bf4a..339e06f894 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfiguration.java @@ -17,6 +17,8 @@ public interface ControllerConfiguration extends ResourceConfiguration { + RateLimiter DEFAULT_RATE_LIMITER = new PeriodRateLimiter(); + default String getName() { return ReconcilerUtils.getDefaultReconcilerName(getAssociatedReconcilerClassName()); } @@ -46,7 +48,7 @@ default RetryConfiguration getRetryConfiguration() { } default RateLimiter getRateLimiter() { - return new PeriodRateLimiter(); + return DEFAULT_RATE_LIMITER; } /** From bc535288dd1475949634e022c8f5e3681ce0fa48 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 4 Jul 2022 12:08:36 +0200 Subject: [PATCH 23/26] fix: restore backwards compatibility --- .../event/source/timer/TimerEventSource.java | 3 ++ .../processing/event/EventProcessorTest.java | 39 ++++++++++++------- 2 files changed, 28 insertions(+), 14 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java index 030d6e0a87..f22400453a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java @@ -24,6 +24,9 @@ public class TimerEventSource private final AtomicBoolean running = new AtomicBoolean(); private final Map onceTasks = new ConcurrentHashMap<>(); + public void scheduleOnce(R resource, long delay) { + scheduleOnce(ResourceID.fromResource(resource), delay); + } public void scheduleOnce(ResourceID resourceID, long delay) { if (!running.get()) { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index ae46fe5583..b692d66522 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -32,8 +32,19 @@ import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; -import static org.mockito.Mockito.*; - +import static org.mockito.Mockito.after; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyLong; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@SuppressWarnings({"rawtypes", "unchecked"}) class EventProcessorTest { private static final Logger log = LoggerFactory.getLogger(EventProcessorTest.class); @@ -42,16 +53,16 @@ class EventProcessorTest { public static final int SEPARATE_EXECUTION_TIMEOUT = 450; public static final String TEST_NAMESPACE = "default-event-handler-test"; - private ReconciliationDispatcher reconciliationDispatcherMock = + private final ReconciliationDispatcher reconciliationDispatcherMock = mock(ReconciliationDispatcher.class); - private EventSourceManager eventSourceManagerMock = mock(EventSourceManager.class); - private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); - private ControllerResourceEventSource controllerResourceEventSourceMock = + private final EventSourceManager eventSourceManagerMock = mock(EventSourceManager.class); + private final TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); + private final ControllerResourceEventSource controllerResourceEventSourceMock = mock(ControllerResourceEventSource.class); - private Metrics metricsMock = mock(Metrics.class); + private final Metrics metricsMock = mock(Metrics.class); private EventProcessor eventProcessor; private EventProcessor eventProcessorWithRetry; - private RateLimiter rateLimiterMock = mock(RateLimiter.class); + private final RateLimiter rateLimiterMock = mock(RateLimiter.class); @BeforeEach void setup() { @@ -89,7 +100,7 @@ void skipProcessingIfLatestCustomResourceNotInCache() { } @Test - void ifExecutionInProgressWaitsUntilItsFinished() throws InterruptedException { + void ifExecutionInProgressWaitsUntilItsFinished() { ResourceID resourceUid = eventAlreadyUnderProcessing(); eventProcessor.handleEvent(nonCREvent(resourceUid)); @@ -204,7 +215,7 @@ void scheduleTimedEventIfInstructedByPostExecutionControl() { eventProcessor.handleEvent(prepareCREvent()); verify(retryTimerEventSourceMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1)) - .scheduleOnce(any(), eq(testDelay)); + .scheduleOnce((HasMetadata) any(), eq(testDelay)); } @Test @@ -220,7 +231,7 @@ void reScheduleOnlyIfNotExecutedEventsReceivedMeanwhile() throws InterruptedExce verify(retryTimerEventSourceMock, after((long) (FAKE_CONTROLLER_EXECUTION_DURATION * 1.5)).times(0)) - .scheduleOnce(any(), eq(testDelay)); + .scheduleOnce((HasMetadata) any(), eq(testDelay)); } @Test @@ -335,7 +346,7 @@ void newResourceAfterMissedDeleteEvent() { } @Test - void rateLimitsReconciliationSubmission() throws InterruptedException { + void rateLimitsReconciliationSubmission() { // the refresh period value does not matter here var refreshPeriod = Duration.ofMillis(100); var event = prepareCREvent(); @@ -347,10 +358,10 @@ void rateLimitsReconciliationSubmission() throws InterruptedException { eventProcessor.handleEvent(event); verify(reconciliationDispatcherMock, after(FAKE_CONTROLLER_EXECUTION_DURATION).times(1)) .handleExecution(any()); - verify(retryTimerEventSourceMock, times(0)).scheduleOnce(any(), anyLong()); + verify(retryTimerEventSourceMock, times(0)).scheduleOnce((HasMetadata) any(), anyLong()); eventProcessor.handleEvent(event); - verify(retryTimerEventSourceMock, times(1)).scheduleOnce(any(), anyLong()); + verify(retryTimerEventSourceMock, times(1)).scheduleOnce((HasMetadata) any(), anyLong()); } private ResourceID eventAlreadyUnderProcessing() { From 8bd34283c328424667461598d11e13f29739b6eb Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 4 Jul 2022 13:13:34 +0200 Subject: [PATCH 24/26] fix: proper cast --- .../operator/processing/event/EventProcessorTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index b692d66522..9dbab9c5eb 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -215,7 +215,7 @@ void scheduleTimedEventIfInstructedByPostExecutionControl() { eventProcessor.handleEvent(prepareCREvent()); verify(retryTimerEventSourceMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(1)) - .scheduleOnce((HasMetadata) any(), eq(testDelay)); + .scheduleOnce((ResourceID) any(), eq(testDelay)); } @Test @@ -231,7 +231,7 @@ void reScheduleOnlyIfNotExecutedEventsReceivedMeanwhile() throws InterruptedExce verify(retryTimerEventSourceMock, after((long) (FAKE_CONTROLLER_EXECUTION_DURATION * 1.5)).times(0)) - .scheduleOnce((HasMetadata) any(), eq(testDelay)); + .scheduleOnce((ResourceID) any(), eq(testDelay)); } @Test @@ -358,10 +358,10 @@ void rateLimitsReconciliationSubmission() { eventProcessor.handleEvent(event); verify(reconciliationDispatcherMock, after(FAKE_CONTROLLER_EXECUTION_DURATION).times(1)) .handleExecution(any()); - verify(retryTimerEventSourceMock, times(0)).scheduleOnce((HasMetadata) any(), anyLong()); + verify(retryTimerEventSourceMock, times(0)).scheduleOnce((ResourceID) any(), anyLong()); eventProcessor.handleEvent(event); - verify(retryTimerEventSourceMock, times(1)).scheduleOnce((HasMetadata) any(), anyLong()); + verify(retryTimerEventSourceMock, times(1)).scheduleOnce((ResourceID) any(), anyLong()); } private ResourceID eventAlreadyUnderProcessing() { From 47ee101ea56381c865fb41498e42040939bf134d Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 4 Jul 2022 14:34:44 +0200 Subject: [PATCH 25/26] format --- .../operator/processing/event/EventProcessorTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 9dbab9c5eb..788325ebe8 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -231,7 +231,7 @@ void reScheduleOnlyIfNotExecutedEventsReceivedMeanwhile() throws InterruptedExce verify(retryTimerEventSourceMock, after((long) (FAKE_CONTROLLER_EXECUTION_DURATION * 1.5)).times(0)) - .scheduleOnce((ResourceID) any(), eq(testDelay)); + .scheduleOnce((ResourceID) any(), eq(testDelay)); } @Test From 544860a0a9cbc1fd2fe12d323c2dc0b0e74e1f5d Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 4 Jul 2022 17:00:00 +0200 Subject: [PATCH 26/26] rebase next --- .../config/AnnotationControllerConfiguration.java | 12 ++++++------ .../operator/ControllerManagerTest.java | 2 +- .../event/source/CustomResourceSelectorTest.java | 2 +- .../event/source/ResourceEventFilterTest.java | 2 +- .../ControllerResourceEventSourceTest.java | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java index c2a63c1707..560a79e5be 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java @@ -167,12 +167,12 @@ public RateLimiter getRateLimiter() { @SuppressWarnings("unchecked") public Optional> onAddFilter() { return (Optional>) createFilter(annotation.onAddFilter(), FilterType.onAdd, - annotation.getClass().getSimpleName()); + annotation.getClass().getSimpleName()); } private enum FilterType { onAdd(VoidOnAddFilter.class), onUpdate(VoidOnUpdateFilter.class), onDelete( - VoidOnDeleteFilter.class), generic(VoidGenericFilter.class); + VoidOnDeleteFilter.class), generic(VoidGenericFilter.class); final Class defaultValue; @@ -189,11 +189,11 @@ private Optional createFilter(Class filter, FilterType filterType, Str var instance = (T) filter.getDeclaredConstructor().newInstance(); return Optional.of(instance); } catch (InstantiationException | IllegalAccessException | InvocationTargetException - | NoSuchMethodException e) { + | NoSuchMethodException e) { throw new OperatorException( - "Couldn't create " + filterType + " filter from " + filter.getName() + " class in " - + origin + " for reconciler " + getName(), - e); + "Couldn't create " + filterType + " filter from " + filter.getName() + " class in " + + origin + " for reconciler " + getName(), + e); } } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java index 6c7358d86e..b328ace132 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java @@ -63,7 +63,7 @@ private static class TestControllerConfiguration public TestControllerConfiguration(Reconciler controller, Class crClass) { super(null, getControllerName(controller), CustomResource.getCRDName(crClass), null, false, null, null, null, null, crClass, - null, null, null, null, null); + null, null, null, null, null, null); this.controller = controller; } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java index e0f7da1965..ed073f950c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java @@ -136,7 +136,7 @@ public static class MyConfiguration extends DefaultControllerConfiguration null, TestCustomResource.class, null, - onAddFilter, onUpdateFilter, genericFilter,null, null); + onAddFilter, onUpdateFilter, genericFilter, null, null); } } }