Skip to content

feat: improved per resource polling #1826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Mar 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package io.javaoperatorsdk.operator.processing.event.source.polling;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.*;
import java.util.function.Predicate;

import org.slf4j.Logger;
Expand Down Expand Up @@ -32,8 +33,10 @@ public class PerResourcePollingEventSource<R, P extends HasMetadata>

private static final Logger log = LoggerFactory.getLogger(PerResourcePollingEventSource.class);

private final Timer timer = new Timer();
private final Map<ResourceID, TimerTask> timerTasks = new ConcurrentHashMap<>();
public static final int DEFAULT_EXECUTOR_THREAD_NUMBER = 1;

private final ScheduledExecutorService executorService;
private final Map<ResourceID, ScheduledFuture<Void>> scheduledFutures = new ConcurrentHashMap<>();
private final ResourceFetcher<R, P> resourceFetcher;
private final Cache<P> resourceCache;
private final Predicate<P> registerPredicate;
Expand All @@ -57,11 +60,20 @@ public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
Cache<P> resourceCache, long period,
Predicate<P> registerPredicate, Class<R> resourceClass,
CacheKeyMapper<R> cacheKeyMapper) {
this(resourceFetcher, resourceCache, period, registerPredicate, resourceClass, cacheKeyMapper,
new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER));
}

public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
Cache<P> resourceCache, long period,
Predicate<P> registerPredicate, Class<R> resourceClass,
CacheKeyMapper<R> cacheKeyMapper, ScheduledExecutorService executorService) {
super(resourceClass, cacheKeyMapper);
this.resourceFetcher = resourceFetcher;
this.resourceCache = resourceCache;
this.period = period;
this.registerPredicate = registerPredicate;
this.executorService = executorService;
}

