diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java index 8bf5c50fd4..ad688599db 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java @@ -1,7 +1,8 @@ package io.javaoperatorsdk.operator.processing.event.source.polling; +import java.time.Duration; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.*; import java.util.function.Predicate; import org.slf4j.Logger; @@ -32,8 +33,10 @@ public class PerResourcePollingEventSource private static final Logger log = LoggerFactory.getLogger(PerResourcePollingEventSource.class); - private final Timer timer = new Timer(); - private final Map timerTasks = new ConcurrentHashMap<>(); + public static final int DEFAULT_EXECUTOR_THREAD_NUMBER = 1; + + private final ScheduledExecutorService executorService; + private final Map> scheduledFutures = new ConcurrentHashMap<>(); private final ResourceFetcher resourceFetcher; private final Cache

resourceCache; private final Predicate

registerPredicate; @@ -57,11 +60,20 @@ public PerResourcePollingEventSource(ResourceFetcher resourceFetcher, Cache

resourceCache, long period, Predicate

registerPredicate, Class resourceClass, CacheKeyMapper cacheKeyMapper) { + this(resourceFetcher, resourceCache, period, registerPredicate, resourceClass, cacheKeyMapper, + new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER)); + } + + public PerResourcePollingEventSource(ResourceFetcher resourceFetcher, + Cache

resourceCache, long period, + Predicate

registerPredicate, Class resourceClass, + CacheKeyMapper cacheKeyMapper, ScheduledExecutorService executorService) { super(resourceClass, cacheKeyMapper); this.resourceFetcher = resourceFetcher; this.resourceCache = resourceCache; this.period = period; this.registerPredicate = registerPredicate; + this.executorService = executorService; } private Set getAndCacheResource(P primary, boolean fromGetter) { @@ -71,6 +83,17 @@ private Set getAndCacheResource(P primary, boolean fromGetter) { return values; } + @SuppressWarnings("unchecked") + private void scheduleNextExecution(P primary, Set actualResources) { + var primaryID = ResourceID.fromResource(primary); + var fetchDelay = resourceFetcher.fetchDelay(actualResources, primary); + var fetchDuration = fetchDelay.orElse(Duration.ofMillis(period)); + + ScheduledFuture scheduledFuture = (ScheduledFuture) executorService + .schedule(new FetchingExecutor(primaryID), fetchDuration.toMillis(), TimeUnit.MILLISECONDS); + scheduledFutures.put(primaryID, scheduledFuture); + } + @Override public void onResourceCreated(P resource) { checkAndRegisterTask(resource); @@ -84,10 +107,10 @@ public void onResourceUpdated(P newResource, P oldResource) { @Override public void onResourceDeleted(P resource) { var resourceID = ResourceID.fromResource(resource); - TimerTask task = timerTasks.remove(resourceID); - if (task != null) { - log.debug("Canceling task for resource: {}", resource); - task.cancel(); + var scheduledFuture = scheduledFutures.remove(resourceID); + if (scheduledFuture != null) { + log.debug("Canceling scheduledFuture for resource: {}", resource); + scheduledFuture.cancel(true); } handleDelete(resourceID); fetchedForPrimaries.remove(resourceID); @@ -95,30 +118,42 @@ public void onResourceDeleted(P resource) { // This method is always called from the same Thread for the same resource, // since events from ResourceEventAware are propagated from the thread of the informer. This is - // important - // because otherwise there will be a race condition related to the timerTasks. + // important because otherwise there will be a race condition related to the timerTasks. private void checkAndRegisterTask(P resource) { var primaryID = ResourceID.fromResource(resource); - if (timerTasks.get(primaryID) == null && (registerPredicate == null + if (scheduledFutures.get(primaryID) == null && (registerPredicate == null || registerPredicate.test(resource))) { - var task = - new TimerTask() { - @Override - public void run() { - if (!isRunning()) { - log.debug("Event source not yet started. Will not run for: {}", primaryID); - return; - } - // always use up-to-date resource from cache - var res = resourceCache.get(primaryID); - res.ifPresentOrElse(p -> getAndCacheResource(p, false), - () -> log.warn("No resource in cache for resource ID: {}", primaryID)); - } - }; - timerTasks.put(primaryID, task); - // there is a delay, to not do two fetches when the resources first appeared + var cachedResources = cache.get(primaryID); + var actualResources = + cachedResources == null ? null : new HashSet<>(cachedResources.values()); + // note that there is a delay, to not do two fetches when the resources first appeared // and getSecondaryResource is called on reconciliation. - timer.schedule(task, period, period); + scheduleNextExecution(resource, actualResources); + } + } + + private class FetchingExecutor implements Runnable { + private final ResourceID primaryID; + + public FetchingExecutor(ResourceID primaryID) { + this.primaryID = primaryID; + } + + @Override + public void run() { + if (!isRunning()) { + log.debug("Event source not yet started. Will not run for: {}", primaryID); + return; + } + // always use up-to-date resource from cache + var primary = resourceCache.get(primaryID); + if (primary.isEmpty()) { + log.warn("No resource in cache for resource ID: {}", primaryID); + // no new execution is scheduled in this case, a on delete event should be received shortly + } else { + var actualResources = primary.map(p -> getAndCacheResource(p, false)); + scheduleNextExecution(primary.get(), actualResources.orElse(null)); + } } } @@ -146,12 +181,28 @@ public Set getSecondaryResources(P primary) { public interface ResourceFetcher { Set fetchResources(P primaryResource); + + /** + * By implementing this method it is possible to specify dynamic durations to wait between the + * polls of the resources. This is especially handy if a resources "stabilized" so it is not + * expected to change its state frequently. For example an AWS RDS instance is up and running, + * it is expected to run and be stable for a very long time. In this case it is enough to poll + * with a lower frequency, compared to the phase when it is being initialized. + * + * @param lastFetchedResource might be null, in case no fetch happened before. Empty set if + * fetch happened but no resources were found. + * @param primary related primary resource + * @return an Optional containing the Duration to wait until the next fetch. If an empty + * Optional is returned, the default polling period will be used. + */ + default Optional fetchDelay(Set lastFetchedResource, P primary) { + return Optional.empty(); + } } @Override public void stop() throws OperatorException { super.stop(); - timer.cancel(); + executorService.shutdownNow(); } - } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java index fa4a624d59..727b60ed9b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.processing.event.source.polling; +import java.time.Duration; import java.util.Collections; import java.util.Optional; import java.util.Set; @@ -16,6 +17,7 @@ import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.atLeast; @@ -47,45 +49,50 @@ public void setup() { } @Test - void pollsTheResourceAfterAwareOfIt() throws InterruptedException { + void pollsTheResourceAfterAwareOfIt() { source.onResourceCreated(testCustomResource); - Thread.sleep(3 * PERIOD); - verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource)); - verify(eventHandler, times(1)).handleEvent(any()); + await().pollDelay(Duration.ofMillis(3 * PERIOD)).untilAsserted(() -> { + verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource)); + verify(supplier, atLeast(2)).fetchDelay(any(), eq(testCustomResource)); + verify(eventHandler, times(1)).handleEvent(any()); + }); } @Test - void registeringTaskOnAPredicate() throws InterruptedException { + void registeringTaskOnAPredicate() { setUpSource(new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD, testCustomResource -> testCustomResource.getMetadata().getGeneration() > 1, SampleExternalResource.class, CacheKeyMapper.singleResourceCacheKeyMapper())); source.onResourceCreated(testCustomResource); - Thread.sleep(2 * PERIOD); - verify(supplier, times(0)).fetchResources(eq(testCustomResource)); + + await().pollDelay(Duration.ofMillis(2 * PERIOD)) + .untilAsserted(() -> verify(supplier, times(0)).fetchResources(eq(testCustomResource))); + testCustomResource.getMetadata().setGeneration(2L); source.onResourceUpdated(testCustomResource, testCustomResource); - Thread.sleep(2 * PERIOD); - verify(supplier, atLeast(1)).fetchResources(eq(testCustomResource)); + await().pollDelay(Duration.ofMillis(2 * PERIOD)) + .untilAsserted(() -> verify(supplier, atLeast(1)).fetchResources(eq(testCustomResource))); } @Test - void propagateEventOnDeletedResource() throws InterruptedException { + void propagateEventOnDeletedResource() { source.onResourceCreated(testCustomResource); when(supplier.fetchResources(any())) .thenReturn(Set.of(SampleExternalResource.testResource1())) .thenReturn(Collections.emptySet()); - Thread.sleep(3 * PERIOD); - verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource)); - verify(eventHandler, times(2)).handleEvent(any()); + await().pollDelay(Duration.ofMillis(3 * PERIOD)).untilAsserted(() -> { + verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource)); + verify(eventHandler, times(2)).handleEvent(any()); + }); } @Test - void getSecondaryResourceInitiatesFetchJustForFirstTime() throws InterruptedException { + void getSecondaryResourceInitiatesFetchJustForFirstTime() { source.onResourceCreated(testCustomResource); when(supplier.fetchResources(any())) .thenReturn(Set.of(SampleExternalResource.testResource1())) @@ -104,31 +111,73 @@ void getSecondaryResourceInitiatesFetchJustForFirstTime() throws InterruptedExce verify(supplier, times(1)).fetchResources(eq(testCustomResource)); verify(eventHandler, never()).handleEvent(any()); - Thread.sleep(PERIOD * 2); - - verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource)); - value = source.getSecondaryResources(testCustomResource); - assertThat(value).hasSize(2); + await().pollDelay(Duration.ofMillis(PERIOD * 2)).untilAsserted(() -> { + verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource)); + var val = source.getSecondaryResources(testCustomResource); + assertThat(val).hasSize(2); + }); } @Test - void getsValueFromCacheOrSupplier() throws InterruptedException { + void getsValueFromCacheOrSupplier() { source.onResourceCreated(testCustomResource); when(supplier.fetchResources(any())) .thenReturn(Collections.emptySet()) .thenReturn(Set.of(SampleExternalResource.testResource1())); - Thread.sleep(PERIOD / 3); + await().pollDelay(Duration.ofMillis(PERIOD / 3)).untilAsserted(() -> { + var value = source.getSecondaryResources(testCustomResource); + verify(eventHandler, times(0)).handleEvent(any()); + assertThat(value).isEmpty(); + }); + + await().pollDelay(Duration.ofMillis(PERIOD * 2)).untilAsserted(() -> { + var value2 = source.getSecondaryResources(testCustomResource); + assertThat(value2).hasSize(1); + verify(eventHandler, times(1)).handleEvent(any()); + }); + } - var value = source.getSecondaryResources(testCustomResource); - verify(eventHandler, times(0)).handleEvent(any()); - assertThat(value).isEmpty(); + @Test + void supportsDynamicPollingDelay() { + when(supplier.fetchResources(any())) + .thenReturn(Set.of(SampleExternalResource.testResource1())); + when(supplier.fetchDelay(any(),any())) + .thenReturn(Optional.of(Duration.ofMillis(PERIOD))) + .thenReturn(Optional.of(Duration.ofMillis(PERIOD*2))); - Thread.sleep(PERIOD * 2); + source.onResourceCreated(testCustomResource); - value = source.getSecondaryResources(testCustomResource); - assertThat(value).hasSize(1); - verify(eventHandler, times(1)).handleEvent(any()); + await().pollDelay(Duration.ofMillis(PERIOD)).atMost(Duration.ofMillis((long) (1.5 * PERIOD))) + .pollInterval(Duration.ofMillis(20)) + .untilAsserted(() -> verify(supplier,times(1)).fetchResources(any())); + // verifying that it is not called as with normal interval + await().pollDelay(Duration.ofMillis(PERIOD)).atMost(Duration.ofMillis((long) (1.5*PERIOD))) + .pollInterval(Duration.ofMillis(20)) + .untilAsserted(() -> verify(supplier,times(1)).fetchResources(any())); + await().pollDelay(Duration.ofMillis(PERIOD)).atMost(Duration.ofMillis(2 * PERIOD)) + .pollInterval(Duration.ofMillis(20)) + .untilAsserted(() -> verify(supplier,times(2)).fetchResources(any())); + } + + @Test + void deleteEventCancelsTheScheduling() { + when(supplier.fetchResources(any())) + .thenReturn(Set.of(SampleExternalResource.testResource1())); + + source.onResourceCreated(testCustomResource); + + await().pollDelay(Duration.ofMillis(PERIOD)) + .atMost(Duration.ofMillis((2* PERIOD))) + .pollInterval(Duration.ofMillis(20)) + .untilAsserted(() -> verify(supplier,times(1)).fetchResources(any())); + + source.onResourceDeleted(testCustomResource); + + // check if not called again. + await().pollDelay(Duration.ofMillis(PERIOD)) + .atMost(Duration.ofMillis((2* PERIOD))) + .untilAsserted(() -> verify(supplier,times(1)).fetchResources(any())); } }