Skip to content

feat: add resource fetcher to CachingInboundEventSource #1428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 5, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
package io.javaoperatorsdk.operator.processing.event.source.inbound;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;

public class CachingInboundEventSource<R, P extends HasMetadata>
extends ExternalResourceCachingEventSource<R, P> {
extends ExternalResourceCachingEventSource<R, P>
implements ResourceEventAware<P> {

public CachingInboundEventSource(Class<R> resourceClass, CacheKeyMapper<R> cacheKeyMapper) {
private final ResourceFetcher<R, P> resourceFetcher;
private final Set<ResourceID> fetchedForPrimaries = ConcurrentHashMap.newKeySet();

public CachingInboundEventSource(
ResourceFetcher<R, P> resourceFetcher, Class<R> resourceClass,
CacheKeyMapper<R> cacheKeyMapper) {
super(resourceClass, cacheKeyMapper);
this.resourceFetcher = resourceFetcher;
}

public void handleResourceEvent(ResourceID primaryID, Set<R> resources) {
Expand All @@ -26,4 +37,43 @@ public void handleResourceDeleteEvent(ResourceID primaryID, String resourceID) {
super.handleDelete(primaryID, Set.of(resourceID));
}

@Override
public void onResourceDeleted(P resource) {
var resourceID = ResourceID.fromResource(resource);
fetchedForPrimaries.remove(resourceID);
}

private Set<R> getAndCacheResource(P primary) {
var primaryID = ResourceID.fromResource(primary);
var values = resourceFetcher.fetchResources(primary);
handleResources(primaryID, values, false);
fetchedForPrimaries.add(primaryID);
return values;
}

/**
* When this event source is queried for the resource, it might not be fully "synced". Thus, the
* cache might not be propagated, therefore the supplier is checked for the resource too.
*
* @param primary resource of the controller
* @return the related resource for this event source
*/
@Override
public Set<R> getSecondaryResources(P primary) {
var primaryID = ResourceID.fromResource(primary);
var cachedValue = cache.get(primaryID);
if (cachedValue != null && !cachedValue.isEmpty()) {
return new HashSet<>(cachedValue.values());
} else {
if (fetchedForPrimaries.contains(primaryID)) {
return Collections.emptySet();
} else {
return getAndCacheResource(primary);
}
}
}

public interface ResourceFetcher<R, P> {
Set<R> fetchResources(P primaryResource);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package io.javaoperatorsdk.operator.processing.event.source.inbound;

import java.util.Set;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.javaoperatorsdk.operator.TestUtils;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase;
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class CachingInboundEventSourceTest extends
AbstractEventSourceTestBase<CachingInboundEventSource<SampleExternalResource, TestCustomResource>, EventHandler> {

public static final int PERIOD = 150;
private CachingInboundEventSource.ResourceFetcher<SampleExternalResource, TestCustomResource> supplier =
mock(
CachingInboundEventSource.ResourceFetcher.class);
private TestCustomResource testCustomResource = TestUtils.testCustomResource();
private CacheKeyMapper<SampleExternalResource> cacheKeyMapper =
r -> r.getName() + "#" + r.getValue();

@BeforeEach
public void setup() {
when(supplier.fetchResources(any()))
.thenReturn(Set.of(SampleExternalResource.testResource1()));

setUpSource(new CachingInboundEventSource<>(supplier,
SampleExternalResource.class, cacheKeyMapper));
}

@Test
void getSecondaryResourceFromCacheOrSupplier() throws InterruptedException {
when(supplier.fetchResources(any()))
.thenReturn(Set.of(SampleExternalResource.testResource1()));

var value = source.getSecondaryResources(testCustomResource);

verify(supplier, times(1)).fetchResources(eq(testCustomResource));
verify(eventHandler, never()).handleEvent(any());
assertThat(value).hasSize(1);

value = source.getSecondaryResources(testCustomResource);

assertThat(value).hasSize(1);
verify(supplier, times(1)).fetchResources(eq(testCustomResource));
verify(eventHandler, never()).handleEvent(any());

source.handleResourceEvent(ResourceID.fromResource(testCustomResource),
Copy link
Contributor Author

@scrocquesel scrocquesel Aug 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@csviri Had to use the Set.of overload to avoid replacing the existing secondary. I guess the correct behavior should be additive.

source.handleResourceEvent(primaryID, a)
source.handleResourceEvent(primaryID, b)
source.handleResourceEvent(primaryID, c)
source.handleResourceDeleteEvent(primaryID, c)
source.getSecondaryResources(testCustomResource) // should return (a, b)
source.handleResourceEvent(primaryID, Set.of(x,y))
source.getSecondaryResources(testCustomResource) // should return (x,y)

Wait for your confirmation and will add javadoc so it's more clear to the user

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No it should not be. At least that is not what I had on my mind.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably that would be good to document.
We could add overloaded or rather differerently named versions for additiveness, that would be useful for sure. But this is on purpose, now.

Set.of(SampleExternalResource.testResource1(), SampleExternalResource.testResource2()));

verify(supplier, times(1)).fetchResources(eq(testCustomResource));
value = source.getSecondaryResources(testCustomResource);
assertThat(value).hasSize(2);
}

@Test
void propagateEventOnDeletedResource() throws InterruptedException {
source.handleResourceEvent(ResourceID.fromResource(testCustomResource),
SampleExternalResource.testResource1());
source.handleResourceDeleteEvent(ResourceID.fromResource(testCustomResource),
cacheKeyMapper.keyFor(SampleExternalResource.testResource1()));
source.handleResourceDeleteEvent(ResourceID.fromResource(testCustomResource),
cacheKeyMapper.keyFor(SampleExternalResource.testResource2()));

verify(eventHandler, times(2)).handleEvent(any());
}

@Test
void propagateEventOnUpdateResources() throws InterruptedException {
source.handleResourceEvent(ResourceID.fromResource(testCustomResource),
Set.of(SampleExternalResource.testResource1(), SampleExternalResource.testResource2()));

verify(eventHandler, times(1)).handleEvent(any());
}
}