private Set<R> getAndCacheResource(P primary, boolean fromGetter) {
Expand All @@ -71,6 +83,17 @@ private Set<R> getAndCacheResource(P primary, boolean fromGetter) {
return values;
}

@SuppressWarnings("unchecked")
private void scheduleNextExecution(P primary, Set<R> actualResources) {
var primaryID = ResourceID.fromResource(primary);
var fetchDelay = resourceFetcher.fetchDelay(actualResources, primary);
var fetchDuration = fetchDelay.orElse(Duration.ofMillis(period));

ScheduledFuture<Void> scheduledFuture = (ScheduledFuture<Void>) executorService
.schedule(new FetchingExecutor(primaryID), fetchDuration.toMillis(), TimeUnit.MILLISECONDS);
scheduledFutures.put(primaryID, scheduledFuture);
}

@Override
public void onResourceCreated(P resource) {
checkAndRegisterTask(resource);
Expand All @@ -84,41 +107,53 @@ public void onResourceUpdated(P newResource, P oldResource) {
@Override
public void onResourceDeleted(P resource) {
var resourceID = ResourceID.fromResource(resource);
TimerTask task = timerTasks.remove(resourceID);
if (task != null) {
log.debug("Canceling task for resource: {}", resource);
task.cancel();
var scheduledFuture = scheduledFutures.remove(resourceID);
if (scheduledFuture != null) {
log.debug("Canceling scheduledFuture for resource: {}", resource);
scheduledFuture.cancel(true);
}
handleDelete(resourceID);
fetchedForPrimaries.remove(resourceID);
}

// This method is always called from the same Thread for the same resource,
// since events from ResourceEventAware are propagated from the thread of the informer. This is
// important
// because otherwise there will be a race condition related to the timerTasks.
// important because otherwise there will be a race condition related to the timerTasks.
private void checkAndRegisterTask(P resource) {
var primaryID = ResourceID.fromResource(resource);
if (timerTasks.get(primaryID) == null && (registerPredicate == null
if (scheduledFutures.get(primaryID) == null && (registerPredicate == null
|| registerPredicate.test(resource))) {
var task =
new TimerTask() {
@Override
public void run() {
if (!isRunning()) {
log.debug("Event source not yet started. Will not run for: {}", primaryID);
return;
}
// always use up-to-date resource from cache
var res = resourceCache.get(primaryID);
res.ifPresentOrElse(p -> getAndCacheResource(p, false),
() -> log.warn("No resource in cache for resource ID: {}", primaryID));
}
};
timerTasks.put(primaryID, task);
// there is a delay, to not do two fetches when the resources first appeared
var cachedResources = cache.get(primaryID);
var actualResources =
cachedResources == null ? null : new HashSet<>(cachedResources.values());
// note that there is a delay, to not do two fetches when the resources first appeared
// and getSecondaryResource is called on reconciliation.
timer.schedule(task, period, period);
scheduleNextExecution(resource, actualResources);
}
}

private class FetchingExecutor implements Runnable {
private final ResourceID primaryID;

public FetchingExecutor(ResourceID primaryID) {
this.primaryID = primaryID;
}

@Override
public void run() {
if (!isRunning()) {
log.debug("Event source not yet started. Will not run for: {}", primaryID);
return;
}
// always use up-to-date resource from cache
var primary = resourceCache.get(primaryID);
if (primary.isEmpty()) {
log.warn("No resource in cache for resource ID: {}", primaryID);
// no new execution is scheduled in this case, a on delete event should be received shortly
} else {
var actualResources = primary.map(p -> getAndCacheResource(p, false));
scheduleNextExecution(primary.get(), actualResources.orElse(null));
}
}
}

Expand Down Expand Up @@ -146,12 +181,28 @@ public Set<R> getSecondaryResources(P primary) {

public interface ResourceFetcher<R, P> {
Set<R> fetchResources(P primaryResource);

/**
* By implementing this method it is possible to specify dynamic durations to wait between the
* polls of the resources. This is especially handy if a resources "stabilized" so it is not
* expected to change its state frequently. For example an AWS RDS instance is up and running,
* it is expected to run and be stable for a very long time. In this case it is enough to poll
* with a lower frequency, compared to the phase when it is being initialized.
*
* @param lastFetchedResource might be null, in case no fetch happened before. Empty set if
* fetch happened but no resources were found.
* @param primary related primary resource
* @return an Optional containing the Duration to wait until the next fetch. If an empty
* Optional is returned, the default polling period will be used.
*/
default Optional<Duration> fetchDelay(Set<R> lastFetchedResource, P primary) {
return Optional.empty();
}
}

@Override
public void stop() throws OperatorException {
super.stop();
timer.cancel();
executorService.shutdownNow();
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.processing.event.source.polling;

import java.time.Duration;
import java.util.Collections;
import java.util.Optional;
import java.util.Set;
Expand All @@ -16,6 +17,7 @@
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeast;
Expand Down Expand Up @@ -47,45 +49,50 @@ public void setup() {
}

@Test
void pollsTheResourceAfterAwareOfIt() throws InterruptedException {
void pollsTheResourceAfterAwareOfIt() {
source.onResourceCreated(testCustomResource);

Thread.sleep(3 * PERIOD);
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
verify(eventHandler, times(1)).handleEvent(any());
await().pollDelay(Duration.ofMillis(3 * PERIOD)).untilAsserted(() -> {
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
verify(supplier, atLeast(2)).fetchDelay(any(), eq(testCustomResource));
verify(eventHandler, times(1)).handleEvent(any());
});
}

@Test
void registeringTaskOnAPredicate() throws InterruptedException {
void registeringTaskOnAPredicate() {
setUpSource(new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD,
testCustomResource -> testCustomResource.getMetadata().getGeneration() > 1,
SampleExternalResource.class, CacheKeyMapper.singleResourceCacheKeyMapper()));
source.onResourceCreated(testCustomResource);
Thread.sleep(2 * PERIOD);

verify(supplier, times(0)).fetchResources(eq(testCustomResource));

await().pollDelay(Duration.ofMillis(2 * PERIOD))
.untilAsserted(() -> verify(supplier, times(0)).fetchResources(eq(testCustomResource)));

testCustomResource.getMetadata().setGeneration(2L);
source.onResourceUpdated(testCustomResource, testCustomResource);

Thread.sleep(2 * PERIOD);

verify(supplier, atLeast(1)).fetchResources(eq(testCustomResource));
await().pollDelay(Duration.ofMillis(2 * PERIOD))
.untilAsserted(() -> verify(supplier, atLeast(1)).fetchResources(eq(testCustomResource)));
}

@Test
void propagateEventOnDeletedResource() throws InterruptedException {
void propagateEventOnDeletedResource() {
source.onResourceCreated(testCustomResource);
when(supplier.fetchResources(any()))
.thenReturn(Set.of(SampleExternalResource.testResource1()))
.thenReturn(Collections.emptySet());

Thread.sleep(3 * PERIOD);
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
verify(eventHandler, times(2)).handleEvent(any());
await().pollDelay(Duration.ofMillis(3 * PERIOD)).untilAsserted(() -> {
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
verify(eventHandler, times(2)).handleEvent(any());
});
}

@Test
void getSecondaryResourceInitiatesFetchJustForFirstTime() throws InterruptedException {
void getSecondaryResourceInitiatesFetchJustForFirstTime() {
source.onResourceCreated(testCustomResource);
when(supplier.fetchResources(any()))
.thenReturn(Set.of(SampleExternalResource.testResource1()))
Expand All @@ -104,31 +111,73 @@ void getSecondaryResourceInitiatesFetchJustForFirstTime() throws InterruptedExce
verify(supplier, times(1)).fetchResources(eq(testCustomResource));
verify(eventHandler, never()).handleEvent(any());

Thread.sleep(PERIOD * 2);

verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
value = source.getSecondaryResources(testCustomResource);
assertThat(value).hasSize(2);
await().pollDelay(Duration.ofMillis(PERIOD * 2)).untilAsserted(() -> {
verify(supplier, atLeast(2)).fetchResources(eq(testCustomResource));
var val = source.getSecondaryResources(testCustomResource);
assertThat(val).hasSize(2);
});
}

@Test
void getsValueFromCacheOrSupplier() throws InterruptedException {
void getsValueFromCacheOrSupplier() {
source.onResourceCreated(testCustomResource);
when(supplier.fetchResources(any()))
.thenReturn(Collections.emptySet())
.thenReturn(Set.of(SampleExternalResource.testResource1()));

Thread.sleep(PERIOD / 3);
await().pollDelay(Duration.ofMillis(PERIOD / 3)).untilAsserted(() -> {
var value = source.getSecondaryResources(testCustomResource);
verify(eventHandler, times(0)).handleEvent(any());
assertThat(value).isEmpty();
});

await().pollDelay(Duration.ofMillis(PERIOD * 2)).untilAsserted(() -> {
var value2 = source.getSecondaryResources(testCustomResource);
assertThat(value2).hasSize(1);
verify(eventHandler, times(1)).handleEvent(any());
});
}

var value = source.getSecondaryResources(testCustomResource);
verify(eventHandler, times(0)).handleEvent(any());
assertThat(value).isEmpty();
@Test
void supportsDynamicPollingDelay() {
when(supplier.fetchResources(any()))
.thenReturn(Set.of(SampleExternalResource.testResource1()));
when(supplier.fetchDelay(any(),any()))
.thenReturn(Optional.of(Duration.ofMillis(PERIOD)))
.thenReturn(Optional.of(Duration.ofMillis(PERIOD*2)));

Thread.sleep(PERIOD * 2);
source.onResourceCreated(testCustomResource);

value = source.getSecondaryResources(testCustomResource);
assertThat(value).hasSize(1);
verify(eventHandler, times(1)).handleEvent(any());
await().pollDelay(Duration.ofMillis(PERIOD)).atMost(Duration.ofMillis((long) (1.5 * PERIOD)))
.pollInterval(Duration.ofMillis(20))
.untilAsserted(() -> verify(supplier,times(1)).fetchResources(any()));
// verifying that it is not called as with normal interval
await().pollDelay(Duration.ofMillis(PERIOD)).atMost(Duration.ofMillis((long) (1.5*PERIOD)))
.pollInterval(Duration.ofMillis(20))
.untilAsserted(() -> verify(supplier,times(1)).fetchResources(any()));
await().pollDelay(Duration.ofMillis(PERIOD)).atMost(Duration.ofMillis(2 * PERIOD))
.pollInterval(Duration.ofMillis(20))
.untilAsserted(() -> verify(supplier,times(2)).fetchResources(any()));
}

@Test
void deleteEventCancelsTheScheduling() {
when(supplier.fetchResources(any()))
.thenReturn(Set.of(SampleExternalResource.testResource1()));

source.onResourceCreated(testCustomResource);

await().pollDelay(Duration.ofMillis(PERIOD))
.atMost(Duration.ofMillis((2* PERIOD)))
.pollInterval(Duration.ofMillis(20))
.untilAsserted(() -> verify(supplier,times(1)).fetchResources(any()));

source.onResourceDeleted(testCustomResource);

// check if not called again.
await().pollDelay(Duration.ofMillis(PERIOD))
.atMost(Duration.ofMillis((2* PERIOD)))
.untilAsserted(() -> verify(supplier,times(1)).fetchResources(any()));
}

}