Skip to content

Commit 1d1d7c5

Browse files
committed
feat: dynamic poll period for PerResourceEventSource
1 parent 29453e8 commit 1d1d7c5

File tree

1 file changed

+84
-27
lines changed

1 file changed

+84
-27
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java

Lines changed: 84 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
package io.javaoperatorsdk.operator.processing.event.source.polling;
22

3+
import java.time.Duration;
34
import java.util.*;
4-
import java.util.concurrent.ConcurrentHashMap;
5+
import java.util.concurrent.*;
56
import java.util.function.Predicate;
67

78
import org.slf4j.Logger;
@@ -26,14 +27,17 @@
2627
* @param <R> the resource polled by the event source
2728
* @param <P> related custom resource
2829
*/
30+
// todo check docs
2931
public class PerResourcePollingEventSource<R, P extends HasMetadata>
3032
extends ExternalResourceCachingEventSource<R, P>
3133
implements ResourceEventAware<P> {
3234

3335
private static final Logger log = LoggerFactory.getLogger(PerResourcePollingEventSource.class);
3436

35-
private final Timer timer = new Timer();
36-
private final Map<ResourceID, TimerTask> timerTasks = new ConcurrentHashMap<>();
37+
public static final int DEFAULT_EXECUTOR_THREAD_NUMBER = 1;
38+
39+
private final ScheduledExecutorService executorService;
40+
private final Map<ResourceID, ScheduledFuture<Void>> scheduledFutures = new ConcurrentHashMap<>();
3741
private final ResourceFetcher<R, P> resourceFetcher;
3842
private final Cache<P> resourceCache;
3943
private final Predicate<P> registerPredicate;
@@ -57,11 +61,20 @@ public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
5761
Cache<P> resourceCache, long period,
5862
Predicate<P> registerPredicate, Class<R> resourceClass,
5963
CacheKeyMapper<R> cacheKeyMapper) {
64+
this(resourceFetcher, resourceCache, period, registerPredicate, resourceClass, cacheKeyMapper,
65+
new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER));
66+
}
67+
68+
public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
69+
Cache<P> resourceCache, long period,
70+
Predicate<P> registerPredicate, Class<R> resourceClass,
71+
CacheKeyMapper<R> cacheKeyMapper, ScheduledExecutorService executorService) {
6072
super(resourceClass, cacheKeyMapper);
6173
this.resourceFetcher = resourceFetcher;
6274
this.resourceCache = resourceCache;
6375
this.period = period;
6476
this.registerPredicate = registerPredicate;
77+
this.executorService = executorService;
6578
}
6679

