diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java index 00a94390c6..0b6d90065b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java @@ -1,7 +1,5 @@ package io.javaoperatorsdk.operator.api.reconciler; -import java.util.Collections; -import java.util.List; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; @@ -10,7 +8,6 @@ import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext; import io.javaoperatorsdk.operator.processing.Controller; -import io.javaoperatorsdk.operator.processing.MultiResourceOwner; public class DefaultContext

implements Context

{ @@ -37,17 +34,8 @@ public Optional getRetryInfo() { @SuppressWarnings("unchecked") public Set getSecondaryResources(Class expectedType) { return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream() - .map( - es -> { - if (es instanceof MultiResourceOwner) { - return ((MultiResourceOwner) es).getSecondaryResources(primaryResource); - } else { - return es.getSecondaryResource(primaryResource) - .map(List::of) - .orElse(Collections.emptyList()); - } - }) - .flatMap(List::stream) + .map(es -> es.getSecondaryResources(primaryResource)) + .flatMap(Set::stream) .collect(Collectors.toSet()); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationCacheFiller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationCacheFiller.java index 7e27537d49..cb84783a4f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationCacheFiller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/RecentOperationCacheFiller.java @@ -6,5 +6,5 @@ public interface RecentOperationCacheFiller { void handleRecentResourceCreate(ResourceID resourceID, R resource); - void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousResourceVersion); + void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousVersionOfResource); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/MultiResourceOwner.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/MultiResourceOwner.java deleted file mode 100644 index d3cc6b3770..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/MultiResourceOwner.java +++ /dev/null @@ -1,23 +0,0 @@ -package io.javaoperatorsdk.operator.processing; - -import java.util.List; -import java.util.Optional; - -import io.fabric8.kubernetes.api.model.HasMetadata; - -public interface MultiResourceOwner extends ResourceOwner { - - default Optional getSecondaryResource(P primary) { - var list = getSecondaryResources(primary); - if (list.isEmpty()) { - return Optional.empty(); - } else if (list.size() == 1) { - return Optional.of(list.get(0)); - } else { - throw new IllegalStateException("More than 1 secondary resource related to primary"); - } - - } - - List getSecondaryResources(P primary); -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractCachingDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractCachingDependentResource.java index 036a507de9..4f85cb9d1d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractCachingDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractCachingDependentResource.java @@ -4,7 +4,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.dependent.AbstractEventSourceHolderDependentResource; -import io.javaoperatorsdk.operator.processing.event.ExternalResourceCachingEventSource; +import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource; public abstract class AbstractCachingDependentResource extends @@ -15,8 +15,6 @@ protected AbstractCachingDependentResource(Class resourceType) { this.resourceType = resourceType; } - public abstract Optional fetchResource(P primaryResource); - @Override public Class resourceType() { return resourceType; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java index 8c91dea15d..f3812a095f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java @@ -1,9 +1,10 @@ package io.javaoperatorsdk.operator.processing.dependent.external; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; public abstract class AbstractPollingDependentResource - extends AbstractCachingDependentResource { + extends AbstractCachingDependentResource implements CacheKeyMapper { public static final int DEFAULT_POLLING_PERIOD = 5000; private long pollingPeriod; @@ -24,4 +25,10 @@ public void setPollingPeriod(long pollingPeriod) { public long getPollingPeriod() { return pollingPeriod; } + + // for now dependent resources support event sources only with one owned resource. + @Override + public String keyFor(R resource) { + return CacheKeyMapper.singleResourceCacheKeyMapper().keyFor(resource); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java index b598073a21..1afa566a3a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PerResourcePollingDependentResource.java @@ -2,12 +2,14 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.processing.event.ExternalResourceCachingEventSource; +import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource; import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource; public abstract class PerResourcePollingDependentResource extends AbstractPollingDependentResource implements PerResourcePollingEventSource.ResourceFetcher { + + public PerResourcePollingDependentResource(Class resourceType) { super(resourceType); } @@ -20,6 +22,7 @@ public PerResourcePollingDependentResource(Class resourceType, long pollingPe protected ExternalResourceCachingEventSource createEventSource( EventSourceContext

context) { return new PerResourcePollingEventSource<>(this, context.getPrimaryCache(), - getPollingPeriod(), resourceType()); + getPollingPeriod(), resourceType(), this); } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PollingDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PollingDependentResource.java index 1cc6e22afe..9fb293b1f5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PollingDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/PollingDependentResource.java @@ -1,28 +1,32 @@ package io.javaoperatorsdk.operator.processing.dependent.external; -import java.util.Map; -import java.util.function.Supplier; - import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; -import io.javaoperatorsdk.operator.processing.event.ExternalResourceCachingEventSource; -import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; +import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource; import io.javaoperatorsdk.operator.processing.event.source.polling.PollingEventSource; public abstract class PollingDependentResource - extends AbstractPollingDependentResource implements Supplier> { + extends AbstractPollingDependentResource + implements PollingEventSource.GenericResourceFetcher { + + private final CacheKeyMapper cacheKeyMapper; - public PollingDependentResource(Class resourceType) { + public PollingDependentResource(Class resourceType, CacheKeyMapper cacheKeyMapper) { super(resourceType); + this.cacheKeyMapper = cacheKeyMapper; } - public PollingDependentResource(Class resourceType, long pollingPeriod) { + public PollingDependentResource(Class resourceType, long pollingPeriod, + CacheKeyMapper cacheKeyMapper) { super(resourceType, pollingPeriod); + this.cacheKeyMapper = cacheKeyMapper; } @Override protected ExternalResourceCachingEventSource createEventSource( EventSourceContext

context) { - return new PollingEventSource<>(this, getPollingPeriod(), resourceType()); + return new PollingEventSource<>(this, getPollingPeriod(), resourceType(), cacheKeyMapper); } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExternalResourceCachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExternalResourceCachingEventSource.java deleted file mode 100644 index b4bb0f7ef7..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/ExternalResourceCachingEventSource.java +++ /dev/null @@ -1,60 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event; - -import java.util.Optional; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; -import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource; - -public class ExternalResourceCachingEventSource - extends CachingEventSource implements RecentOperationCacheFiller { - - public ExternalResourceCachingEventSource(Class resourceClass) { - super(resourceClass); - } - - public synchronized void handleDelete(ResourceID relatedResourceID) { - if (!isRunning()) { - return; - } - var cachedValue = cache.get(relatedResourceID); - cache.remove(relatedResourceID); - // we only propagate event if the resource was previously in cache - if (cachedValue.isPresent()) { - getEventHandler().handleEvent(new Event(relatedResourceID)); - } - } - - public synchronized void handleEvent(R value, ResourceID relatedResourceID) { - if (!isRunning()) { - return; - } - var cachedValue = cache.get(relatedResourceID); - if (cachedValue.map(v -> !v.equals(value)).orElse(true)) { - cache.put(relatedResourceID, value); - getEventHandler().handleEvent(new Event(relatedResourceID)); - } - } - - @Override - public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { - if (cache.get(resourceID).isEmpty()) { - cache.put(resourceID, resource); - } - } - - @Override - public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource, - R previousResourceVersion) { - cache.get(resourceID).ifPresent(r -> { - if (r.equals(previousResourceVersion)) { - cache.put(resourceID, resource); - } - }); - } - - @Override - public Optional getSecondaryResource(P primary) { - return cache.get(ResourceID.fromResource(primary)); - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java index 6767e974e2..a6666ba81d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java @@ -4,7 +4,6 @@ import io.javaoperatorsdk.operator.processing.event.EventHandler; public abstract class AbstractEventSource implements EventSource { - private EventHandler handler; private volatile boolean running = false; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java index b22899f246..051a75ff20 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java @@ -2,7 +2,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; -public abstract class AbstractResourceEventSource

+public abstract class AbstractResourceEventSource extends AbstractEventSource implements ResourceEventSource { private final Class resourceClass; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CacheKeyMapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CacheKeyMapper.java new file mode 100644 index 0000000000..d290e15496 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CacheKeyMapper.java @@ -0,0 +1,18 @@ +package io.javaoperatorsdk.operator.processing.event.source; + +public interface CacheKeyMapper { + + String keyFor(R resource); + + /** + * Used if a polling event source handles only single secondary resource. See also docs for: + * {@link ExternalResourceCachingEventSource} + * + * @return static id mapper, all resources are mapped for same id. + * @param secondary resource type + */ + static CacheKeyMapper singleResourceCacheKeyMapper() { + return r -> "id"; + } + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java index 8453651b6b..55bd1ab920 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java @@ -14,7 +14,7 @@ * @param represents the type of resources (usually external non-kubernetes ones) being handled. */ public abstract class CachingEventSource - extends AbstractResourceEventSource implements Cache { + extends AbstractResourceEventSource implements Cache { protected UpdatableCache cache; @@ -43,12 +43,9 @@ public Stream list(Predicate predicate) { return cache.list(predicate); } - protected UpdatableCache initCache() { - return new ConcurrentHashMapCache<>(); - } - public Optional getCachedValue(ResourceID resourceID) { return cache.get(resourceID); } + protected abstract UpdatableCache initCache(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java new file mode 100644 index 0000000000..f8a0cafcd8 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java @@ -0,0 +1,166 @@ +package io.javaoperatorsdk.operator.processing.event.source; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; +import io.javaoperatorsdk.operator.processing.event.Event; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +/** + * Handles caching and related operation of external event sources. It can handle multiple secondary + * resources for a single primary resources. + *

+ * There are two related concepts to understand: + *

+ * + * When a resource is added for a primary resource its key is used to put in a map. Equals is used + * to compare if it's still the same resource, or an updated version of it. Event is emitted only if + * a new resource(s) is received or actually updated or deleted. Delete is detected by a missing + * key. + * + * @param type of polled external secondary resource + * @param

primary resource + */ +public abstract class ExternalResourceCachingEventSource + extends AbstractResourceEventSource implements RecentOperationCacheFiller { + + private static Logger log = LoggerFactory.getLogger(ExternalResourceCachingEventSource.class); + + protected final CacheKeyMapper cacheKeyMapper; + + protected Map> cache = new ConcurrentHashMap<>(); + + protected ExternalResourceCachingEventSource(Class resourceClass, + CacheKeyMapper cacheKeyMapper) { + super(resourceClass); + this.cacheKeyMapper = cacheKeyMapper; + } + + protected synchronized void handleDelete(ResourceID primaryID) { + var res = cache.remove(primaryID); + if (res != null) { + getEventHandler().handleEvent(new Event(primaryID)); + } + } + + protected synchronized void handleDeletes(ResourceID primaryID, Set resource) { + handleDelete(primaryID, + resource.stream().map(cacheKeyMapper::keyFor).collect(Collectors.toSet())); + } + + protected synchronized void handleDelete(ResourceID primaryID, R resource) { + handleDelete(primaryID, Set.of(cacheKeyMapper.keyFor(resource))); + } + + protected synchronized void handleDelete(ResourceID primaryID, Set resourceID) { + if (!isRunning()) { + return; + } + var cachedValues = cache.get(primaryID); + var sizeBeforeRemove = cachedValues.size(); + resourceID.forEach(cachedValues::remove); + + if (cachedValues.isEmpty()) { + cache.remove(primaryID); + } + if (sizeBeforeRemove > cachedValues.size()) { + getEventHandler().handleEvent(new Event(primaryID)); + } + } + + protected synchronized void handleResources(ResourceID primaryID, R actualResource) { + handleResources(primaryID, Set.of(actualResource), true); + } + + protected synchronized void handleResources(ResourceID primaryID, Set newResources) { + handleResources(primaryID, newResources, true); + } + + protected synchronized void handleResources(Map> allNewResources) { + var toDelete = cache.keySet().stream().filter(k -> !allNewResources.containsKey(k)) + .collect(Collectors.toList()); + toDelete.forEach(this::handleDelete); + allNewResources.forEach((primaryID, resources) -> handleResources(primaryID, resources)); + } + + protected synchronized void handleResources(ResourceID primaryID, Set newResources, + boolean propagateEvent) { + log.debug("Handling resources update for: {} numberOfResources: {} ", primaryID, + newResources.size()); + if (!isRunning()) { + return; + } + var cachedResources = cache.get(primaryID); + var newResourcesMap = + newResources.stream().collect(Collectors.toMap(cacheKeyMapper::keyFor, r -> r)); + cache.put(primaryID, newResourcesMap); + if (propagateEvent && !newResourcesMap.equals(cachedResources)) { + getEventHandler().handleEvent(new Event(primaryID)); + } + } + + @Override + public synchronized void handleRecentResourceCreate(ResourceID primaryID, R resource) { + var actualValues = cache.get(primaryID); + var resourceId = cacheKeyMapper.keyFor(resource); + if (actualValues == null) { + actualValues = new HashMap<>(); + cache.put(primaryID, actualValues); + actualValues.put(resourceId, resource); + } else { + actualValues.computeIfAbsent(resourceId, r -> resource); + } + } + + @Override + public synchronized void handleRecentResourceUpdate( + ResourceID primaryID, R resource, R previousVersionOfResource) { + var actualValues = cache.get(primaryID); + if (actualValues != null) { + var resourceId = cacheKeyMapper.keyFor(resource); + R actualResource = actualValues.get(resourceId); + if (actualResource.equals(previousVersionOfResource)) { + actualValues.put(resourceId, resource); + } + } + } + + @Override + public Set getSecondaryResources(P primary) { + return getSecondaryResources(ResourceID.fromResource(primary)); + } + + public Set getSecondaryResources(ResourceID primaryID) { + var cachedValues = cache.get(primaryID); + if (cachedValues == null) { + return Collections.emptySet(); + } else { + return new HashSet<>(cache.get(primaryID).values()); + } + } + + public Optional getSecondaryResource(ResourceID primaryID) { + var resources = getSecondaryResources(primaryID); + if (resources.isEmpty()) { + return Optional.empty(); + } else if (resources.size() == 1) { + return Optional.of(resources.iterator().next()); + } else { + throw new IllegalStateException("More than 1 secondary resource related to primary"); + } + } + + public Map> getCache() { + return Collections.unmodifiableMap(cache); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java index d65208f746..d57a662d82 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java @@ -1,9 +1,25 @@ package io.javaoperatorsdk.operator.processing.event.source; +import java.util.Optional; +import java.util.Set; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.ResourceOwner; public interface ResourceEventSource extends EventSource, ResourceOwner { + default Optional getSecondaryResource(P primary) { + var resources = getSecondaryResources(primary); + if (resources.isEmpty()) { + return Optional.empty(); + } else if (resources.size() == 1) { + return Optional.of(resources.iterator().next()); + } else { + throw new IllegalStateException("More than 1 secondary resource related to primary"); + } + + } + + Set getSecondaryResources(P primary); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java index c3ffc326bf..0c0f0c6b8e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java @@ -1,6 +1,7 @@ package io.javaoperatorsdk.operator.processing.event.source.controller; import java.util.Optional; +import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -94,4 +95,9 @@ public void onDelete(T resource, boolean b) { public Optional getSecondaryResource(T primary) { throw new IllegalStateException("This method should not be called here. Primary: " + primary); } + + @Override + public Set getSecondaryResources(T primary) { + throw new IllegalStateException("This method should not be called here. Primary: " + primary); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java index 34f37a048b..6d421a6460 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/inbound/CachingInboundEventSource.java @@ -1,21 +1,29 @@ package io.javaoperatorsdk.operator.processing.event.source.inbound; +import java.util.Set; + import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.processing.event.ExternalResourceCachingEventSource; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; +import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource; + +public class CachingInboundEventSource + extends ExternalResourceCachingEventSource { -public class CachingInboundEventSource - extends ExternalResourceCachingEventSource { + public CachingInboundEventSource(Class resourceClass, CacheKeyMapper cacheKeyMapper) { + super(resourceClass, cacheKeyMapper); + } - public CachingInboundEventSource(Class resourceClass) { - super(resourceClass); + public void handleResourceEvent(ResourceID primaryID, Set resources) { + super.handleResources(primaryID, resources); } - public void handleResourceEvent(T resource, ResourceID relatedResourceID) { - super.handleEvent(resource, relatedResourceID); + public void handleResourceEvent(ResourceID primaryID, R resource) { + super.handleResources(primaryID, resource); } - public void handleResourceDeleteEvent(ResourceID resourceID) { - super.handleDelete(resourceID); + public void handleResourceDeleteEvent(ResourceID primaryID, String resourceID) { + super.handleDelete(primaryID, Set.of(resourceID)); } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 08facecbb7..5d1d12875f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -1,7 +1,7 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; -import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -13,7 +13,6 @@ import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationEventFilter; -import io.javaoperatorsdk.operator.processing.MultiResourceOwner; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -66,7 +65,7 @@ */ public class InformerEventSource extends ManagedInformerEventSource> - implements MultiResourceOwner, ResourceEventHandler, RecentOperationEventFilter { + implements ResourceEventHandler, RecentOperationEventFilter { private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); @@ -177,11 +176,11 @@ private void propagateEvent(R object) { } @Override - public List getSecondaryResources(P primary) { + public Set getSecondaryResources(P primary) { var secondaryIDs = primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(primary)); return secondaryIDs.stream().map(this::get).flatMap(Optional::stream) - .collect(Collectors.toList()); + .collect(Collectors.toSet()); } public InformerConfiguration getConfiguration() { @@ -190,9 +189,9 @@ public InformerConfiguration getConfiguration() { @Override public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource, - R previousResourceVersion) { + R previousVersionOfResource) { handleRecentCreateOrUpdate(resource, - () -> super.handleRecentResourceUpdate(resourceID, resource, previousResourceVersion)); + () -> super.handleRecentResourceUpdate(resourceID, resource, previousVersionOfResource)); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 606b0bc962..95aba48ad7 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -74,9 +74,9 @@ public void stop() { @Override public void handleRecentResourceUpdate(ResourceID resourceID, R resource, - R previousResourceVersion) { + R previousVersionOfResource) { temporaryResourceCache.putUpdatedResource(resource, - previousResourceVersion.getMetadata().getResourceVersion()); + previousVersionOfResource.getMetadata().getResourceVersion()); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java index 8b04dc9e57..f55e7dd05e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java @@ -1,9 +1,6 @@ package io.javaoperatorsdk.operator.processing.event.source.polling; -import java.util.Map; -import java.util.Optional; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; @@ -12,10 +9,11 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.OperatorException; -import io.javaoperatorsdk.operator.processing.event.ExternalResourceCachingEventSource; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.Cache; +import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource; +import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource; import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware; /** @@ -41,40 +39,36 @@ public class PerResourcePollingEventSource private final Cache

resourceCache; private final Predicate

registerPredicate; private final long period; + private final Set fetchedForPrimaries = ConcurrentHashMap.newKeySet(); public PerResourcePollingEventSource(ResourceFetcher resourceFetcher, Cache

resourceCache, long period, Class resourceClass) { - this(resourceFetcher, resourceCache, period, null, resourceClass); + this(resourceFetcher, resourceCache, period, null, resourceClass, + CacheKeyMapper.singleResourceCacheKeyMapper()); + } + + public PerResourcePollingEventSource(ResourceFetcher resourceFetcher, + Cache

resourceCache, long period, Class resourceClass, + CacheKeyMapper cacheKeyMapper) { + this(resourceFetcher, resourceCache, period, null, resourceClass, cacheKeyMapper); } public PerResourcePollingEventSource(ResourceFetcher resourceFetcher, Cache

resourceCache, long period, - Predicate

registerPredicate, Class resourceClass) { - super(resourceClass); + Predicate

registerPredicate, Class resourceClass, + CacheKeyMapper cacheKeyMapper) { + super(resourceClass, cacheKeyMapper); this.resourceFetcher = resourceFetcher; this.resourceCache = resourceCache; this.period = period; this.registerPredicate = registerPredicate; } - private void pollForResource(P resource) { - var value = resourceFetcher.fetchResource(resource); - var resourceID = ResourceID.fromResource(resource); - if (value.isEmpty()) { - super.handleDelete(resourceID); - } else { - super.handleEvent(value.get(), resourceID); - } - } - - private Optional getAndCacheResource(ResourceID resourceID) { - var resource = resourceCache.get(resourceID); - if (resource.isPresent()) { - var value = resourceFetcher.fetchResource(resource.get()); - value.ifPresent(v -> cache.put(resourceID, v)); - return value; - } - return Optional.empty(); + private Set getAndCacheResource(P primary, boolean fromGetter) { + var values = resourceFetcher.fetchResources(primary); + handleResources(ResourceID.fromResource(primary), values, !fromGetter); + fetchedForPrimaries.add(ResourceID.fromResource(primary)); + return values; } @Override @@ -95,7 +89,8 @@ public void onResourceDeleted(P resource) { log.debug("Canceling task for resource: {}", resource); task.cancel(); } - cache.remove(resourceID); + handleDelete(resourceID); + fetchedForPrimaries.remove(resourceID); } // This method is always called from the same Thread for the same resource, @@ -103,24 +98,27 @@ public void onResourceDeleted(P resource) { // important // because otherwise there will be a race condition related to the timerTasks. private void checkAndRegisterTask(P resource) { - var resourceID = ResourceID.fromResource(resource); - if (timerTasks.get(resourceID) == null && (registerPredicate == null + var primaryID = ResourceID.fromResource(resource); + if (timerTasks.get(primaryID) == null && (registerPredicate == null || registerPredicate.test(resource))) { - var task = new TimerTask() { - @Override - public void run() { - if (!isRunning()) { - log.debug("Event source not yet started. Will not run for: {}", resourceID); - return; - } - // always use up-to-date resource from cache - var res = resourceCache.get(resourceID); - res.ifPresentOrElse(r -> pollForResource(r), - () -> log.warn("No resource in cache for resource ID: {}", resourceID)); - } - }; - timerTasks.put(resourceID, task); - timer.schedule(task, 0, period); + var task = + new TimerTask() { + @Override + public void run() { + if (!isRunning()) { + log.debug("Event source not yet started. Will not run for: {}", primaryID); + return; + } + // always use up-to-date resource from cache + var res = resourceCache.get(primaryID); + res.ifPresentOrElse(p -> getAndCacheResource(p, false), + () -> log.warn("No resource in cache for resource ID: {}", primaryID)); + } + }; + timerTasks.put(primaryID, task); + // there is a delay, to not do two fetches when the resources first appeared + // and getSecondaryResource is called on reconciliation. + timer.schedule(task, period, period); } } @@ -132,28 +130,22 @@ public void run() { * @return the related resource for this event source */ @Override - public Optional getSecondaryResource(P primary) { - return getValueFromCacheOrSupplier(ResourceID.fromResource(primary)); - } - - /** - * - * @param resourceID of the target related resource - * @return the cached value of the resource, if not present it gets the resource from the - * supplier. The value provided from the supplier is cached, but no new event is - * propagated. - */ - public Optional getValueFromCacheOrSupplier(ResourceID resourceID) { - var cachedValue = getCachedValue(resourceID); - if (cachedValue.isPresent()) { - return cachedValue; + public Set getSecondaryResources(P primary) { + var primaryID = ResourceID.fromResource(primary); + var cachedValue = cache.get(primaryID); + if (cachedValue != null && !cachedValue.isEmpty()) { + return new HashSet<>(cachedValue.values()); } else { - return getAndCacheResource(resourceID); + if (fetchedForPrimaries.contains(primaryID)) { + return Collections.emptySet(); + } else { + return getAndCacheResource(primary, true); + } } } public interface ResourceFetcher { - Optional fetchResource(P primaryResource); + Set fetchResources(P primaryResource); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java index dcdb2cdda9..09ff2e8b0e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java @@ -1,33 +1,33 @@ package io.javaoperatorsdk.operator.processing.event.source.polling; import java.util.Map; -import java.util.Optional; +import java.util.Set; import java.util.Timer; import java.util.TimerTask; -import java.util.function.Supplier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.OperatorException; -import io.javaoperatorsdk.operator.processing.event.ExternalResourceCachingEventSource; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; +import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource; /** * Polls resource (on contrary to {@link PerResourcePollingEventSource}) not per resource bases but - * instead to calls supplier periodically and independently of the number of state of custom - * resources managed by the operator. It is called on start (synced). This means that when the - * reconciler first time executed on startup a poll already happened before. So if the cache does - * not contain the target resource it means it is not created yet or was deleted while an operator - * was not running. + * instead to calls supplier periodically and independently of the number or state of custom + * resources managed by the controller. It is called on start (synced). This means that when the + * reconciler first time executed on startup the first poll already happened before. So if the cache + * does not contain the target resource it means it is not created yet or was deleted while an + * operator was not running. * *

* Another caveat with this is if the cached object is checked in the reconciler and created since * not in the cache it should be manually added to the cache, since it can happen that the * reconciler is triggered before the cache is propagated with the new resource from a scheduled - * execution. See {@link #put(ResourceID, Object)} method. So the generic workflow in reconciler - * should be: + * execution. See {@link #handleRecentResourceCreate(ResourceID, Object)} and update method. So the + * generic workflow in reconciler should be: * *