Skip to content

Commit f976c5c

Browse files
authored
improve: PerResourcePollingEventSource init improvements (#1918)
1 parent 3280f69 commit f976c5c

File tree

5 files changed

+119
-14
lines changed

5 files changed

+119
-14
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.javaoperatorsdk.operator.processing.dependent.external;
22

3+
import java.time.Duration;
4+
35
import io.fabric8.kubernetes.api.model.HasMetadata;
46
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
57
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
@@ -23,8 +25,8 @@ public PerResourcePollingDependentResource(Class<R> resourceType, long pollingPe
2325
@Override
2426
protected ExternalResourceCachingEventSource<R, P> createEventSource(
2527
EventSourceContext<P> context) {
26-
return new PerResourcePollingEventSource<>(this, context.getPrimaryCache(),
27-
getPollingPeriod(), resourceType(), this);
28+
return new PerResourcePollingEventSource<>(this, context,
29+
Duration.ofMillis(getPollingPeriod()), resourceType(), this);
2830
}
2931

3032
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java

Lines changed: 103 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,24 @@
11
package io.javaoperatorsdk.operator.processing.event.source.polling;
22

33
import java.time.Duration;
4-
import java.util.*;
5-
import java.util.concurrent.*;
4+
import java.util.Collections;
5+
import java.util.HashSet;
6+
import java.util.Map;
7+
import java.util.Optional;
8+
import java.util.Set;
9+
import java.util.concurrent.ConcurrentHashMap;
10+
import java.util.concurrent.ScheduledExecutorService;
11+
import java.util.concurrent.ScheduledFuture;
12+
import java.util.concurrent.ScheduledThreadPoolExecutor;
13+
import java.util.concurrent.TimeUnit;
614
import java.util.function.Predicate;
715

816
import org.slf4j.Logger;
917
import org.slf4j.LoggerFactory;
1018

1119
import io.fabric8.kubernetes.api.model.HasMetadata;
1220
import io.javaoperatorsdk.operator.OperatorException;
21+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
1322
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1423
import io.javaoperatorsdk.operator.processing.event.source.Cache;
1524
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
@@ -43,19 +52,82 @@ public class PerResourcePollingEventSource<R, P extends HasMetadata>
4352
private final long period;
4453
private final Set<ResourceID> fetchedForPrimaries = ConcurrentHashMap.newKeySet();
4554

55+
public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
56+
EventSourceContext<P> context, Duration defaultPollingPeriod,
57+
Class<R> resourceClass) {
58+
this(resourceFetcher, context.getPrimaryCache(), defaultPollingPeriod.toMillis(),
59+
null, resourceClass,
60+
CacheKeyMapper.singleResourceCacheKeyMapper());
61+
}
4662

63+
/**
64+
* @deprecated use the variant which uses {@link EventSourceContext} instead of {@link Cache} and
65+
* {@link Duration} for period parameter as it provides a more intuitive API.
66+
*
67+
* @param resourceFetcher fetches resource related to a primary resource
68+
* @param resourceCache cache of the primary resource
69+
* @param period default polling period
70+
* @param resourceClass class of the target resource
71+
*/
72+
@Deprecated(forRemoval = true)
4773
public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
4874
Cache<P> resourceCache, long period, Class<R> resourceClass) {
4975
this(resourceFetcher, resourceCache, period, null, resourceClass,
5076
CacheKeyMapper.singleResourceCacheKeyMapper());
5177
}
5278