6780
private Set<R> getAndCacheResource(P primary, boolean fromGetter) {
@@ -71,6 +84,17 @@ private Set<R> getAndCacheResource(P primary, boolean fromGetter) {
7184
return values;
7285
}
7386

87+
@SuppressWarnings("unchecked")
88+
private void scheduleNextExecution(P primary, Set<R> actualResources) {
89+
var primaryID = ResourceID.fromResource(primary);
90+
var fetchDelay = resourceFetcher.fetchDelay(actualResources, primary);
91+
var fetchDuration = fetchDelay.orElse(Duration.ofMillis(period));
92+
93+
ScheduledFuture<Void> scheduledFuture = (ScheduledFuture<Void>) executorService
94+
.schedule(new FetchingExecutor(primaryID), fetchDuration.toMillis(), TimeUnit.MILLISECONDS);
95+
scheduledFutures.put(primaryID, scheduledFuture);
96+
}
97+
7498
@Override
7599
public void onResourceCreated(P resource) {
76100
checkAndRegisterTask(resource);
@@ -84,10 +108,10 @@ public void onResourceUpdated(P newResource, P oldResource) {
84108
@Override
85109
public void onResourceDeleted(P resource) {
86110
var resourceID = ResourceID.fromResource(resource);
87-
TimerTask task = timerTasks.remove(resourceID);
88-
if (task != null) {
89-
log.debug("Canceling task for resource: {}", resource);
90-
task.cancel();
111+
var scheduledFuture = scheduledFutures.remove(resourceID);
112+
if (scheduledFuture != null) {
113+
log.debug("Canceling scheduledFuture for resource: {}", resource);
114+
scheduledFuture.cancel(true);
91115
}
92116
handleDelete(resourceID);
93117
fetchedForPrimaries.remove(resourceID);
@@ -97,28 +121,45 @@ public void onResourceDeleted(P resource) {
97121
// since events from ResourceEventAware are propagated from the thread of the informer. This is
98122
// important
99123
// because otherwise there will be a race condition related to the timerTasks.
124+
@SuppressWarnings("unchecked")
100125
private void checkAndRegisterTask(P resource) {
101126
var primaryID = ResourceID.fromResource(resource);
102-
if (timerTasks.get(primaryID) == null && (registerPredicate == null
127+
if (scheduledFutures.get(primaryID) == null && (registerPredicate == null
103128
|| registerPredicate.test(resource))) {
104-
var task =
105-
new TimerTask() {
106-
@Override
107-
public void run() {
108-
if (!isRunning()) {
109-
log.debug("Event source not yet started. Will not run for: {}", primaryID);
110-
return;
111-
}
112-
// always use up-to-date resource from cache
113-
var res = resourceCache.get(primaryID);
114-
res.ifPresentOrElse(p -> getAndCacheResource(p, false),
115-
() -> log.warn("No resource in cache for resource ID: {}", primaryID));
116-
}
117-
};
118-
timerTasks.put(primaryID, task);
119-
// there is a delay, to not do two fetches when the resources first appeared
129+
130+
131+
var cachedResources = cache.get(primaryID);
132+
var actualResources =
133+
cachedResources == null ? null : new HashSet<>(cachedResources.values());
134+
// note that there is a delay, to not do two fetches when the resources first appeared
120135
// and getSecondaryResource is called on reconciliation.
121-
timer.schedule(task, period, period);
136+
scheduleNextExecution(resource, actualResources);
137+
}
138+
}
139+
140+
private class FetchingExecutor implements Runnable {
141+
private final ResourceID primaryID;
142+
143+
public FetchingExecutor(ResourceID primaryID) {
144+
this.primaryID = primaryID;
145+
}
146+
147+
@Override
148+
public void run() {
149+
if (!isRunning()) {
150+
log.debug("Event source not yet started. Will not run for: {}", primaryID);
151+
return;
152+
}
153+
// always use up-to-date resource from cache
154+
var primary = resourceCache.get(primaryID);
155+
if (primary.isEmpty()) {
156+
log.warn("No resource in cache for resource ID: {}", primaryID);
157+
// todo think through + test
158+
// no new execution is scheduled in this case, a on delete event should be received shortly
159+
} else {
160+
var actualResources = primary.map(p -> getAndCacheResource(p, false));
161+
scheduleNextExecution(primary.get(), actualResources.orElse(null));
162+
}
122163
}
123164
}
124165

@@ -146,12 +187,28 @@ public Set<R> getSecondaryResources(P primary) {
146187

147188
public interface ResourceFetcher<R, P> {
148189
Set<R> fetchResources(P primaryResource);
190+
191+
/**
192+
* By implementing this method it is possible to specify dynamic durations to wait between the
193+
* polls of the resources. This is especially handy if a resources "stabilized" so it is not
194+
* expected to change it's state frequently. For example an AWS RDS instance is up and running,
195+
* it is expected to run and be stable for a very long time. In this case it is enough to poll
196+
* with a lower frequency, compared to the phase when it is being initialized.
197+
*
198+
* @param lastFetchedResource might be null, in case no fetch happened before. Empty set if
199+
* fetch happened but no resources were found.
200+
* @param primary related primary resource
201+
* @return an Optional containing the Duration to wait until the next fetch. If an empty
202+
* Optional is returned, the default polling period will be used.
203+
*/
204+
default Optional<Duration> fetchDelay(Set<R> lastFetchedResource, P primary) {
205+
return Optional.empty();
206+
}
149207
}
150208

151209
@Override
152210
public void stop() throws OperatorException {
153211
super.stop();
154-
timer.cancel();
212+
executorService.shutdownNow();
155213
}
156-
157214
}

0 commit comments

Comments
 (0)