Skip to content

Commit a00f3cc

Browse files
committed
feat: add resource fetcher to CachingInboundEventSource
1 parent 74454ea commit a00f3cc

File tree

1 file changed

+52
-2
lines changed

1 file changed

+52
-2
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
11
package io.javaoperatorsdk.operator.processing.event.source.inbound;
22

3+
import java.util.Collections;
4+
import java.util.HashSet;
35
import java.util.Set;
6+
import java.util.concurrent.ConcurrentHashMap;
47

58
import io.fabric8.kubernetes.api.model.HasMetadata;
69
import io.javaoperatorsdk.operator.processing.event.ResourceID;
710
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
811
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
12+
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
913

1014
public class CachingInboundEventSource<R, P extends HasMetadata>
11-
extends ExternalResourceCachingEventSource<R, P> {
15+
extends ExternalResourceCachingEventSource<R, P>
16+
implements ResourceEventAware<P> {
1217

13-
public CachingInboundEventSource(Class<R> resourceClass, CacheKeyMapper<R> cacheKeyMapper) {
18+
private final ResourceFetcher<R, P> resourceFetcher;
19+
private final Set<ResourceID> fetchedForPrimaries = ConcurrentHashMap.newKeySet();
20+
21+
public CachingInboundEventSource(
22+
ResourceFetcher<R, P> resourceFetcher, Class<R> resourceClass,
23+
CacheKeyMapper<R> cacheKeyMapper) {
1424
super(resourceClass, cacheKeyMapper);
25+
this.resourceFetcher = resourceFetcher;
1526
}
1627

1728
public void handleResourceEvent(ResourceID primaryID, Set<R> resources) {
@@ -26,4 +37,43 @@ public void handleResourceDeleteEvent(ResourceID primaryID, String resourceID) {
2637
super.handleDelete(primaryID, Set.of(resourceID));
2738
}
2839

40+
@Override
41+
public void onResourceDeleted(P resource) {
42+
var resourceID = ResourceID.fromResource(resource);
43+
fetchedForPrimaries.remove(resourceID);
44+
}
45+
46+
private Set<R> getAndCacheResource(P primary) {
47+
var primaryID = ResourceID.fromResource(primary);
48+
var values = resourceFetcher.fetchResources(primary);
49+
handleResources(primaryID, values, false);
50+
fetchedForPrimaries.add(primaryID);
51+
return values;
52+
}
53+
54+
/**
55+
* When this event source is queried for the resource, it might not be fully "synced". Thus, the
56+
* cache might not be propagated, therefore the supplier is checked for the resource too.
57+
*
58+
* @param primary resource of the controller
59+
* @return the related resource for this event source
60+
*/
61+
@Override
62+
public Set<R> getSecondaryResources(P primary) {
63+
var primaryID = ResourceID.fromResource(primary);
64+
var cachedValue = cache.get(primaryID);
65+
if (cachedValue != null && !cachedValue.isEmpty()) {
66+
return new HashSet<>(cachedValue.values());
67+
} else {
68+
if (fetchedForPrimaries.contains(primaryID)) {
69+
return Collections.emptySet();
70+
} else {
71+
return getAndCacheResource(primary);
72+
}
73+
}
74+
}
75+
76+
public interface ResourceFetcher<R, P> {
77+
Set<R> fetchResources(P primaryResource);
78+
}
2979
}

0 commit comments

Comments
 (0)