1
1
package io .javaoperatorsdk .operator .processing .event .source .polling ;
2
2
3
+ import java .time .Duration ;
3
4
import java .util .*;
4
- import java .util .concurrent .ConcurrentHashMap ;
5
+ import java .util .concurrent .* ;
5
6
import java .util .function .Predicate ;
6
7
7
8
import org .slf4j .Logger ;
26
27
* @param <R> the resource polled by the event source
27
28
* @param <P> related custom resource
28
29
*/
30
+ // todo check docs
29
31
public class PerResourcePollingEventSource <R , P extends HasMetadata >
30
32
extends ExternalResourceCachingEventSource <R , P >
31
33
implements ResourceEventAware <P > {
32
34
33
35
private static final Logger log = LoggerFactory .getLogger (PerResourcePollingEventSource .class );
34
36
35
- private final Timer timer = new Timer ();
36
- private final Map <ResourceID , TimerTask > timerTasks = new ConcurrentHashMap <>();
37
+ public static final int DEFAULT_EXECUTOR_THREAD_NUMBER = 1 ;
38
+
39
+ private final ScheduledExecutorService executorService ;
40
+ private final Map <ResourceID , ScheduledFuture <Void >> scheduledFutures = new ConcurrentHashMap <>();
37
41
private final ResourceFetcher <R , P > resourceFetcher ;
38
42
private final Cache <P > resourceCache ;
39
43
private final Predicate <P > registerPredicate ;
@@ -57,11 +61,20 @@ public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
57
61
Cache <P > resourceCache , long period ,
58
62
Predicate <P > registerPredicate , Class <R > resourceClass ,
59
63
CacheKeyMapper <R > cacheKeyMapper ) {
64
+ this (resourceFetcher , resourceCache , period , registerPredicate , resourceClass , cacheKeyMapper ,
65
+ new ScheduledThreadPoolExecutor (DEFAULT_EXECUTOR_THREAD_NUMBER ));
66
+ }
67
+
68
+ public PerResourcePollingEventSource (ResourceFetcher <R , P > resourceFetcher ,
69
+ Cache <P > resourceCache , long period ,
70
+ Predicate <P > registerPredicate , Class <R > resourceClass ,
71
+ CacheKeyMapper <R > cacheKeyMapper , ScheduledExecutorService executorService ) {
60
72
super (resourceClass , cacheKeyMapper );
61
73
this .resourceFetcher = resourceFetcher ;
62
74
this .resourceCache = resourceCache ;
63
75
this .period = period ;
64
76
this .registerPredicate = registerPredicate ;
77
+ this .executorService = executorService ;
65
78
}
66
79
67
80
private Set <R > getAndCacheResource (P primary , boolean fromGetter ) {
@@ -71,6 +84,17 @@ private Set<R> getAndCacheResource(P primary, boolean fromGetter) {
71
84
return values ;
72
85
}
73
86
87
+ @ SuppressWarnings ("unchecked" )
88
+ private void scheduleNextExecution (P primary , Set <R > actualResources ) {
89
+ var primaryID = ResourceID .fromResource (primary );
90
+ var fetchDelay = resourceFetcher .fetchDelay (actualResources , primary );
91
+ var fetchDuration = fetchDelay .orElse (Duration .ofMillis (period ));
92
+
93
+ ScheduledFuture <Void > scheduledFuture = (ScheduledFuture <Void >) executorService
94
+ .schedule (new FetchingExecutor (primaryID ), fetchDuration .toMillis (), TimeUnit .MILLISECONDS );
95
+ scheduledFutures .put (primaryID , scheduledFuture );
96
+ }
97
+
74
98
@ Override
75
99
public void onResourceCreated (P resource ) {
76
100
checkAndRegisterTask (resource );
@@ -84,10 +108,10 @@ public void onResourceUpdated(P newResource, P oldResource) {
84
108
@ Override
85
109
public void onResourceDeleted (P resource ) {
86
110
var resourceID = ResourceID .fromResource (resource );
87
- TimerTask task = timerTasks .remove (resourceID );
88
- if (task != null ) {
89
- log .debug ("Canceling task for resource: {}" , resource );
90
- task .cancel ();
111
+ var scheduledFuture = scheduledFutures .remove (resourceID );
112
+ if (scheduledFuture != null ) {
113
+ log .debug ("Canceling scheduledFuture for resource: {}" , resource );
114
+ scheduledFuture .cancel (true );
91
115
}
92
116
handleDelete (resourceID );
93
117
fetchedForPrimaries .remove (resourceID );
@@ -97,28 +121,45 @@ public void onResourceDeleted(P resource) {
97
121
// since events from ResourceEventAware are propagated from the thread of the informer. This is
98
122
// important
99
123
// because otherwise there will be a race condition related to the timerTasks.
124
+ @ SuppressWarnings ("unchecked" )
100
125
private void checkAndRegisterTask (P resource ) {
101
126
var primaryID = ResourceID .fromResource (resource );
102
- if (timerTasks .get (primaryID ) == null && (registerPredicate == null
127
+ if (scheduledFutures .get (primaryID ) == null && (registerPredicate == null
103
128
|| registerPredicate .test (resource ))) {
104
- var task =
105
- new TimerTask () {
106
- @ Override
107
- public void run () {
108
- if (!isRunning ()) {
109
- log .debug ("Event source not yet started. Will not run for: {}" , primaryID );
110
- return ;
111
- }
112
- // always use up-to-date resource from cache
113
- var res = resourceCache .get (primaryID );
114
- res .ifPresentOrElse (p -> getAndCacheResource (p , false ),
115
- () -> log .warn ("No resource in cache for resource ID: {}" , primaryID ));
116
- }
117
- };
118
- timerTasks .put (primaryID , task );
119
- // there is a delay, to not do two fetches when the resources first appeared
129
+
130
+
131
+ var cachedResources = cache .get (primaryID );
132
+ var actualResources =
133
+ cachedResources == null ? null : new HashSet <>(cachedResources .values ());
134
+ // note that there is a delay, to not do two fetches when the resources first appeared
120
135
// and getSecondaryResource is called on reconciliation.
121
- timer .schedule (task , period , period );
136
+ scheduleNextExecution (resource , actualResources );
137
+ }
138
+ }
139
+
140
+ private class FetchingExecutor implements Runnable {
141
+ private final ResourceID primaryID ;
142
+
143
+ public FetchingExecutor (ResourceID primaryID ) {
144
+ this .primaryID = primaryID ;
145
+ }
146
+
147
+ @ Override
148
+ public void run () {
149
+ if (!isRunning ()) {
150
+ log .debug ("Event source not yet started. Will not run for: {}" , primaryID );
151
+ return ;
152
+ }
153
+ // always use up-to-date resource from cache
154
+ var primary = resourceCache .get (primaryID );
155
+ if (primary .isEmpty ()) {
156
+ log .warn ("No resource in cache for resource ID: {}" , primaryID );
157
+ // todo think through + test
158
+ // no new execution is scheduled in this case, a on delete event should be received shortly
159
+ } else {
160
+ var actualResources = primary .map (p -> getAndCacheResource (p , false ));
161
+ scheduleNextExecution (primary .get (), actualResources .orElse (null ));
162
+ }
122
163
}
123
164
}
124
165
@@ -146,12 +187,28 @@ public Set<R> getSecondaryResources(P primary) {
146
187
147
188
public interface ResourceFetcher <R , P > {
148
189
Set <R > fetchResources (P primaryResource );
190
+
191
+ /**
192
+ * By implementing this method it is possible to specify dynamic durations to wait between the
193
+ * polls of the resources. This is especially handy if a resources "stabilized" so it is not
194
+ * expected to change it's state frequently. For example an AWS RDS instance is up and running,
195
+ * it is expected to run and be stable for a very long time. In this case it is enough to poll
196
+ * with a lower frequency, compared to the phase when it is being initialized.
197
+ *
198
+ * @param lastFetchedResource might be null, in case no fetch happened before. Empty set if
199
+ * fetch happened but no resources were found.
200
+ * @param primary related primary resource
201
+ * @return an Optional containing the Duration to wait until the next fetch. If an empty
202
+ * Optional is returned, the default polling period will be used.
203
+ */
204
+ default Optional <Duration > fetchDelay (Set <R > lastFetchedResource , P primary ) {
205
+ return Optional .empty ();
206
+ }
149
207
}
150
208
151
209
@ Override
152
210
public void stop () throws OperatorException {
153
211
super .stop ();
154
- timer . cancel ();
212
+ executorService . shutdownNow ();
155
213
}
156
-
157
214
}
0 commit comments