Skip to content

Commit f46db1b

Browse files
authored
feat: add resource fetcher to CachingInboundEventSource (#1428)
1 parent f1aed62 commit f46db1b

File tree

2 files changed

+141
-2
lines changed

2 files changed

+141
-2
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java

Lines changed: 52 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,28 @@
11
package io.javaoperatorsdk.operator.processing.event.source.inbound;
22

3+
import java.util.Collections;
4+
import java.util.HashSet;
35
import java.util.Set;
6+
import java.util.concurrent.ConcurrentHashMap;
47

58
import io.fabric8.kubernetes.api.model.HasMetadata;
69
import io.javaoperatorsdk.operator.processing.event.ResourceID;
710
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
811
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
12+
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
913

1014
public class CachingInboundEventSource<R, P extends HasMetadata>
11-
extends ExternalResourceCachingEventSource<R, P> {
15+
extends ExternalResourceCachingEventSource<R, P>
16+
implements ResourceEventAware<P> {
1217

13-
public CachingInboundEventSource(Class<R> resourceClass, CacheKeyMapper<R> cacheKeyMapper) {
18+
private final ResourceFetcher<R, P> resourceFetcher;
19+
private final Set<ResourceID> fetchedForPrimaries = ConcurrentHashMap.newKeySet();
20+
21+
public CachingInboundEventSource(
22+
ResourceFetcher<R, P> resourceFetcher, Class<R> resourceClass,
23+
CacheKeyMapper<R> cacheKeyMapper) {
1424
super(resourceClass, cacheKeyMapper);
25+
this.resourceFetcher = resourceFetcher;
1526
}
1627

1728
public void handleResourceEvent(ResourceID primaryID, Set<R> resources) {
@@ -26,4 +37,43 @@ public void handleResourceDeleteEvent(ResourceID primaryID, String resourceID) {
2637
super.handleDelete(primaryID, Set.of(resourceID));
2738
}
2839

40+
@Override
41+
public void onResourceDeleted(P resource) {
42+
var resourceID = ResourceID.fromResource(resource);
43+
fetchedForPrimaries.remove(resourceID);
44+
}
45+
46+
private Set<R> getAndCacheResource(P primary) {
47+
var primaryID = ResourceID.fromResource(primary);
48+
var values = resourceFetcher.fetchResources(primary);
49+
handleResources(primaryID, values, false);
50+
fetchedForPrimaries.add(primaryID);
51+
return values;
52+
}
53+
54+
/**
55+
* When this event source is queried for the resource, it might not be fully "synced". Thus, the
56+
* cache might not be propagated, therefore the supplier is checked for the resource too.
57+
*
58+
* @param primary resource of the controller
59+
* @return the related resource for this event source
60+
*/
61+
@Override
62+
public Set<R> getSecondaryResources(P primary) {
63+
var primaryID = ResourceID.fromResource(primary);
64+
var cachedValue = cache.get(primaryID);
65+
if (cachedValue != null && !cachedValue.isEmpty()) {
66+
return new HashSet<>(cachedValue.values());
67+
} else {
68+
if (fetchedForPrimaries.contains(primaryID)) {
69+
return Collections.emptySet();
70+
} else {
71+
return getAndCacheResource(primary);
72+
}
73+
}
74+
}
75+
76+
public interface ResourceFetcher<R, P> {
77+
Set<R> fetchResources(P primaryResource);
78+
}
2979
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.inbound;
2+
3+
import java.util.Set;
4+
5+
import org.junit.jupiter.api.BeforeEach;
6+
import org.junit.jupiter.api.Test;
7+
8+
import io.javaoperatorsdk.operator.TestUtils;
9+
import io.javaoperatorsdk.operator.processing.event.EventHandler;
10+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
11+
import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase;
12+
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
13+
import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource;
14+
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
15+
16+
import static org.assertj.core.api.Assertions.assertThat;
17+
import static org.mockito.ArgumentMatchers.any;
18+
import static org.mockito.ArgumentMatchers.eq;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.never;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.when;
24+
25+
public class CachingInboundEventSourceTest extends
26+
AbstractEventSourceTestBase<CachingInboundEventSource<SampleExternalResource, TestCustomResource>, EventHandler> {
27+
28+
public static final int PERIOD = 150;
29+
private CachingInboundEventSource.ResourceFetcher<SampleExternalResource, TestCustomResource> supplier =
30+
mock(
31+
CachingInboundEventSource.ResourceFetcher.class);
32+
private TestCustomResource testCustomResource = TestUtils.testCustomResource();
33+
private CacheKeyMapper<SampleExternalResource> cacheKeyMapper =
34+
r -> r.getName() + "#" + r.getValue();
35+
36+
@BeforeEach
37+
public void setup() {
38+
when(supplier.fetchResources(any()))
39+
.thenReturn(Set.of(SampleExternalResource.testResource1()));
40+
41+
setUpSource(new CachingInboundEventSource<>(supplier,
42+
SampleExternalResource.class, cacheKeyMapper));
43+
}
44+
45+
@Test
46+
void getSecondaryResourceFromCacheOrSupplier() throws InterruptedException {
47+
when(supplier.fetchResources(any()))
48+
.thenReturn(Set.of(SampleExternalResource.testResource1()));
49+
50+
var value = source.getSecondaryResources(testCustomResource);
51+
52+
verify(supplier, times(1)).fetchResources(eq(testCustomResource));
53+
verify(eventHandler, never()).handleEvent(any());
54+
assertThat(value).hasSize(1);
55+
56+
value = source.getSecondaryResources(testCustomResource);
57+
58+
assertThat(value).hasSize(1);
59+
verify(supplier, times(1)).fetchResources(eq(testCustomResource));
60+
verify(eventHandler, never()).handleEvent(any());
61+
62+
source.handleResourceEvent(ResourceID.fromResource(testCustomResource),
63+
Set.of(SampleExternalResource.testResource1(), SampleExternalResource.testResource2()));
64+
65+
verify(supplier, times(1)).fetchResources(eq(testCustomResource));
66+
value = source.getSecondaryResources(testCustomResource);
67+
assertThat(value).hasSize(2);
68+
}
69+
70+
@Test
71+
void propagateEventOnDeletedResource() throws InterruptedException {
72+
source.handleResourceEvent(ResourceID.fromResource(testCustomResource),
73+
SampleExternalResource.testResource1());
74+
source.handleResourceDeleteEvent(ResourceID.fromResource(testCustomResource),
75+
cacheKeyMapper.keyFor(SampleExternalResource.testResource1()));
76+
source.handleResourceDeleteEvent(ResourceID.fromResource(testCustomResource),
77+
cacheKeyMapper.keyFor(SampleExternalResource.testResource2()));
78+
79+
verify(eventHandler, times(2)).handleEvent(any());
80+
}
81+
82+
@Test
83+
void propagateEventOnUpdateResources() throws InterruptedException {
84+
source.handleResourceEvent(ResourceID.fromResource(testCustomResource),
85+
Set.of(SampleExternalResource.testResource1(), SampleExternalResource.testResource2()));
86+
87+
verify(eventHandler, times(1)).handleEvent(any());
88+
}
89+
}

0 commit comments

Comments
 (0)