diff --git a/docs/assets/js/uikit.js b/docs/assets/js/uikit.js
index 5b1acf9275..665c8c0522 100644
--- a/docs/assets/js/uikit.js
+++ b/docs/assets/js/uikit.js
@@ -8214,7 +8214,7 @@
updateAria: function(toggled) {
attr(this.$el, 'aria-expanded', isBoolean(toggled)
- ? toggled
+ ? toggled// todo delete bulk support
: isToggled(this.target, this.cls)
);
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java
index b5e3fffcf0..75d622e30b 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java
@@ -17,9 +17,8 @@
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
-import io.javaoperatorsdk.operator.api.reconciler.Constants;
+import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
-import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent;
@@ -46,17 +45,17 @@ public class AnnotationControllerConfiguration
private static final String KUBE_DEPENDENT_NAME = KubernetesDependent.class.getSimpleName();
protected final Reconciler
reconciler;
- private final ControllerConfiguration annotation;
+ private final io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration annotation;
private List specs;
private Class resourceClass;
public AnnotationControllerConfiguration(Reconciler
reconciler) {
this.reconciler = reconciler;
- this.annotation = reconciler.getClass().getAnnotation(ControllerConfiguration.class);
+ this.annotation = reconciler.getClass()
+ .getAnnotation(io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration.class);
if (annotation == null) {
- throw new OperatorException(
- "Missing mandatory @" + ControllerConfiguration.class.getSimpleName() +
- " annotation for reconciler: " + reconciler);
+ throw new OperatorException("Missing mandatory @" + CONTROLLER_CONFIG_ANNOTATION +
+ " annotation for reconciler: " + reconciler);
}
}
@@ -247,9 +246,10 @@ public List getDependentResources() {
final var context = "DependentResource of type '" + dependentType.getName() + "'";
spec = new DependentResourceSpec(dependentType, config, name,
Set.of(dependent.dependsOn()),
- instantiateConditionIfNotDefault(dependent.readyPostcondition(), context),
- instantiateConditionIfNotDefault(dependent.reconcilePrecondition(), context),
- instantiateConditionIfNotDefault(dependent.deletePostcondition(), context));
+ instantiateIfNotDefault(dependent.readyPostcondition(), Condition.class, context),
+ instantiateIfNotDefault(dependent.reconcilePrecondition(), Condition.class, context),
+ instantiateIfNotDefault(dependent.deletePostcondition(), Condition.class, context),
+ dependent.provideEventSource());
specsMap.put(name, spec);
}
@@ -258,10 +258,10 @@ public List getDependentResources() {
return specs;
}
- protected Condition, ?> instantiateConditionIfNotDefault(Class extends Condition> condition,
+ protected T instantiateIfNotDefault(Class extends T> toInstantiate, Class defaultClass,
String context) {
- if (condition != Condition.class) {
- return instantiateAndConfigureIfNeeded(condition, Condition.class, context);
+ if (!defaultClass.equals(toInstantiate)) {
+ return instantiateAndConfigureIfNeeded(toInstantiate, defaultClass, context);
}
return null;
}
@@ -287,17 +287,17 @@ private Object createKubernetesResourceConfig(Class extends DependentResource>
OnUpdateFilter extends HasMetadata> onUpdateFilter = null;
OnDeleteFilter extends HasMetadata> onDeleteFilter = null;
GenericFilter extends HasMetadata> genericFilter = null;
+ ResourceDiscriminator, ? extends HasMetadata> resourceDiscriminator = null;
+ String eventSourceNameToUse = null;
if (kubeDependent != null) {
if (!Arrays.equals(KubernetesDependent.DEFAULT_NAMESPACES,
kubeDependent.namespaces())) {
namespaces = Set.of(kubeDependent.namespaces());
configuredNS = true;
}
-
final var fromAnnotation = kubeDependent.labelSelector();
labelSelector = Constants.NO_VALUE_SET.equals(fromAnnotation) ? null : fromAnnotation;
-
final var context =
KUBE_DEPENDENT_NAME + " annotation on " + dependentType.getName() + " DependentResource";
onAddFilter = createFilter(kubeDependent.onAddFilter(), OnAddFilter.class, context)
@@ -311,15 +311,32 @@ private Object createKubernetesResourceConfig(Class extends DependentResource>
genericFilter =
createFilter(kubeDependent.genericFilter(), GenericFilter.class, context)
.orElse(null);
+
+ resourceDiscriminator =
+ instantiateIfNotDefault(kubeDependent.resourceDiscriminator(),
+ ResourceDiscriminator.class, context);
+ eventSourceNameToUse = Constants.NO_VALUE_SET.equals(kubeDependent.eventSourceToUse()) ? null
+ : kubeDependent.eventSourceToUse();
}
config =
- new KubernetesDependentResourceConfig(namespaces, labelSelector, configuredNS, onAddFilter,
- onUpdateFilter, onDeleteFilter, genericFilter);
+ new KubernetesDependentResourceConfig(namespaces, labelSelector, configuredNS,
+ resourceDiscriminator, onAddFilter,
+ onUpdateFilter, onDeleteFilter, genericFilter, eventSourceNameToUse);
return config;
}
+ @SuppressWarnings({"unchecked"})
+ private ResourceDiscriminator, ? extends HasMetadata> instantiateDiscriminatorIfNotVoid(
+ Class extends ResourceDiscriminator> discriminator) {
+ if (discriminator != ResourceDiscriminator.class) {
+ return instantiateAndConfigureIfNeeded(discriminator, ResourceDiscriminator.class,
+ CONTROLLER_CONFIG_ANNOTATION);
+ }
+ return null;
+ }
+
public static T valueOrDefault(
ControllerConfiguration controllerConfiguration,
Function mapper,
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java
index e9cae2dccf..ee153c879a 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java
@@ -174,7 +174,7 @@ private void replaceConfig(String name, Object newConfig, DependentResourceSpec<
namedDependentResourceSpecs.put(name,
new DependentResourceSpec<>(current.getDependentResourceClass(), newConfig, name,
current.getDependsOn(), current.getReadyCondition(), current.getReconcileCondition(),
- current.getDeletePostCondition()));
+ current.getDeletePostCondition(), current.provideEventSource()));
}
@SuppressWarnings("unchecked")
@@ -220,7 +220,7 @@ public ControllerConfiguration build() {
KubernetesDependentResourceConfig c) {
return new DependentResourceSpec(spec.getDependentResourceClass(),
c.setNamespaces(namespaces), name, spec.getDependsOn(), spec.getReadyCondition(),
- spec.getReconcileCondition(), spec.getDeletePostCondition());
+ spec.getReconcileCondition(), spec.getDeletePostCondition(), spec.provideEventSource());
}
public static ControllerConfigurationOverrider override(
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/dependent/DependentResourceSpec.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/dependent/DependentResourceSpec.java
index f146d127d0..71476cc2d3 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/dependent/DependentResourceSpec.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/dependent/DependentResourceSpec.java
@@ -23,9 +23,12 @@ public class DependentResourceSpec, C> {
private final Condition, ?> deletePostCondition;
+ private final boolean provideEventSource;
+
public DependentResourceSpec(Class dependentResourceClass, C dependentResourceConfig,
String name, Set dependsOn, Condition, ?> readyCondition,
- Condition, ?> reconcileCondition, Condition, ?> deletePostCondition) {
+ Condition, ?> reconcileCondition, Condition, ?> deletePostCondition,
+ boolean provideEventSource) {
this.dependentResourceClass = dependentResourceClass;
this.dependentResourceConfig = dependentResourceConfig;
this.name = name;
@@ -33,6 +36,7 @@ public DependentResourceSpec(Class dependentResourceClass, C dependentResourc
this.readyCondition = readyCondition;
this.reconcileCondition = reconcileCondition;
this.deletePostCondition = deletePostCondition;
+ this.provideEventSource = provideEventSource;
}
public Class getDependentResourceClass() {
@@ -89,4 +93,8 @@ public Condition getReconcileCondition() {
public Condition getDeletePostCondition() {
return deletePostCondition;
}
+
+ public boolean provideEventSource() {
+ return provideEventSource;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
index 845810c8a1..2e4fb98e6f 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java
@@ -6,20 +6,27 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext;
+import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
public interface Context {
Optional getRetryInfo();
- default Optional getSecondaryResource(Class expectedType) {
- return getSecondaryResource(expectedType, null);
+ default Optional getSecondaryResource(Class expectedType) {
+ return getSecondaryResource(expectedType, (String) null);
}
- Set getSecondaryResources(Class expectedType);
+ Set getSecondaryResources(Class expectedType);
- Optional getSecondaryResource(Class expectedType, String eventSourceName);
+ @Deprecated(forRemoval = true)
+ Optional getSecondaryResource(Class expectedType, String eventSourceName);
+
+ Optional getSecondaryResource(Class expectedType,
+ ResourceDiscriminator discriminator);
ControllerConfiguration getControllerConfiguration();
ManagedDependentResourceContext managedDependentResourceContext();
+
+ EventSourceRetriever
eventSourceRetriever();
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java
index afb37a8c53..cb7f4ae63b 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java
@@ -9,6 +9,7 @@
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.ManagedDependentResourceContext;
import io.javaoperatorsdk.operator.processing.Controller;
+import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
public class DefaultContext
implements Context
{
@@ -47,6 +48,12 @@ public Optional getSecondaryResource(Class expectedType, String eventS
.getSecondaryResource(primaryResource);
}
+ @Override
+ public Optional getSecondaryResource(Class expectedType,
+ ResourceDiscriminator discriminator) {
+ return discriminator.distinguish(expectedType, primaryResource, this);
+ }
+
@Override
public ControllerConfiguration getControllerConfiguration() {
return controllerConfiguration;
@@ -57,6 +64,11 @@ public ManagedDependentResourceContext managedDependentResourceContext() {
return defaultManagedDependentResourceContext;
}
+ @Override
+ public EventSourceRetriever
eventSourceRetriever() {
+ return controller.getEventSourceManager();
+ }
+
public DefaultContext
setRetryInfo(RetryInfo retryInfo) {
this.retryInfo = retryInfo;
return this;
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceInitializer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceInitializer.java
index 9b3c7a67bd..017418ea35 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceInitializer.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/EventSourceInitializer.java
@@ -1,10 +1,14 @@
package io.javaoperatorsdk.operator.api.reconciler;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Optional;
import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
/**
* An interface that a {@link Reconciler} can implement to have the SDK register the provided
@@ -39,6 +43,22 @@ static Map nameEventSources(EventSource... eventSources) {
return eventSourceMap;
}
+ @SuppressWarnings("unchecked,rawtypes")
+ static Map nameEventSourcesFromDependentResource(
+ EventSourceContext context, DependentResource... dependentResources) {
+
+ if (dependentResources != null) {
+ Map eventSourceMap = new HashMap<>(dependentResources.length);
+ for (DependentResource dependentResource : dependentResources) {
+ Optional es = dependentResource.eventSource(context);
+ es.ifPresent(e -> eventSourceMap.put(generateNameFor(e), e));
+ }
+ return eventSourceMap;
+ } else {
+ return Collections.emptyMap();
+ }
+ }
+
/**
* This is for the use case when the event sources are not access explicitly by name in the
* reconciler.
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceDiscriminator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceDiscriminator.java
new file mode 100644
index 0000000000..072e7d8078
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceDiscriminator.java
@@ -0,0 +1,11 @@
+package io.javaoperatorsdk.operator.api.reconciler;
+
+import java.util.Optional;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+
+public interface ResourceDiscriminator {
+
+ Optional distinguish(Class resource, P primary, Context context);
+
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceIDMatcherDiscriminator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceIDMatcherDiscriminator.java
new file mode 100644
index 0000000000..f28633252a
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceIDMatcherDiscriminator.java
@@ -0,0 +1,25 @@
+package io.javaoperatorsdk.operator.api.reconciler;
+
+import java.util.Optional;
+import java.util.function.Function;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+
+public class ResourceIDMatcherDiscriminator
+ implements ResourceDiscriminator {
+
+ private final Function mapper;
+
+ public ResourceIDMatcherDiscriminator(Function
mapper) {
+ this.mapper = mapper;
+ }
+
+ @Override
+ public Optional distinguish(Class resource, P primary, Context context) {
+ var resourceID = mapper.apply(primary);
+ return context.getSecondaryResources(resource).stream()
+ .filter(resourceID::isSameResource)
+ .findFirst();
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Dependent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Dependent.java
index 90ba701a6a..00d2ad1882 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Dependent.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/Dependent.java
@@ -57,4 +57,13 @@
* one can be
*/
String[] dependsOn() default {};
+
+ /**
+ * Setting this to false means that the event source provided by the dependent resource won't be
+ * used. This is helpful if more dependent resources created for the same type, and want to share
+ * a common event source. In that case an event source needs to be explicitly registered.
+ *
+ * @return if the event source (if any) provided by the dependent resource should be used or not.
+ */
+ boolean provideEventSource() default true;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java
index 0923d19473..d098137b46 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java
@@ -1,8 +1,11 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;
+import java.util.Optional;
+
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.processing.ResourceOwner;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
+import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
/**
* An interface to implement and provide dependent resource support.
@@ -10,7 +13,7 @@
* @param the dependent resource type
* @param the associated primary resource type
*/
-public interface DependentResource extends ResourceOwner {
+public interface DependentResource {
/**
* Reconciles the dependent resource given the desired primary state
@@ -21,6 +24,42 @@ public interface DependentResource extends ResourceOwn
*/
ReconcileResult reconcile(P primary, Context context);
+ /**
+ * Retrieves the resource type associated with this DependentResource
+ *
+ * @return the resource type associated with this DependentResource
+ */
+ Class resourceType();
+
+ /**
+ * Dependent resources are designed to by default provide event sources. There are cases where it
+ * might not:
+ *
+ * - If an event source is shared between multiple dependent resources. In this case only one or
+ * none of the dependent resources sharing the event source should provide one.
+ * - Some special implementation of an event source. That just execute some action might not
+ * provide one.
+ *
+ *
+ * @param eventSourceContext context of event source initialization
+ * @return an optional event source
+ */
+ default Optional> eventSource(
+ EventSourceContext eventSourceContext) {
+ return Optional.empty();
+ }
+
+ /**
+ * Calling this method, instructs the implementation to not provide an event source, even if it
+ * normally does.
+ */
+ void doNotProvideEventSource();
+
+
+ default Optional getSecondaryResource(P primary, Context context) {
+ return Optional.empty();
+ }
+
/**
* Computes a default name for the specified DependentResource class
*
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceAware.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceAware.java
new file mode 100644
index 0000000000..09b13d90c5
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceAware.java
@@ -0,0 +1,10 @@
+package io.javaoperatorsdk.operator.api.reconciler.dependent;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
+
+public interface EventSourceAware
{
+
+ void selectEventSources(EventSourceRetriever
eventSourceRetriever);
+
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceProvider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceProvider.java
index 98190cb7ef..c83af1270a 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceProvider.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/EventSourceProvider.java
@@ -4,6 +4,11 @@
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
+/**
+ * @deprecated now event source related methods are directly on {@link DependentResource}
+ * @param
primary resource
+ */
+@Deprecated(forRemoval = true)
public interface EventSourceProvider
{
/**
* @param context - event source context where the event source is initialized
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/ReconcileResult.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/ReconcileResult.java
index c83da1c8ea..468e14e8ea 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/ReconcileResult.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/ReconcileResult.java
@@ -1,14 +1,14 @@
package io.javaoperatorsdk.operator.api.reconciler.dependent;
-import java.util.Optional;
+import java.util.*;
+import java.util.stream.Collectors;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
public class ReconcileResult {
- private final R resource;
- private final Operation operation;
+ private final Map resourceOperations;
public static ReconcileResult resourceCreated(T resource) {
return new ReconcileResult<>(resource, Operation.CREATED);
@@ -22,25 +22,49 @@ public static ReconcileResult noOperation(T resource) {
return new ReconcileResult<>(resource, Operation.NONE);
}
+ @SafeVarargs
+ public static ReconcileResult aggregatedResult(ReconcileResult... results) {
+ if (results == null) {
+ throw new IllegalArgumentException("Should provide results to aggregate");
+ }
+ if (results.length == 1) {
+ return results[0];
+ }
+ final Map operations = new HashMap<>(results.length);
+ for (ReconcileResult res : results) {
+ res.getSingleResource().ifPresent(r -> operations.put(r, res.getSingleOperation()));
+ }
+ return new ReconcileResult<>(operations);
+ }
+
@Override
public String toString() {
- return getResource()
- .map(r -> r instanceof HasMetadata ? ResourceID.fromResource((HasMetadata) r) : r)
- .orElse("no resource")
- + " -> " + operation;
+ return resourceOperations.entrySet().stream().collect(Collectors.toMap(
+ e -> e instanceof HasMetadata ? ResourceID.fromResource((HasMetadata) e) : e,
+ Map.Entry::getValue))
+ .toString();
}
private ReconcileResult(R resource, Operation operation) {
- this.resource = resource;
- this.operation = operation;
+ resourceOperations = resource != null ? Map.of(resource, operation) : Collections.emptyMap();
+ }
+
+ private ReconcileResult(Map operations) {
+ resourceOperations = Collections.unmodifiableMap(operations);
+ }
+
+ public Optional getSingleResource() {
+ return resourceOperations.entrySet().stream().findFirst().map(Map.Entry::getKey);
}
- public Optional getResource() {
- return Optional.ofNullable(resource);
+ public Operation getSingleOperation() {
+ return resourceOperations.entrySet().stream().findFirst().map(Map.Entry::getValue)
+ .orElseThrow();
}
- public Operation getOperation() {
- return operation;
+ @SuppressWarnings("unused")
+ public Map getResourceOperations() {
+ return resourceOperations;
}
public enum Operation {
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
index 92b70e722d..57ce0a3a5e 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
@@ -32,6 +32,7 @@
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
+import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext;
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow;
@@ -39,6 +40,7 @@
import io.javaoperatorsdk.operator.processing.event.EventProcessor;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_CURRENT_NAMESPACE;
@@ -207,21 +209,30 @@ private void initContextIfNeeded(P resource, Context context) {
}
public void initAndRegisterEventSources(EventSourceContext
context) {
- managedWorkflow
- .getDependentResourcesByName().entrySet().stream()
- .filter(drEntry -> drEntry.getValue() instanceof EventSourceProvider)
- .forEach(drEntry -> {
- final var provider = (EventSourceProvider) drEntry.getValue();
- final var source = provider.initEventSource(context);
- eventSourceManager.registerEventSource(drEntry.getKey(), source);
- });
-
- // add manually defined event sources
if (reconciler instanceof EventSourceInitializer) {
final var provider = (EventSourceInitializer
) this.reconciler;
final var ownSources = provider.prepareEventSources(context);
ownSources.forEach(eventSourceManager::registerEventSource);
}
+ managedWorkflow
+ .getDependentResourcesByName().entrySet().stream()
+ .forEach(drEntry -> {
+ if (drEntry.getValue() instanceof EventSourceProvider) {
+ final var provider = (EventSourceProvider) drEntry.getValue();
+ final var source = provider.initEventSource(context);
+ eventSourceManager.registerEventSource(drEntry.getKey(), source);
+ } else {
+ Optional eventSource =
+ drEntry.getValue().eventSource(context);
+ eventSource.ifPresent(es -> {
+ eventSourceManager.registerEventSource(drEntry.getKey(), es);
+ });
+ }
+ });
+ managedWorkflow.getDependentResourcesByName().entrySet().stream().map(Map.Entry::getValue)
+ .filter(EventSourceAware.class::isInstance)
+ .forEach(dr -> ((EventSourceAware) dr)
+ .selectEventSources(eventSourceManager));
}
@Override
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ResourceOwner.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ResourceOwner.java
deleted file mode 100644
index f9c02a8a33..0000000000
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/ResourceOwner.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package io.javaoperatorsdk.operator.processing;
-
-import java.util.Optional;
-
-import io.fabric8.kubernetes.api.model.HasMetadata;
-
-public interface ResourceOwner {
-
- /**
- * Retrieves the resource type associated with this ResourceOwner
- *
- * @return the resource type associated with this ResourceOwner
- */
- Class resourceType();
-
- /**
- * Retrieves the resource associated with the specified primary one, returning the actual state of
- * the resource. Typically, this state might come from a local cache, updated after
- * reconciliation.
- *
- * @param primary the primary resource for which we want to retrieve the secondary resource
- * @return an {@link Optional} containing the secondary resource or {@link Optional#empty()} if it
- * doesn't exist
- */
- Optional getSecondaryResource(P primary);
-}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractDependentResource.java
index 5dbdba9358..fe7717e846 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractDependentResource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractDependentResource.java
@@ -1,14 +1,21 @@
package io.javaoperatorsdk.operator.processing.dependent;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
+import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
@Ignore
public abstract class AbstractDependentResource
@@ -17,23 +24,91 @@ public abstract class AbstractDependentResource
protected final boolean creatable = this instanceof Creator;
protected final boolean updatable = this instanceof Updater;
+ protected final boolean bulk = this instanceof BulkDependentResource;
protected Creator creator;
protected Updater updater;
+ protected BulkDependentResource bulkDependentResource;
+ private boolean returnEventSource = true;
+
+ protected List> resourceDiscriminator = new ArrayList<>(1);
@SuppressWarnings("unchecked")
public AbstractDependentResource() {
creator = creatable ? (Creator) this : null;
updater = updatable ? (Updater) this : null;
+
+ bulkDependentResource = bulk ? (BulkDependentResource) this : null;
+ }
+
+ @Override
+ public void doNotProvideEventSource() {
+ this.returnEventSource = false;
}
+ @Override
+ public Optional> eventSource(EventSourceContext eventSourceContext) {
+ if (!returnEventSource) {
+ return Optional.empty();
+ } else {
+ return Optional.of(provideEventSource(eventSourceContext));
+ }
+ }
+
+ protected abstract ResourceEventSource provideEventSource(
+ EventSourceContext eventSourceContext);
+
@Override
public ReconcileResult reconcile(P primary, Context context) {
- var maybeActual = getSecondaryResource(primary);
+ if (bulk) {
+ final var count = bulkDependentResource.count(primary, context);
+ deleteBulkResourcesIfRequired(count, lastKnownBulkSize(), primary, context);
+ adjustDiscriminators(count);
+ @SuppressWarnings("unchecked")
+ final ReconcileResult[] results = new ReconcileResult[count];
+ for (int i = 0; i < count; i++) {
+ results[i] = reconcileIndexAware(primary, i, context);
+ }
+ return ReconcileResult.aggregatedResult(results);
+ } else {
+ return reconcileIndexAware(primary, 0, context);
+ }
+ }
+
+ protected void deleteBulkResourcesIfRequired(int targetCount, int actualCount, P primary,
+ Context context) {
+ if (targetCount >= actualCount) {
+ return;
+ }
+ for (int i = targetCount; i < actualCount; i++) {
+ var resource = getSecondaryResourceIndexAware(primary, i, context);
+ var index = i;
+ resource.ifPresent(
+ r -> bulkDependentResource.deleteBulkResourceWithIndex(primary, r, index, context));
+ }
+ }
+
+ private void adjustDiscriminators(int count) {
+ if (resourceDiscriminator.size() == count) {
+ return;
+ }
+ if (resourceDiscriminator.size() < count) {
+ for (int i = resourceDiscriminator.size(); i < count; i++) {
+ resourceDiscriminator.add(bulkDependentResource.getResourceDiscriminator(i));
+ }
+ }
+ if (resourceDiscriminator.size() > count) {
+ resourceDiscriminator.subList(count, resourceDiscriminator.size()).clear();
+ }
+ }
+
+ protected ReconcileResult reconcileIndexAware(P primary, int i, Context context) {
+ Optional maybeActual = bulk ? getSecondaryResourceIndexAware(primary, i, context)
+ : getSecondaryResource(primary, context);
if (creatable || updatable) {
if (maybeActual.isEmpty()) {
if (creatable) {
- var desired = desired(primary, context);
+ var desired = desiredIndexAware(primary, i, context);
throwIfNull(desired, primary, "Desired");
logForOperation("Creating", primary, desired);
var createdResource = handleCreate(desired, primary, context);
@@ -42,9 +117,15 @@ public ReconcileResult reconcile(P primary, Context context) {
} else {
final var actual = maybeActual.get();
if (updatable) {
- final var match = updater.match(actual, primary, context);
+ final Matcher.Result match;
+ if (bulk) {
+ match = updater.match(actual, primary, i, context);
+ } else {
+ match = updater.match(actual, primary, context);
+ }
if (!match.matched()) {
- final var desired = match.computedDesired().orElse(desired(primary, context));
+ final var desired =
+ match.computedDesired().orElse(desiredIndexAware(primary, i, context));
throwIfNull(desired, primary, "Desired");
logForOperation("Updating", primary, desired);
var updatedResource = handleUpdate(actual, desired, primary, context);
@@ -62,6 +143,20 @@ public ReconcileResult reconcile(P primary, Context context) {
return ReconcileResult.noOperation(maybeActual.orElse(null));
}
+ private R desiredIndexAware(P primary, int i, Context
context) {
+ return bulk ? desired(primary, i, context)
+ : desired(primary, context);
+ }
+
+ public Optional getSecondaryResource(P primary, Context context) {
+ return resourceDiscriminator.isEmpty() ? context.getSecondaryResource(resourceType())
+ : resourceDiscriminator.get(0).distinguish(resourceType(), primary, context);
+ }
+
+ protected Optional getSecondaryResourceIndexAware(P primary, int index, Context context) {
+ return context.getSecondaryResource(resourceType(), resourceDiscriminator.get(index));
+ }
+
private void throwIfNull(R desired, P primary, String descriptor) {
if (desired == null) {
throw new DependentResourceException(
@@ -87,7 +182,7 @@ protected R handleCreate(R desired, P primary, Context
context) {
}
/**
- * Allows sub-classes to perform additional processing (e.g. caching) on the created resource if
+ * Allows subclasses to perform additional processing (e.g. caching) on the created resource if
* needed.
*
* @param primaryResourceId the {@link ResourceID} of the primary resource associated with the
@@ -97,7 +192,7 @@ protected R handleCreate(R desired, P primary, Context
context) {
protected abstract void onCreated(ResourceID primaryResourceId, R created);
/**
- * Allows sub-classes to perform additional processing on the updated resource if needed.
+ * Allows subclasses to perform additional processing on the updated resource if needed.
*
* @param primaryResourceId the {@link ResourceID} of the primary resource associated with the
* newly updated resource
@@ -118,4 +213,33 @@ protected R desired(P primary, Context
context) {
throw new IllegalStateException(
"desired method must be implemented if this DependentResource can be created and/or updated");
}
+
+ protected R desired(P primary, int index, Context
context) {
+ throw new IllegalStateException(
+ "Must be implemented for bulk DependentResource creation");
+ }
+
+ public AbstractDependentResource setResourceDiscriminator(
+ ResourceDiscriminator resourceDiscriminator) {
+ if (resourceDiscriminator != null) {
+ this.resourceDiscriminator.add(resourceDiscriminator);
+ }
+ return this;
+ }
+
+ public ResourceDiscriminator getResourceDiscriminator() {
+ if (this.resourceDiscriminator.isEmpty()) {
+ return null;
+ } else {
+ return this.resourceDiscriminator.get(0);
+ }
+ }
+
+ protected int lastKnownBulkSize() {
+ return resourceDiscriminator.size();
+ }
+
+ protected boolean getReturnEventSource() {
+ return returnEventSource;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java
index 0ceba16826..a90017897a 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java
@@ -1,12 +1,14 @@
package io.javaoperatorsdk.operator.processing.dependent;
+import java.util.Optional;
+
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
-import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
+import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceAware;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
+import io.javaoperatorsdk.operator.processing.event.EventSourceRetriever;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
-import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
@@ -15,18 +17,23 @@
@Ignore
public abstract class AbstractEventSourceHolderDependentResource>
- extends AbstractDependentResource
- implements EventSourceProvider {
+ extends AbstractDependentResource implements EventSourceAware {
private T eventSource;
+ private final Class resourceType;
private boolean isCacheFillerEventSource;
protected OnAddFilter onAddFilter;
protected OnUpdateFilter onUpdateFilter;
protected OnDeleteFilter onDeleteFilter;
protected GenericFilter genericFilter;
+ protected String eventSourceToUse;
+
+ protected AbstractEventSourceHolderDependentResource(Class resourceType) {
+ this.resourceType = resourceType;
+ }
- public EventSource initEventSource(EventSourceContext context) {
+ public ResourceEventSource provideEventSource(EventSourceContext context) {
// some sub-classes (e.g. KubernetesDependentResource) can have their event source created
// before this method is called in the managed case, so only create the event source if it
// hasn't already been set.
@@ -34,17 +41,39 @@ public EventSource initEventSource(EventSourceContext
context) {
// event source
// is shared between dependent resources this does not override the existing filters.
if (eventSource == null) {
- eventSource = createEventSource(context);
+ setEventSource(createEventSource(context));
applyFilters();
}
-
- isCacheFillerEventSource = eventSource instanceof RecentOperationCacheFiller;
return eventSource;
}
+ @SuppressWarnings("unchecked")
+ @Override
+ public void selectEventSources(EventSourceRetriever
eventSourceRetriever) {
+ if (!getReturnEventSource()) {
+ if (eventSourceToUse != null) {
+ setEventSource(
+ (T) eventSourceRetriever.getResourceEventSourceFor(resourceType(), eventSourceToUse));
+ } else {
+ setEventSource((T) eventSourceRetriever.getResourceEventSourceFor(resourceType()));
+ }
+ }
+ }
+
+ /** To make this backwards compatible even for respect of overriding */
+ public T initEventSource(EventSourceContext
context) {
+ return (T) eventSource(context).orElseThrow();
+ }
+
+ @Override
+ public Class resourceType() {
+ return resourceType;
+ }
+
protected abstract T createEventSource(EventSourceContext context);
protected void setEventSource(T eventSource) {
+ isCacheFillerEventSource = eventSource instanceof RecentOperationCacheFiller;
this.eventSource = eventSource;
}
@@ -55,8 +84,8 @@ protected void applyFilters() {
this.eventSource.setGenericFilter(genericFilter);
}
- protected T eventSource() {
- return eventSource;
+ public Optional> eventSource() {
+ return Optional.ofNullable(eventSource);
}
protected void onCreated(ResourceID primaryResourceId, R created) {
@@ -87,4 +116,10 @@ public void setOnUpdateFilter(OnUpdateFilter onUpdateFilter) {
public void setOnDeleteFilter(OnDeleteFilter onDeleteFilter) {
this.onDeleteFilter = onDeleteFilter;
}
+
+ public AbstractEventSourceHolderDependentResource setEventSourceToUse(
+ String eventSourceToUse) {
+ this.eventSourceToUse = eventSourceToUse;
+ return this;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/BulkDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/BulkDependentResource.java
new file mode 100644
index 0000000000..1f2688f5cb
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/BulkDependentResource.java
@@ -0,0 +1,35 @@
+package io.javaoperatorsdk.operator.processing.dependent;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
+import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
+
+/**
+ * Manages dynamic number of resources created for a primary resource. Since the point of a bulk
+ * dependent resource is to manage the number of secondary resources dynamically it implement
+ * {@link Creator} and {@link Deleter} interfaces out of the box. A concrete dependent resource can
+ * implement additionally also {@link Updater}.
+ */
+public interface BulkDependentResource extends Creator, Deleter {
+
+ /**
+ * @return number of resources to create
+ */
+ int count(P primary, Context
context);
+
+ R desired(P primary, int index, Context
context);
+
+ /**
+ * Used to delete resource if the desired count is lower than the actual count of a resource.
+ *
+ * @param primary resource
+ * @param resource actual resource from the cache for the index
+ * @param i index of the resource
+ * @param context actual context
+ */
+ void deleteBulkResourceWithIndex(P primary, R resource, int i, Context
context);
+
+ ResourceDiscriminator getResourceDiscriminator(int index);
+
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/BulkResourceDiscriminatorFactory.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/BulkResourceDiscriminatorFactory.java
new file mode 100644
index 0000000000..8b9b6e968d
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/BulkResourceDiscriminatorFactory.java
@@ -0,0 +1,10 @@
+package io.javaoperatorsdk.operator.processing.dependent;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
+
+public interface BulkResourceDiscriminatorFactory {
+
+ ResourceDiscriminator createResourceDiscriminator(int index);
+
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/BulkUpdater.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/BulkUpdater.java
new file mode 100644
index 0000000000..9c00b47d0c
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/BulkUpdater.java
@@ -0,0 +1,20 @@
+package io.javaoperatorsdk.operator.processing.dependent;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.reconciler.Context;
+
+/**
+ * Helper for the Bulk Dependent Resources to make it more explicit that bulk needs to only
+ * implement the index aware match method.
+ *
+ * @param secondary resource type
+ * @param primary resource type
+ */
+public interface BulkUpdater extends Updater {
+
+ default Matcher.Result match(R actualResource, P primary, Context context) {
+ throw new IllegalStateException();
+ }
+
+ Matcher.Result match(R actualResource, P primary, int index, Context context);
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/DesiredEqualsMatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/DesiredEqualsMatcher.java
index 459d7951d6..1d3b34a47b 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/DesiredEqualsMatcher.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/DesiredEqualsMatcher.java
@@ -16,4 +16,10 @@ public Result match(R actualResource, P primary, Context context) {
var desired = abstractDependentResource.desired(primary, context);
return Result.computed(actualResource.equals(desired), desired);
}
+
+ @Override
+ public Result match(R actualResource, P primary, int index, Context context) {
+ var desired = abstractDependentResource.desired(primary, index, context);
+ return Result.computed(actualResource.equals(desired), desired);
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/Matcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/Matcher.java
index 750fe89cbf..835f76ab3a 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/Matcher.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/Matcher.java
@@ -95,4 +95,19 @@ public Optional computedDesired() {
* {@link Result#computed(boolean, Object)})
*/
Result match(R actualResource, P primary, Context context);
+
+ /**
+ * Determines whether the specified secondary resource matches the desired state with target index
+ * of a bulk resource as defined from the specified primary resource, given the specified
+ * {@link Context}.
+ *
+ * @param actualResource the resource we want to determine whether it's matching the desired state
+ * @param primary the primary resource from which the desired state is inferred
+ * @param context the context in which the resource is being matched
+ * @return a {@link Result} encapsulating whether the resource matched its desired state and this
+ * associated state if it was computed as part of the matching process. Use the static
+ * convenience methods ({@link Result#nonComputed(boolean)} and
+ * {@link Result#computed(boolean, Object)})
+ */
+ Result match(R actualResource, P primary, int index, Context context);
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/Updater.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/Updater.java
index 828f9ad785..06b3cb52f6 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/Updater.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/Updater.java
@@ -8,4 +8,8 @@ public interface Updater {
R update(R actual, R desired, P primary, Context context);
Result match(R actualResource, P primary, Context context);
+
+ default Result match(R actualResource, P primary, int index, Context context) {
+ throw new IllegalStateException("Implement this for bulk matching");
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractCachingDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractCachingDependentResource.java
deleted file mode 100644
index 242625bc5d..0000000000
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractCachingDependentResource.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package io.javaoperatorsdk.operator.processing.dependent.external;
-
-import java.util.Optional;
-
-import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.javaoperatorsdk.operator.api.reconciler.Ignore;
-import io.javaoperatorsdk.operator.processing.dependent.AbstractEventSourceHolderDependentResource;
-import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
-
-@Ignore
-public abstract class AbstractCachingDependentResource
- extends
- AbstractEventSourceHolderDependentResource> {
- private final Class resourceType;
-
- protected AbstractCachingDependentResource(Class resourceType) {
- this.resourceType = resourceType;
- }
-
- @Override
- public Class resourceType() {
- return resourceType;
- }
-
- @Override
- public Optional getSecondaryResource(P primaryResource) {
- return eventSource().getSecondaryResource(primaryResource);
- }
-}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractExternalDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractExternalDependentResource.java
new file mode 100644
index 0000000000..310bf95ad6
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractExternalDependentResource.java
@@ -0,0 +1,19 @@
+package io.javaoperatorsdk.operator.processing.dependent.external;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.processing.dependent.AbstractEventSourceHolderDependentResource;
+import io.javaoperatorsdk.operator.processing.event.ResourceID;
+import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
+
+public abstract class AbstractExternalDependentResource>
+ extends AbstractEventSourceHolderDependentResource {
+
+ protected AbstractExternalDependentResource(Class resourceType) {
+ super(resourceType);
+ }
+
+ @Override
+ protected void onCreated(ResourceID primaryResourceId, R created) {
+ super.onCreated(primaryResourceId, created);
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java
index 6bab6f48cf..2ccba025a7 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractPollingDependentResource.java
@@ -2,11 +2,15 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
+import io.javaoperatorsdk.operator.processing.dependent.AbstractEventSourceHolderDependentResource;
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
+import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
@Ignore
public abstract class AbstractPollingDependentResource
- extends AbstractCachingDependentResource implements CacheKeyMapper {
+ extends
+ AbstractEventSourceHolderDependentResource>
+ implements CacheKeyMapper {
public static final int DEFAULT_POLLING_PERIOD = 5000;
private long pollingPeriod;
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractSimpleDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractSimpleDependentResource.java
deleted file mode 100644
index 0675db5f1a..0000000000
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/AbstractSimpleDependentResource.java
+++ /dev/null
@@ -1,81 +0,0 @@
-package io.javaoperatorsdk.operator.processing.dependent.external;
-
-import java.util.Optional;
-
-import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.Ignore;
-import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult;
-import io.javaoperatorsdk.operator.processing.dependent.AbstractDependentResource;
-import io.javaoperatorsdk.operator.processing.dependent.DesiredEqualsMatcher;
-import io.javaoperatorsdk.operator.processing.dependent.Matcher;
-import io.javaoperatorsdk.operator.processing.event.ResourceID;
-import io.javaoperatorsdk.operator.processing.event.source.ConcurrentHashMapCache;
-import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache;
-
-/** A base class for external dependent resources that don't have an event source. */
-@Ignore
-public abstract class AbstractSimpleDependentResource
- extends AbstractDependentResource {
-
- // cache serves only to keep the resource readable again until next reconciliation when the
- // new resource is read again.
- protected final UpdatableCache cache;
- protected Matcher matcher;
-
- public AbstractSimpleDependentResource() {
- this(new ConcurrentHashMapCache<>());
- }
-
- public AbstractSimpleDependentResource(UpdatableCache cache) {
- this.cache = cache;
- initMatcher();
- }
-
- @Override
- public Optional getSecondaryResource(HasMetadata primaryResource) {
- return cache.get(ResourceID.fromResource(primaryResource));
- }
-
- /**
- * Actually read the resource from the target API
- *
- * @param primaryResource the primary associated resource
- * @return fetched resource if present
- **/
- public abstract Optional fetchResource(HasMetadata primaryResource);
-
- @Override
- public ReconcileResult reconcile(P primary, Context context) {
- var resourceId = ResourceID.fromResource(primary);
- Optional resource = fetchResource(primary);
- resource.ifPresentOrElse(r -> cache.put(resourceId, r), () -> cache.remove(resourceId));
- return super.reconcile(primary, context);
- }
-
- public final void delete(P primary, Context context) {
- deleteResource(primary, context);
- cache.remove(ResourceID.fromResource(primary));
- }
-
- protected abstract void deleteResource(P primary, Context
context);
-
- @Override
- protected void onCreated(ResourceID primaryResourceId, R created) {
- cache.put(primaryResourceId, created);
- }
-
- @Override
- protected void onUpdated(ResourceID primaryResourceId, R updated, R actual) {
- cache.put(primaryResourceId, updated);
- }
-
- public Matcher.Result match(R actualResource, P primary, Context context) {
- return matcher.match(actualResource, primary, context);
- }
-
- protected void initMatcher() {
- matcher = new DesiredEqualsMatcher<>(this);
- }
-
-}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/ProvidedResourceIDHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/ProvidedResourceIDHandler.java
new file mode 100644
index 0000000000..08b30537c1
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/external/ProvidedResourceIDHandler.java
@@ -0,0 +1,14 @@
+package io.javaoperatorsdk.operator.processing.dependent.external;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.processing.event.source.ExternalIDMapper;
+
+public interface ProvidedResourceIDHandler
{
+
+ void storeExternalID(P primary, R resource, T id);
+
+ void cleanupExternalIDState(P primary, R resource, T id);
+
+ ExternalIDMapper
externalIDMapper();
+
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/GenericKubernetesResourceMatcher.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/GenericKubernetesResourceMatcher.java
index e294b1c938..bb066b5b24 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/GenericKubernetesResourceMatcher.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/GenericKubernetesResourceMatcher.java
@@ -24,17 +24,42 @@ private GenericKubernetesResourceMatcher(KubernetesDependentResource depen
static Matcher matcherFor(
Class resourceType, KubernetesDependentResource dependentResource) {
if (Secret.class.isAssignableFrom(resourceType)) {
- return (actual, primary, context) -> {
- final var desired = dependentResource.desired(primary, context);
- return Result.computed(
- ResourceComparators.compareSecretData((Secret) desired, (Secret) actual), desired);
+ return new Matcher<>() {
+ @Override
+ public Result match(R actualResource, P primary, Context context) {
+ final var desired = dependentResource.desired(primary, context);
+ return Result.computed(
+ ResourceComparators.compareSecretData((Secret) desired, (Secret) actualResource),
+ desired);
+ }
+
+ @Override
+ public Result match(R actualResource, P primary, int index, Context context) {
+ final var desired = dependentResource.desired(primary, index, context);
+ return Result.computed(
+ ResourceComparators.compareSecretData((Secret) desired, (Secret) actualResource),
+ desired);
+ }
};
} else if (ConfigMap.class.isAssignableFrom(resourceType)) {
- return (actual, primary, context) -> {
- final var desired = dependentResource.desired(primary, context);
- return Result.computed(
- ResourceComparators.compareConfigMapData((ConfigMap) desired, (ConfigMap) actual),
- desired);
+ return new Matcher<>() {
+ @Override
+ public Result match(R actualResource, P primary, Context context) {
+ final var desired = dependentResource.desired(primary, context);
+ return Result.computed(
+ ResourceComparators.compareConfigMapData((ConfigMap) desired,
+ (ConfigMap) actualResource),
+ desired);
+ }
+
+ @Override
+ public Result match(R actualResource, P primary, int index, Context context) {
+ final var desired = dependentResource.desired(primary, index, context);
+ return Result.computed(
+ ResourceComparators.compareConfigMapData((ConfigMap) desired,
+ (ConfigMap) actualResource),
+ desired);
+ }
};
} else {
return new GenericKubernetesResourceMatcher(dependentResource);
@@ -43,32 +68,18 @@ static Matcher matcherFor(
@Override
public Result match(R actualResource, P primary, Context context) {
- return match(dependentResource, actualResource, primary, context, false);
+ var desired = dependentResource.desired(primary, context);
+ return match(desired, actualResource, false);
}
- /**
- * Determines whether the specified actual resource matches the desired state defined by the
- * specified {@link KubernetesDependentResource} based on the observed state of the associated
- * specified primary resource.
- *
- * @param dependentResource the {@link KubernetesDependentResource} implementation used to
- * computed the desired state associated with the specified primary resource
- * @param actualResource the observed dependent resource for which we want to determine whether it
- * matches the desired state or not
- * @param primary the primary resource from which we want to compute the desired state
- * @param context the {@link Context} instance within which this method is called
- * @param considerMetadata {@code true} to consider the metadata of the actual resource when
- * determining if it matches the desired state, {@code false} if matching should occur only
- * considering the spec of the resources
- * @return a {@link io.javaoperatorsdk.operator.processing.dependent.Matcher.Result} object
- * @param the type of resource we want to determine whether they match or not
- * @param the type of primary resources associated with the secondary resources we want to
- * match
- */
- public static Result match(
- KubernetesDependentResource dependentResource, R actualResource, P primary,
- Context context, boolean considerMetadata) {
- final var desired = dependentResource.desired(primary, context);
+ @Override
+ public Result match(R actualResource, P primary, int index, Context context) {
+ var desired = dependentResource.desired(primary, index, context);
+ return match(desired, actualResource, false);
+ }
+
+ public static Result match(
+ R desired, R actualResource, boolean considerMetadata) {
if (considerMetadata) {
final var desiredMetadata = desired.getMetadata();
final var actualMetadata = actualResource.getMetadata();
@@ -95,4 +106,30 @@ public static Result match(
}
return Result.computed(true, desired);
}
+
+ /**
+ * Determines whether the specified actual resource matches the desired state defined by the
+ * specified {@link KubernetesDependentResource} based on the observed state of the associated
+ * specified primary resource.
+ *
+ * @param dependentResource the {@link KubernetesDependentResource} implementation used to
+ * computed the desired state associated with the specified primary resource
+ * @param actualResource the observed dependent resource for which we want to determine whether it
+ * matches the desired state or not
+ * @param primary the primary resource from which we want to compute the desired state
+ * @param context the {@link Context} instance within which this method is called
+ * @param considerMetadata {@code true} to consider the metadata of the actual resource when
+ * determining if it matches the desired state, {@code false} if matching should occur only
+ * considering the spec of the resources
+ * @return a {@link io.javaoperatorsdk.operator.processing.dependent.Matcher.Result} object
+ * @param the type of resource we want to determine whether they match or not
+ * @param the type of primary resources associated with the secondary resources we want to
+ * match
+ */
+ public static Result match(
+ KubernetesDependentResource dependentResource, R actualResource, P primary,
+ Context context, boolean considerMetadata) {
+ final var desired = dependentResource.desired(primary, context);
+ return match(desired, actualResource, considerMetadata);
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java
index f66ff95373..2ccd4da82a 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java
@@ -6,6 +6,8 @@
import java.lang.annotation.Target;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
+import io.javaoperatorsdk.operator.api.reconciler.ResourceDiscriminator;
+import io.javaoperatorsdk.operator.processing.event.source.filter.*;
import io.javaoperatorsdk.operator.processing.event.source.filter.GenericFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
@@ -68,4 +70,8 @@
* itself if no value is set
*/
Class extends GenericFilter> genericFilter() default GenericFilter.class;
+
+ Class extends ResourceDiscriminator> resourceDiscriminator() default ResourceDiscriminator.class;
+
+ String eventSourceToUse() default NO_VALUE_SET;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java
index 930e5fd5b4..37d2246a58 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java
@@ -1,16 +1,13 @@
package io.javaoperatorsdk.operator.processing.dependent.kubernetes;
import java.util.HashMap;
-import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.HasMetadata;
-import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClient;
-import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
@@ -34,20 +31,19 @@
public abstract class KubernetesDependentResource
extends AbstractEventSourceHolderDependentResource>
implements KubernetesClientAware,
- DependentResourceConfigurator {
+ DependentResourceConfigurator> {
private static final Logger log = LoggerFactory.getLogger(KubernetesDependentResource.class);
protected KubernetesClient client;
private final Matcher matcher;
private final ResourceUpdatePreProcessor processor;
- private final Class resourceType;
private final boolean garbageCollected = this instanceof GarbageCollected;
private KubernetesDependentResourceConfig kubernetesDependentResourceConfig;
@SuppressWarnings("unchecked")
public KubernetesDependentResource(Class resourceType) {
- this.resourceType = resourceType;
+ super(resourceType);
matcher = this instanceof Matcher ? (Matcher) this
: GenericKubernetesResourceMatcher.matcherFor(resourceType, this);
@@ -56,9 +52,18 @@ public KubernetesDependentResource(Class resourceType) {
: GenericResourceUpdatePreProcessor.processorFor(resourceType);
}
+ @SuppressWarnings("unchecked")
@Override
- public void configureWith(KubernetesDependentResourceConfig config) {
+ public void configureWith(KubernetesDependentResourceConfig config) {
this.kubernetesDependentResourceConfig = config;
+ var discriminator = kubernetesDependentResourceConfig.getResourceDiscriminator();
+ if (discriminator != null) {
+ setResourceDiscriminator(discriminator);
+ }
+ config.getEventSourceToUse().ifPresent(n -> {
+ doNotProvideEventSource();
+ setEventSourceToUse(n);
+ });
}
private void configureWith(String labelSelector, Set