diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java index 6d421a6460..450e308904 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java @@ -1,17 +1,28 @@ package io.javaoperatorsdk.operator.processing.event.source.inbound; +import java.util.Collections; +import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource; +import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware; public class CachingInboundEventSource - extends ExternalResourceCachingEventSource { + extends ExternalResourceCachingEventSource + implements ResourceEventAware

{ - public CachingInboundEventSource(Class resourceClass, CacheKeyMapper cacheKeyMapper) { + private final ResourceFetcher resourceFetcher; + private final Set fetchedForPrimaries = ConcurrentHashMap.newKeySet(); + + public CachingInboundEventSource( + ResourceFetcher resourceFetcher, Class resourceClass, + CacheKeyMapper cacheKeyMapper) { super(resourceClass, cacheKeyMapper); + this.resourceFetcher = resourceFetcher; } public void handleResourceEvent(ResourceID primaryID, Set resources) { @@ -26,4 +37,43 @@ public void handleResourceDeleteEvent(ResourceID primaryID, String resourceID) { super.handleDelete(primaryID, Set.of(resourceID)); } + @Override + public void onResourceDeleted(P resource) { + var resourceID = ResourceID.fromResource(resource); + fetchedForPrimaries.remove(resourceID); + } + + private Set getAndCacheResource(P primary) { + var primaryID = ResourceID.fromResource(primary); + var values = resourceFetcher.fetchResources(primary); + handleResources(primaryID, values, false); + fetchedForPrimaries.add(primaryID); + return values; + } + + /** + * When this event source is queried for the resource, it might not be fully "synced". Thus, the + * cache might not be propagated, therefore the supplier is checked for the resource too. + * + * @param primary resource of the controller + * @return the related resource for this event source + */ + @Override + public Set getSecondaryResources(P primary) { + var primaryID = ResourceID.fromResource(primary); + var cachedValue = cache.get(primaryID); + if (cachedValue != null && !cachedValue.isEmpty()) { + return new HashSet<>(cachedValue.values()); + } else { + if (fetchedForPrimaries.contains(primaryID)) { + return Collections.emptySet(); + } else { + return getAndCacheResource(primary); + } + } + } + + public interface ResourceFetcher { + Set fetchResources(P primaryResource); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSourceTest.java new file mode 100644 index 0000000000..d3b1bcff08 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSourceTest.java @@ -0,0 +1,89 @@ +package io.javaoperatorsdk.operator.processing.event.source.inbound; + +import java.util.Set; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.javaoperatorsdk.operator.TestUtils; +import io.javaoperatorsdk.operator.processing.event.EventHandler; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase; +import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; +import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class CachingInboundEventSourceTest extends + AbstractEventSourceTestBase, EventHandler> { + + public static final int PERIOD = 150; + private CachingInboundEventSource.ResourceFetcher supplier = + mock( + CachingInboundEventSource.ResourceFetcher.class); + private TestCustomResource testCustomResource = TestUtils.testCustomResource(); + private CacheKeyMapper cacheKeyMapper = + r -> r.getName() + "#" + r.getValue(); + + @BeforeEach + public void setup() { + when(supplier.fetchResources(any())) + .thenReturn(Set.of(SampleExternalResource.testResource1())); + + setUpSource(new CachingInboundEventSource<>(supplier, + SampleExternalResource.class, cacheKeyMapper)); + } + + @Test + void getSecondaryResourceFromCacheOrSupplier() throws InterruptedException { + when(supplier.fetchResources(any())) + .thenReturn(Set.of(SampleExternalResource.testResource1())); + + var value = source.getSecondaryResources(testCustomResource); + + verify(supplier, times(1)).fetchResources(eq(testCustomResource)); + verify(eventHandler, never()).handleEvent(any()); + assertThat(value).hasSize(1); + + value = source.getSecondaryResources(testCustomResource); + + assertThat(value).hasSize(1); + verify(supplier, times(1)).fetchResources(eq(testCustomResource)); + verify(eventHandler, never()).handleEvent(any()); + + source.handleResourceEvent(ResourceID.fromResource(testCustomResource), + Set.of(SampleExternalResource.testResource1(), SampleExternalResource.testResource2())); + + verify(supplier, times(1)).fetchResources(eq(testCustomResource)); + value = source.getSecondaryResources(testCustomResource); + assertThat(value).hasSize(2); + } + + @Test + void propagateEventOnDeletedResource() throws InterruptedException { + source.handleResourceEvent(ResourceID.fromResource(testCustomResource), + SampleExternalResource.testResource1()); + source.handleResourceDeleteEvent(ResourceID.fromResource(testCustomResource), + cacheKeyMapper.keyFor(SampleExternalResource.testResource1())); + source.handleResourceDeleteEvent(ResourceID.fromResource(testCustomResource), + cacheKeyMapper.keyFor(SampleExternalResource.testResource2())); + + verify(eventHandler, times(2)).handleEvent(any()); + } + + @Test + void propagateEventOnUpdateResources() throws InterruptedException { + source.handleResourceEvent(ResourceID.fromResource(testCustomResource), + Set.of(SampleExternalResource.testResource1(), SampleExternalResource.testResource2())); + + verify(eventHandler, times(1)).handleEvent(any()); + } +}