Skip to content

feat: add resource fetcher to CachingInboundEventSource #1428

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 5, 2022
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,17 +1,28 @@
package io.javaoperatorsdk.operator.processing.event.source.inbound;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;

public class CachingInboundEventSource<R, P extends HasMetadata>
extends ExternalResourceCachingEventSource<R, P> {
extends ExternalResourceCachingEventSource<R, P>
implements ResourceEventAware<P> {

public CachingInboundEventSource(Class<R> resourceClass, CacheKeyMapper<R> cacheKeyMapper) {
private final ResourceFetcher<R, P> resourceFetcher;
private final Set<ResourceID> fetchedForPrimaries = ConcurrentHashMap.newKeySet();

public CachingInboundEventSource(
ResourceFetcher<R, P> resourceFetcher, Class<R> resourceClass,
CacheKeyMapper<R> cacheKeyMapper) {
super(resourceClass, cacheKeyMapper);
this.resourceFetcher = resourceFetcher;
}

public void handleResourceEvent(ResourceID primaryID, Set<R> resources) {
Expand All @@ -26,4 +37,43 @@ public void handleResourceDeleteEvent(ResourceID primaryID, String resourceID) {
super.handleDelete(primaryID, Set.of(resourceID));
}

@Override
public void onResourceDeleted(P resource) {
var resourceID = ResourceID.fromResource(resource);
fetchedForPrimaries.remove(resourceID);
}

private Set<R> getAndCacheResource(P primary) {
var primaryID = ResourceID.fromResource(primary);
var values = resourceFetcher.fetchResources(primary);
handleResources(primaryID, values, false);
fetchedForPrimaries.add(primaryID);
return values;
}

/**
* When this event source is queried for the resource, it might not be fully "synced". Thus, the
* cache might not be propagated, therefore the supplier is checked for the resource too.
*
* @param primary resource of the controller
* @return the related resource for this event source
*/
@Override
public Set<R> getSecondaryResources(P primary) {
var primaryID = ResourceID.fromResource(primary);
var cachedValue = cache.get(primaryID);
if (cachedValue != null && !cachedValue.isEmpty()) {
return new HashSet<>(cachedValue.values());
} else {
if (fetchedForPrimaries.contains(primaryID)) {
return Collections.emptySet();
} else {
return getAndCacheResource(primary);
}
}
}

public interface ResourceFetcher<R, P> {
Set<R> fetchResources(P primaryResource);
}
}