79+
public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
80+
EventSourceContext<P> context,
81+
Duration defaultPollingPeriod,
82+
Class<R> resourceClass,
83+
CacheKeyMapper<R> cacheKeyMapper) {
84+
this(resourceFetcher, context.getPrimaryCache(), defaultPollingPeriod.toMillis(),
85+
null, resourceClass, cacheKeyMapper);
86+
}
87+
88+
/**
89+
* @deprecated use the variant which uses {@link EventSourceContext} instead of {@link Cache} and
90+
* {@link Duration} for period parameter as it provides a more intuitive API.
91+
*
92+
* @param resourceFetcher fetches resource related to a primary resource
93+
* @param resourceCache cache of the primary resource
94+
* @param period default polling period
95+
* @param resourceClass class of the target resource
96+
* @param cacheKeyMapper use to distinguish resource in case more resources are handled for a
97+
* single primary resource
98+
*/
99+
@Deprecated(forRemoval = true)
53100
public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
54101
Cache<P> resourceCache, long period, Class<R> resourceClass,
55102
CacheKeyMapper<R> cacheKeyMapper) {
56103
this(resourceFetcher, resourceCache, period, null, resourceClass, cacheKeyMapper);
57104
}
58105

106+
public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
107+
EventSourceContext<P> context,
108+
Duration defaultPollingPeriod,
109+
Predicate<P> registerPredicate,
110+
Class<R> resourceClass,
111+
CacheKeyMapper<R> cacheKeyMapper) {
112+
this(resourceFetcher, context.getPrimaryCache(), defaultPollingPeriod.toMillis(),
113+
registerPredicate, resourceClass, cacheKeyMapper,
114+
new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER));
115+
}
116+
117+
/**
118+
* @deprecated use the variant which uses {@link EventSourceContext} instead of {@link Cache} and
119+
* {@link Duration} for period parameter as it provides a more intuitive API.
120+
*
121+
* @param resourceFetcher fetches resource related to a primary resource
122+
* @param resourceCache cache of the primary resource
123+
* @param period default polling period
124+
* @param resourceClass class of the target resource
125+
* @param cacheKeyMapper use to distinguish resource in case more resources are handled for a
126+
* single primary resource
127+
* @param registerPredicate used to determine if the related resource for a custom resource should
128+
* be polled or not.
129+
*/
130+
@Deprecated(forRemoval = true)
59131
public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
60132
Cache<P> resourceCache, long period,
61133
Predicate<P> registerPredicate, Class<R> resourceClass,
@@ -64,7 +136,35 @@ public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
64136
new ScheduledThreadPoolExecutor(DEFAULT_EXECUTOR_THREAD_NUMBER));
65137
}
66138

67-
public PerResourcePollingEventSource(ResourceFetcher<R, P> resourceFetcher,
139+
140+
public PerResourcePollingEventSource(
141+
ResourceFetcher<R, P> resourceFetcher,
142+
EventSourceContext<P> context, Duration defaultPollingPeriod,
143+
Predicate<P> registerPredicate, Class<R> resourceClass,
144+
CacheKeyMapper<R> cacheKeyMapper, ScheduledExecutorService executorService) {
145+
this(resourceFetcher, context.getPrimaryCache(), defaultPollingPeriod.toMillis(),
146+
registerPredicate,
147+
resourceClass, cacheKeyMapper, executorService);
148+
}
149+
150+
/**
151+
* @deprecated use the variant which uses {@link EventSourceContext} instead of {@link Cache} and
152+
* {@link Duration} for period parameter as it provides a more intuitive API.
153+
*
154+
* @param resourceFetcher fetches resource related to a primary resource
155+
* @param resourceCache cache of the primary resource
156+
* @param period default polling period
157+
* @param resourceClass class of the target resource
158+
* @param cacheKeyMapper use to distinguish resource in case more resources are handled for a
159+
* single primary resource
160+
* @param registerPredicate used to determine if the related resource for a custom resource should
161+
* be polled or not.
162+
* @param executorService custom executor service
163+
*/
164+
165+
@Deprecated(forRemoval = true)
166+
public PerResourcePollingEventSource(
167+
ResourceFetcher<R, P> resourceFetcher,
68168
Cache<P> resourceCache, long period,
69169
Predicate<P> registerPredicate, Class<R> resourceClass,
70170
CacheKeyMapper<R> cacheKeyMapper, ScheduledExecutorService executorService) {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSourceTest.java

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@
99
import org.junit.jupiter.api.Test;
1010

1111
import io.javaoperatorsdk.operator.TestUtils;
12+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
1213
import io.javaoperatorsdk.operator.processing.event.EventHandler;
13-
import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase;
14-
import io.javaoperatorsdk.operator.processing.event.source.Cache;
15-
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
16-
import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource;
14+
import io.javaoperatorsdk.operator.processing.event.source.*;
1715
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
1816

1917
import static org.assertj.core.api.Assertions.assertThat;
@@ -35,16 +33,19 @@ class PerResourcePollingEventSourceTest extends
3533
private final PerResourcePollingEventSource.ResourceFetcher<SampleExternalResource, TestCustomResource> supplier =
3634
mock(PerResourcePollingEventSource.ResourceFetcher.class);
3735
@SuppressWarnings("unchecked")
38-
private final Cache<TestCustomResource> resourceCache = mock(Cache.class);
36+
private final IndexerResourceCache<TestCustomResource> resourceCache =
37+
mock(IndexerResourceCache.class);
3938
private final TestCustomResource testCustomResource = TestUtils.testCustomResource();
39+
private final EventSourceContext<TestCustomResource> context = mock(EventSourceContext.class);
4040

4141
@BeforeEach
4242
public void setup() {
4343
when(resourceCache.get(any())).thenReturn(Optional.of(testCustomResource));
4444
when(supplier.fetchResources(any()))
4545
.thenReturn(Set.of(SampleExternalResource.testResource1()));
46+
when(context.getPrimaryCache()).thenReturn(resourceCache);
4647

47-
setUpSource(new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD,
48+
setUpSource(new PerResourcePollingEventSource<>(supplier, context, Duration.ofMillis(PERIOD),
4849
SampleExternalResource.class, r -> r.getName() + "#" + r.getValue()));
4950
}
5051

@@ -61,7 +62,7 @@ void pollsTheResourceAfterAwareOfIt() {
6162

6263
@Test
6364
void registeringTaskOnAPredicate() {
64-
setUpSource(new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD,
65+
setUpSource(new PerResourcePollingEventSource<>(supplier, context, Duration.ofMillis(PERIOD),
6566
testCustomResource -> testCustomResource.getMetadata().getGeneration() > 1,
6667
SampleExternalResource.class, CacheKeyMapper.singleResourceCacheKeyMapper()));
6768
source.onResourceCreated(testCustomResource);

operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/externalstate/ExternalStateReconciler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.sample.externalstate;
22

3+
import java.time.Duration;
34
import java.util.Collections;
45
import java.util.Map;
56
import java.util.Set;
@@ -122,7 +123,7 @@ public Map<String, EventSource> prepareEventSources(
122123
var id = configMap.getData().get(ID_KEY);
123124
var externalResource = externalService.read(id);
124125
return externalResource.map(Set::of).orElseGet(Collections::emptySet);
125-
}, context.getPrimaryCache(), 300L, ExternalResource.class);
126+
}, context, Duration.ofMillis(300L), ExternalResource.class);
126127

127128
return EventSourceInitializer.nameEventSources(configMapEventSource,
128129
externalResourceEventSource);

operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/perresourceeventsource/PerResourcePollingEventSourceTestReconciler.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.sample.perresourceeventsource;
22

3+
import java.time.Duration;
34
import java.util.Map;
45
import java.util.Set;
56
import java.util.UUID;
@@ -41,7 +42,7 @@ public Map<String, EventSource> prepareEventSources(
4142
numberOfFetchExecutions.compute(resource.getMetadata().getName(), (s, v) -> v + 1);
4243
return Set.of(UUID.randomUUID().toString());
4344
},
44-
context.getPrimaryCache(), POLL_PERIOD, String.class);
45+
context, Duration.ofMillis(POLL_PERIOD), String.class);
4546
return EventSourceInitializer.nameEventSources(eventSource);
4647
}
4748

0 commit comments

Comments
 (0)