diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java index c8d13b8688..3cee86802c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/AnnotationControllerConfiguration.java @@ -8,7 +8,9 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.BiPredicate; import java.util.function.Function; +import java.util.function.Predicate; import java.util.stream.Collectors; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -27,19 +29,23 @@ import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilters; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidGenericFilter; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnAddFilter; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnDeleteFilter; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnUpdateFilter; import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; @SuppressWarnings("rawtypes") -public class AnnotationControllerConfiguration - implements io.javaoperatorsdk.operator.api.config.ControllerConfiguration { +public class AnnotationControllerConfiguration

+ implements io.javaoperatorsdk.operator.api.config.ControllerConfiguration

{ - protected final Reconciler reconciler; + protected final Reconciler

reconciler; private final ControllerConfiguration annotation; private List specs; - private Class resourceClass; + private Class

resourceClass; - public AnnotationControllerConfiguration(Reconciler reconciler) { + public AnnotationControllerConfiguration(Reconciler

reconciler) { this.reconciler = reconciler; this.annotation = reconciler.getClass().getAnnotation(ControllerConfiguration.class); if (annotation == null) { @@ -84,10 +90,10 @@ public Set getNamespaces() { @Override @SuppressWarnings("unchecked") - public Class getResourceClass() { + public Class

getResourceClass() { if (resourceClass == null) { resourceClass = - (Class) Utils.getFirstTypeArgumentFromSuperClassOrInterface(reconciler.getClass(), + (Class

) Utils.getFirstTypeArgumentFromSuperClassOrInterface(reconciler.getClass(), Reconciler.class); } return resourceClass; @@ -105,16 +111,16 @@ public String getAssociatedReconcilerClassName() { @SuppressWarnings("unchecked") @Override - public ResourceEventFilter getEventFilter() { - ResourceEventFilter answer = null; + public ResourceEventFilter

getEventFilter() { + ResourceEventFilter

answer = null; - Class>[] filterTypes = - (Class>[]) valueOrDefault(annotation, + Class>[] filterTypes = + (Class>[]) valueOrDefault(annotation, ControllerConfiguration::eventFilters, new Object[] {}); if (filterTypes.length > 0) { for (var filterType : filterTypes) { try { - ResourceEventFilter filter = filterType.getConstructor().newInstance(); + ResourceEventFilter

filter = filterType.getConstructor().newInstance(); if (answer == null) { answer = filter; @@ -144,17 +150,55 @@ public Optional reconciliationMaxInterval() { } } - public static T valueOrDefault( - ControllerConfiguration controllerConfiguration, - Function mapper, - T defaultValue) { - if (controllerConfiguration == null) { - return defaultValue; + @Override + @SuppressWarnings("unchecked") + public Optional> onAddFilter() { + return (Optional>) createFilter(annotation.onAddFilter(), FilterType.onAdd, + annotation.getClass().getSimpleName()); + } + + private enum FilterType { + onAdd(VoidOnAddFilter.class), onUpdate(VoidOnUpdateFilter.class), onDelete( + VoidOnDeleteFilter.class), generic(VoidGenericFilter.class); + + final Class defaultValue; + + FilterType(Class defaultValue) { + this.defaultValue = defaultValue; + } + } + + private Optional createFilter(Class filter, FilterType filterType, String origin) { + if (filterType.defaultValue.equals(filter)) { + return Optional.empty(); } else { - return mapper.apply(controllerConfiguration); + try { + var instance = (T) filter.getDeclaredConstructor().newInstance(); + return Optional.of(instance); + } catch (InstantiationException | IllegalAccessException | InvocationTargetException + | NoSuchMethodException e) { + throw new OperatorException( + "Couldn't create " + filterType + " filter from " + filter.getName() + " class in " + + origin + " for reconciler " + getName(), + e); + } } } + @SuppressWarnings("unchecked") + @Override + public Optional> onUpdateFilter() { + return (Optional>) createFilter(annotation.onUpdateFilter(), + FilterType.onUpdate, annotation.getClass().getSimpleName()); + } + + @SuppressWarnings("unchecked") + @Override + public Optional> genericFilter() { + return (Optional>) createFilter(annotation.genericFilter(), + FilterType.generic, annotation.getClass().getSimpleName()); + } + @SuppressWarnings({"rawtypes", "unchecked"}) @Override public List getDependentResources() { @@ -223,26 +267,59 @@ private String getName(Dependent dependent, Class d return name; } + @SuppressWarnings("rawtypes") private Object createKubernetesResourceConfig(Class dependentType) { + Object config; final var kubeDependent = dependentType.getAnnotation(KubernetesDependent.class); var namespaces = getNamespaces(); var configuredNS = false; - if (kubeDependent != null && !Arrays.equals(KubernetesDependent.DEFAULT_NAMESPACES, - kubeDependent.namespaces())) { - namespaces = Set.of(kubeDependent.namespaces()); - configuredNS = true; - } - String labelSelector = null; + Predicate onAddFilter = null; + BiPredicate onUpdateFilter = null; + BiPredicate onDeleteFilter = null; + Predicate genericFilter = null; if (kubeDependent != null) { + if (!Arrays.equals(KubernetesDependent.DEFAULT_NAMESPACES, + kubeDependent.namespaces())) { + namespaces = Set.of(kubeDependent.namespaces()); + configuredNS = true; + } + final var fromAnnotation = kubeDependent.labelSelector(); labelSelector = Constants.NO_VALUE_SET.equals(fromAnnotation) ? null : fromAnnotation; + + final var kubeDependentName = KubernetesDependent.class.getSimpleName(); + onAddFilter = createFilter(kubeDependent.onAddFilter(), FilterType.onAdd, kubeDependentName) + .orElse(null); + onUpdateFilter = + createFilter(kubeDependent.onUpdateFilter(), FilterType.onUpdate, kubeDependentName) + .orElse(null); + onDeleteFilter = + createFilter(kubeDependent.onDeleteFilter(), FilterType.onDelete, kubeDependentName) + .orElse(null); + genericFilter = + createFilter(kubeDependent.genericFilter(), FilterType.generic, kubeDependentName) + .orElse(null); } config = - new KubernetesDependentResourceConfig(namespaces, labelSelector, configuredNS); + new KubernetesDependentResourceConfig(namespaces, labelSelector, configuredNS, onAddFilter, + onUpdateFilter, onDeleteFilter, genericFilter); + return config; } + + public static T valueOrDefault( + ControllerConfiguration controllerConfiguration, + Function mapper, + T defaultValue) { + if (controllerConfiguration == null) { + return defaultValue; + } else { + return mapper.apply(controllerConfiguration); + } + } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java index 7e987d4700..dc579bb67f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ControllerConfigurationOverrider.java @@ -1,7 +1,12 @@ package io.javaoperatorsdk.operator.api.config; import java.time.Duration; -import java.util.*; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.function.BiPredicate; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -27,6 +32,9 @@ public class ControllerConfigurationOverrider { private final ControllerConfiguration original; private Duration reconciliationMaxInterval; private final LinkedHashMap namedDependentResourceSpecs; + private Predicate onAddFilter; + private BiPredicate onUpdateFilter; + private Predicate genericFilter; private ControllerConfigurationOverrider(ControllerConfiguration original) { finalizer = original.getFinalizerName(); @@ -39,6 +47,9 @@ private ControllerConfigurationOverrider(ControllerConfiguration original) { // make the original specs modifiable final var dependentResources = original.getDependentResources(); namedDependentResourceSpecs = new LinkedHashMap<>(dependentResources.size()); + this.onAddFilter = original.onAddFilter().orElse(null); + this.onUpdateFilter = original.onUpdateFilter().orElse(null); + this.genericFilter = original.genericFilter().orElse(null); dependentResources.forEach(drs -> namedDependentResourceSpecs.put(drs.getName(), drs)); this.original = original; } @@ -120,6 +131,21 @@ public ControllerConfigurationOverrider withReconciliationMaxInterval( return this; } + public ControllerConfigurationOverrider withOnAddFilter(Predicate onAddFilter) { + this.onAddFilter = onAddFilter; + return this; + } + + public ControllerConfigurationOverrider withOnUpdateFilter(BiPredicate onUpdateFilter) { + this.onUpdateFilter = onUpdateFilter; + return this; + } + + public ControllerConfigurationOverrider withGenericFilter(Predicate genericFilter) { + this.genericFilter = genericFilter; + return this; + } + public ControllerConfigurationOverrider replacingNamedDependentResourceConfig(String name, Object dependentResourceConfig) { @@ -167,6 +193,9 @@ public ControllerConfiguration build() { customResourcePredicate, original.getResourceClass(), reconciliationMaxInterval, + onAddFilter, + onUpdateFilter, + genericFilter, newDependentSpecs); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java index 7046af3d85..b68c5e8f4f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultControllerConfiguration.java @@ -5,6 +5,8 @@ import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.function.BiPredicate; +import java.util.function.Predicate; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; @@ -39,8 +41,11 @@ public DefaultControllerConfiguration( ResourceEventFilter resourceEventFilter, Class resourceClass, Duration reconciliationMaxInterval, + Predicate onAddFilter, + BiPredicate onUpdateFilter, + Predicate genericFilter, List dependents) { - super(labelSelector, resourceClass, namespaces); + super(labelSelector, resourceClass, onAddFilter, onUpdateFilter, genericFilter, namespaces); this.associatedControllerClassName = associatedControllerClassName; this.name = name; this.crdName = crdName; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java index 407c352be0..4bb13fa06c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/DefaultResourceConfiguration.java @@ -1,6 +1,9 @@ package io.javaoperatorsdk.operator.api.config; +import java.util.Optional; import java.util.Set; +import java.util.function.BiPredicate; +import java.util.function.Predicate; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -12,18 +15,26 @@ public class DefaultResourceConfiguration private final String labelSelector; private final Set namespaces; private final Class resourceClass; + private final Predicate onAddFilter; + private final BiPredicate onUpdateFilter; + private final Predicate genericFilter; public DefaultResourceConfiguration(String labelSelector, Class resourceClass, - String... namespaces) { - this(labelSelector, resourceClass, + Predicate onAddFilter, + BiPredicate onUpdateFilter, Predicate genericFilter, String... namespaces) { + this(labelSelector, resourceClass, onAddFilter, onUpdateFilter, genericFilter, namespaces == null || namespaces.length == 0 ? DEFAULT_NAMESPACES_SET : Set.of(namespaces)); } public DefaultResourceConfiguration(String labelSelector, Class resourceClass, - Set namespaces) { + Predicate onAddFilter, + BiPredicate onUpdateFilter, Predicate genericFilter, Set namespaces) { this.labelSelector = labelSelector; this.resourceClass = resourceClass; + this.onAddFilter = onAddFilter; + this.onUpdateFilter = onUpdateFilter; + this.genericFilter = genericFilter; this.namespaces = namespaces == null || namespaces.isEmpty() ? DEFAULT_NAMESPACES_SET : namespaces; @@ -48,4 +59,18 @@ public Set getNamespaces() { public Class getResourceClass() { return resourceClass; } + + @Override + public Optional> onAddFilter() { + return Optional.ofNullable(onAddFilter); + } + + @Override + public Optional> onUpdateFilter() { + return Optional.ofNullable(onUpdateFilter); + } + + public Optional> genericFilter() { + return Optional.ofNullable(genericFilter); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java index 70d2b765a5..636ae99b49 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ResourceConfiguration.java @@ -1,7 +1,10 @@ package io.javaoperatorsdk.operator.api.config; import java.util.Collections; +import java.util.Optional; import java.util.Set; +import java.util.function.BiPredicate; +import java.util.function.Predicate; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.OperatorException; @@ -17,6 +20,18 @@ default String getResourceTypeName() { return ReconcilerUtils.getResourceTypeName(getResourceClass()); } + default Optional> onAddFilter() { + return Optional.empty(); + } + + default Optional> onUpdateFilter() { + return Optional.empty(); + } + + default Optional> genericFilter() { + return Optional.empty(); + } + /** * Retrieves the label selector that is used to filter which resources are actually watched by the * associated event source. See the official documentation on the diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java index ddfae2919e..e9ee02c91b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java @@ -1,7 +1,10 @@ package io.javaoperatorsdk.operator.api.config.informer; import java.util.Objects; +import java.util.Optional; import java.util.Set; +import java.util.function.BiPredicate; +import java.util.function.Predicate; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.DefaultResourceConfiguration; @@ -23,28 +26,41 @@ class DefaultInformerConfiguration extends private final PrimaryToSecondaryMapper primaryToSecondaryMapper; private final SecondaryToPrimaryMapper secondaryToPrimaryMapper; private final boolean followControllerNamespaceChanges; + private final BiPredicate onDeleteFilter; protected DefaultInformerConfiguration(String labelSelector, Class resourceClass, PrimaryToSecondaryMapper primaryToSecondaryMapper, SecondaryToPrimaryMapper secondaryToPrimaryMapper, - Set namespaces, boolean followControllerNamespaceChanges) { - super(labelSelector, resourceClass, namespaces); + Set namespaces, boolean followControllerNamespaceChanges, + Predicate onAddFilter, + BiPredicate onUpdateFilter, + BiPredicate onDeleteFilter, + Predicate genericFilter) { + super(labelSelector, resourceClass, onAddFilter, onUpdateFilter, genericFilter, namespaces); this.followControllerNamespaceChanges = followControllerNamespaceChanges; + this.primaryToSecondaryMapper = primaryToSecondaryMapper; this.secondaryToPrimaryMapper = Objects.requireNonNullElse(secondaryToPrimaryMapper, Mappers.fromOwnerReference()); + this.onDeleteFilter = onDeleteFilter; } + @Override public boolean followControllerNamespaceChanges() { return followControllerNamespaceChanges; } + @Override public SecondaryToPrimaryMapper getSecondaryToPrimaryMapper() { return secondaryToPrimaryMapper; } + public Optional> onDeleteFilter() { + return Optional.ofNullable(onDeleteFilter); + } + @Override public

PrimaryToSecondaryMapper

getPrimaryToSecondaryMapper() { return (PrimaryToSecondaryMapper

) primaryToSecondaryMapper; @@ -61,6 +77,14 @@ public

PrimaryToSecondaryMapper

getPrimaryToSecondary SecondaryToPrimaryMapper getSecondaryToPrimaryMapper(); + Optional> onAddFilter(); + + Optional> onUpdateFilter(); + + Optional> onDeleteFilter(); + + Optional> genericFilter(); +

PrimaryToSecondaryMapper

getPrimaryToSecondaryMapper(); @SuppressWarnings("unused") @@ -71,6 +95,10 @@ class InformerConfigurationBuilder { private Set namespaces; private String labelSelector; private final Class resourceClass; + private Predicate onAddFilter; + private BiPredicate onUpdateFilter; + private BiPredicate onDeleteFilter; + private Predicate genericFilter; private boolean inheritControllerNamespacesOnChange = false; private InformerConfigurationBuilder(Class resourceClass) { @@ -151,11 +179,33 @@ public InformerConfigurationBuilder withLabelSelector(String labelSelector) { return this; } + public InformerConfigurationBuilder withOnAddFilter(Predicate onAddFilter) { + this.onAddFilter = onAddFilter; + return this; + } + + public InformerConfigurationBuilder withOnUpdateFilter(BiPredicate onUpdateFilter) { + this.onUpdateFilter = onUpdateFilter; + return this; + } + + public InformerConfigurationBuilder withOnDeleteFilter( + BiPredicate onDeleteFilter) { + this.onDeleteFilter = onDeleteFilter; + return this; + } + + public InformerConfigurationBuilder withGenericFilter(Predicate genericFilter) { + this.genericFilter = genericFilter; + return this; + } + public InformerConfiguration build() { return new DefaultInformerConfiguration<>(labelSelector, resourceClass, primaryToSecondaryMapper, secondaryToPrimaryMapper, - namespaces, inheritControllerNamespacesOnChange); + namespaces, inheritControllerNamespacesOnChange, onAddFilter, onUpdateFilter, + onDeleteFilter, genericFilter); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java index a5e1459cb7..4c6621bb57 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ControllerConfiguration.java @@ -4,9 +4,15 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEventFilter; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidGenericFilter; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnAddFilter; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnUpdateFilter; @Retention(RetentionPolicy.RUNTIME) @Target({ElementType.TYPE}) @@ -51,15 +57,31 @@ String labelSelector() default Constants.NO_VALUE_SET; /** - *

- * Resource event filters only applies on events of the main custom resource. Not on events from - * other event sources nor the periodic events. - *

+ * @deprecated Use onAddFilter, onUpdateFilter instead. + * + *

+ * Resource event filters only applies on events of the main custom resource. Not on + * events from other event sources nor the periodic events. + *

* * @return the list of event filters. */ + @Deprecated(forRemoval = true) Class[] eventFilters() default {}; + /** + * Filter of onAdd events of resources. + **/ + Class> onAddFilter() default VoidOnAddFilter.class; + + /** Filter of onUpdate events of resources. */ + Class> onUpdateFilter() default VoidOnUpdateFilter.class; + + /** + * Filter applied to all operations (add, update, delete). Used to ignore some resources. + **/ + Class> genericFilter() default VoidGenericFilter.class; + /** * Optional configuration of the maximal interval the SDK will wait for a reconciliation request * to happen before one will be automatically triggered. diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 5c25787f1f..acbf2f2632 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -64,10 +64,10 @@ public Controller(Reconciler

reconciler, this.metrics = Optional.ofNullable(ConfigurationServiceProvider.instance().getMetrics()) .orElse(Metrics.NOOP); contextInitializer = reconciler instanceof ContextInitializer; - eventSourceManager = new EventSourceManager<>(this); isCleaner = reconciler instanceof Cleaner; managedWorkflow = ManagedWorkflow.workflowFor(kubernetesClient, configuration.getDependentResources()); + eventSourceManager = new EventSourceManager<>(this); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java index 91d11309f8..2f7747cec6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/AbstractEventSourceHolderDependentResource.java @@ -1,5 +1,8 @@ package io.javaoperatorsdk.operator.processing.dependent; +import java.util.function.BiPredicate; +import java.util.function.Predicate; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; import io.javaoperatorsdk.operator.api.reconciler.Ignore; @@ -16,13 +19,22 @@ public abstract class AbstractEventSourceHolderDependentResource onAddFilter; + protected BiPredicate onUpdateFilter; + protected BiPredicate onDeleteFilter; + protected Predicate genericFilter; + public EventSource initEventSource(EventSourceContext

context) { // some sub-classes (e.g. KubernetesDependentResource) can have their event source created // before this method is called in the managed case, so only create the event source if it - // hasn't already been set + // hasn't already been set. + // The filters are applied automatically only if event source is created automatically. So if an + // event source + // is shared between dependent resources this does not override the existing filters. if (eventSource == null) { eventSource = createEventSource(context); + applyFilters(); } isCacheFillerEventSource = eventSource instanceof RecentOperationCacheFiller; @@ -35,6 +47,13 @@ protected void setEventSource(T eventSource) { this.eventSource = eventSource; } + protected void applyFilters() { + this.eventSource.setOnAddFilter(onAddFilter); + this.eventSource.setOnUpdateFilter(onUpdateFilter); + this.eventSource.setOnDeleteFilter(onDeleteFilter); + this.eventSource.setGenericFilter(genericFilter); + } + protected T eventSource() { return eventSource; } @@ -55,4 +74,16 @@ protected void onUpdated(ResourceID primaryResourceId, R updated, R actual) { private RecentOperationCacheFiller recentOperationCacheFiller() { return (RecentOperationCacheFiller) eventSource; } + + public void setOnAddFilter(Predicate onAddFilter) { + this.onAddFilter = onAddFilter; + } + + public void setOnUpdateFilter(BiPredicate onUpdateFilter) { + this.onUpdateFilter = onUpdateFilter; + } + + public void setOnDeleteFilter(BiPredicate onDeleteFilter) { + this.onDeleteFilter = onDeleteFilter; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java index fc33975532..6ef9aba571 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependent.java @@ -4,8 +4,15 @@ import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.util.function.BiPredicate; +import java.util.function.Predicate; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.Constants; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidGenericFilter; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnAddFilter; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnDeleteFilter; +import io.javaoperatorsdk.operator.processing.event.source.filter.VoidOnUpdateFilter; import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_VALUE_SET; @@ -32,4 +39,12 @@ * @return the label selector */ String labelSelector() default NO_VALUE_SET; + + Class> onAddFilter() default VoidOnAddFilter.class; + + Class> onUpdateFilter() default VoidOnUpdateFilter.class; + + Class> onDeleteFilter() default VoidOnDeleteFilter.class; + + Class> genericFilter() default VoidGenericFilter.class; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java index 3f8e4f7fc6..ff5cc308bf 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java @@ -60,7 +60,6 @@ public void configureWith(KubernetesDependentResourceConfig config) { this.kubernetesDependentResourceConfig = config; } - @SuppressWarnings("unchecked") private void configureWith(String labelSelector, Set namespaces, boolean inheritNamespacesOnChange, EventSourceContext

context) { @@ -74,7 +73,7 @@ private void configureWith(String labelSelector, Set namespaces, .withNamespaces(namespaces, inheritNamespacesOnChange) .build(); - configureWith(new InformerEventSource<>(ic, client)); + configureWith(new InformerEventSource<>(ic, context)); } @SuppressWarnings("unchecked") @@ -159,8 +158,15 @@ protected NonNamespaceOperation, Resource> prepa } @Override + @SuppressWarnings("unchecked") protected InformerEventSource createEventSource(EventSourceContext

context) { if (kubernetesDependentResourceConfig != null) { + // sets the filters for the dependent resource, which are applied by parent class + onAddFilter = kubernetesDependentResourceConfig.onAddFilter(); + onUpdateFilter = kubernetesDependentResourceConfig.onUpdateFilter(); + onDeleteFilter = kubernetesDependentResourceConfig.onDeleteFilter(); + genericFilter = kubernetesDependentResourceConfig.genericFilter(); + configureWith(kubernetesDependentResourceConfig.labelSelector(), kubernetesDependentResourceConfig.namespaces(), !kubernetesDependentResourceConfig.wereNamespacesConfigured(), context); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResourceConfig.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResourceConfig.java index a28668055a..8fd3bc2311 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResourceConfig.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResourceConfig.java @@ -1,38 +1,55 @@ package io.javaoperatorsdk.operator.processing.dependent.kubernetes; import java.util.Set; +import java.util.function.BiPredicate; +import java.util.function.Predicate; import io.javaoperatorsdk.operator.api.reconciler.Constants; import static io.javaoperatorsdk.operator.api.reconciler.Constants.NO_VALUE_SET; -public class KubernetesDependentResourceConfig { +public class KubernetesDependentResourceConfig { private Set namespaces = Constants.SAME_AS_CONTROLLER_NAMESPACES_SET; private String labelSelector = NO_VALUE_SET; - private boolean namespacesWereConfigured = false; + + private Predicate onAddFilter; + + private BiPredicate onUpdateFilter; + + private BiPredicate onDeleteFilter; + + private Predicate genericFilter; + public KubernetesDependentResourceConfig() {} + @SuppressWarnings("rawtypes") public KubernetesDependentResourceConfig(Set namespaces, String labelSelector, - boolean configuredNS) { + boolean configuredNS, Predicate onAddFilter, + BiPredicate onUpdateFilter, + BiPredicate onDeleteFilter, Predicate genericFilter) { this.namespaces = namespaces; this.labelSelector = labelSelector; this.namespacesWereConfigured = configuredNS; + this.onAddFilter = onAddFilter; + this.onUpdateFilter = onUpdateFilter; + this.onDeleteFilter = onDeleteFilter; + this.genericFilter = genericFilter; } public KubernetesDependentResourceConfig(Set namespaces, String labelSelector) { - this(namespaces, labelSelector, true); + this(namespaces, labelSelector, true, null, null, null, null); } - public KubernetesDependentResourceConfig setNamespaces(Set namespaces) { + public KubernetesDependentResourceConfig setNamespaces(Set namespaces) { this.namespacesWereConfigured = true; this.namespaces = namespaces; return this; } - public KubernetesDependentResourceConfig setLabelSelector(String labelSelector) { + public KubernetesDependentResourceConfig setLabelSelector(String labelSelector) { this.labelSelector = labelSelector; return this; } @@ -49,4 +66,22 @@ public boolean wereNamespacesConfigured() { return namespacesWereConfigured; } + @SuppressWarnings("rawtypes") + public Predicate onAddFilter() { + return onAddFilter; + } + + @SuppressWarnings("rawtypes") + public BiPredicate onUpdateFilter() { + return onUpdateFilter; + } + + @SuppressWarnings("rawtypes") + public BiPredicate onDeleteFilter() { + return onDeleteFilter; + } + + public Predicate genericFilter() { + return genericFilter; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java index 051a75ff20..d2e8ed8c1d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java @@ -1,5 +1,8 @@ package io.javaoperatorsdk.operator.processing.event.source; +import java.util.function.BiPredicate; +import java.util.function.Predicate; + import io.fabric8.kubernetes.api.model.HasMetadata; public abstract class AbstractResourceEventSource @@ -7,6 +10,11 @@ public abstract class AbstractResourceEventSource implements ResourceEventSource { private final Class resourceClass; + protected Predicate onAddFilter; + protected BiPredicate onUpdateFilter; + protected BiPredicate onDeleteFilter; + protected Predicate genericFilter; + protected AbstractResourceEventSource(Class resourceClass) { this.resourceClass = resourceClass; } @@ -15,4 +23,22 @@ protected AbstractResourceEventSource(Class resourceClass) { public Class resourceType() { return resourceClass; } + + public void setOnAddFilter(Predicate onAddFilter) { + this.onAddFilter = onAddFilter; + } + + public void setOnUpdateFilter( + BiPredicate onUpdateFilter) { + this.onUpdateFilter = onUpdateFilter; + } + + public void setOnDeleteFilter( + BiPredicate onDeleteFilter) { + this.onDeleteFilter = onDeleteFilter; + } + + public void setGenericFilter(Predicate genericFilter) { + this.genericFilter = genericFilter; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java deleted file mode 100644 index 55bd1ab920..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/CachingEventSource.java +++ /dev/null @@ -1,51 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source; - -import java.util.Optional; -import java.util.function.Predicate; -import java.util.stream.Stream; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.processing.event.ResourceID; - -/** - * Base class for event sources with caching capabilities. - *

- * - * @param represents the type of resources (usually external non-kubernetes ones) being handled. - */ -public abstract class CachingEventSource - extends AbstractResourceEventSource implements Cache { - - protected UpdatableCache cache; - - protected CachingEventSource(Class resourceClass) { - super(resourceClass); - cache = initCache(); - } - - @Override - public Optional get(ResourceID resourceID) { - return cache.get(resourceID); - } - - @Override - public boolean contains(ResourceID resourceID) { - return cache.contains(resourceID); - } - - @Override - public Stream keys() { - return cache.keys(); - } - - @Override - public Stream list(Predicate predicate) { - return cache.list(predicate); - } - - public Optional getCachedValue(ResourceID resourceID) { - return cache.get(resourceID); - } - - protected abstract UpdatableCache initCache(); -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java index f8a0cafcd8..275a41775a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ExternalResourceCachingEventSource.java @@ -3,6 +3,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -48,7 +49,7 @@ protected ExternalResourceCachingEventSource(Class resourceClass, protected synchronized void handleDelete(ResourceID primaryID) { var res = cache.remove(primaryID); - if (res != null) { + if (res != null && deleteAcceptedByFilter(res.values())) { getEventHandler().handleEvent(new Event(primaryID)); } } @@ -62,18 +63,19 @@ protected synchronized void handleDelete(ResourceID primaryID, R resource) { handleDelete(primaryID, Set.of(cacheKeyMapper.keyFor(resource))); } - protected synchronized void handleDelete(ResourceID primaryID, Set resourceID) { + protected synchronized void handleDelete(ResourceID primaryID, Set resourceIDs) { if (!isRunning()) { return; } var cachedValues = cache.get(primaryID); - var sizeBeforeRemove = cachedValues.size(); - resourceID.forEach(cachedValues::remove); + List removedResources = cachedValues == null ? Collections.emptyList() + : resourceIDs.stream() + .flatMap(id -> Stream.ofNullable(cachedValues.remove(id))).collect(Collectors.toList()); - if (cachedValues.isEmpty()) { + if (cachedValues != null && cachedValues.isEmpty()) { cache.remove(primaryID); } - if (sizeBeforeRemove > cachedValues.size()) { + if (!removedResources.isEmpty() && deleteAcceptedByFilter(removedResources)) { getEventHandler().handleEvent(new Event(primaryID)); } } @@ -90,7 +92,7 @@ protected synchronized void handleResources(Map> allNewResour var toDelete = cache.keySet().stream().filter(k -> !allNewResources.containsKey(k)) .collect(Collectors.toList()); toDelete.forEach(this::handleDelete); - allNewResources.forEach((primaryID, resources) -> handleResources(primaryID, resources)); + allNewResources.forEach(this::handleResources); } protected synchronized void handleResources(ResourceID primaryID, Set newResources, @@ -101,14 +103,76 @@ protected synchronized void handleResources(ResourceID primaryID, Set newReso return; } var cachedResources = cache.get(primaryID); + if (cachedResources == null) { + cachedResources = Collections.emptyMap(); + } var newResourcesMap = newResources.stream().collect(Collectors.toMap(cacheKeyMapper::keyFor, r -> r)); cache.put(primaryID, newResourcesMap); - if (propagateEvent && !newResourcesMap.equals(cachedResources)) { + if (propagateEvent && !newResourcesMap.equals(cachedResources) + && acceptedByFiler(cachedResources, newResourcesMap)) { getEventHandler().handleEvent(new Event(primaryID)); } } + private boolean acceptedByFiler(Map cachedResourceMap, + Map newResourcesMap) { + + var addedResources = new HashMap<>(newResourcesMap); + addedResources.keySet().removeAll(cachedResourceMap.keySet()); + if (onAddFilter != null || genericFilter != null) { + var anyAddAccepted = + addedResources.values().stream().anyMatch(r -> acceptedByGenericFiler(r) && + onAddFilter.test(r)); + if (anyAddAccepted) { + return true; + } + } else if (!addedResources.isEmpty()) { + return true; + } + + var deletedResource = new HashMap<>(cachedResourceMap); + deletedResource.keySet().removeAll(newResourcesMap.keySet()); + if (onDeleteFilter != null || genericFilter != null) { + var anyDeleteAccepted = + deletedResource.values().stream() + .anyMatch(r -> acceptedByGenericFiler(r) && onDeleteFilter.test(r, false)); + if (anyDeleteAccepted) { + return true; + } + } else if (!deletedResource.isEmpty()) { + return true; + } + + Map possibleUpdatedResources = new HashMap<>(cachedResourceMap); + possibleUpdatedResources.keySet().retainAll(newResourcesMap.keySet()); + possibleUpdatedResources = possibleUpdatedResources.entrySet().stream() + .filter(entry -> !newResourcesMap + .get(entry.getKey()).equals(entry.getValue())) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + if (onUpdateFilter != null || genericFilter != null) { + var anyUpdated = possibleUpdatedResources.entrySet().stream() + .anyMatch( + entry -> { + var newResource = newResourcesMap.get(entry.getKey()); + return acceptedByGenericFiler(newResource) && + onUpdateFilter.test(newResource, entry.getValue()); + }); + if (anyUpdated) { + return true; + } + } else if (!possibleUpdatedResources.isEmpty()) { + return true; + } + + return false; + } + + private boolean acceptedByGenericFiler(R resource) { + return genericFilter == null || genericFilter.test(resource); + } + @Override public synchronized void handleRecentResourceCreate(ResourceID primaryID, R resource) { var actualValues = cache.get(primaryID); @@ -163,4 +227,15 @@ public Optional getSecondaryResource(ResourceID primaryID) { public Map> getCache() { return Collections.unmodifiableMap(cache); } + + protected boolean deleteAcceptedByFilter(Collection res) { + if (onDeleteFilter == null) { + return true; + } + // it is enough if at least one event is accepted + // Cannot be sure about the final state in general, mainly for polled resources. This might be + // fine-tuned for + // other event sources. (For now just by overriding this method.) + return res.stream().anyMatch(r -> onDeleteFilter.test(r, false)); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java index d57a662d82..2e19603bd4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventSource.java @@ -2,6 +2,8 @@ import java.util.Optional; import java.util.Set; +import java.util.function.BiPredicate; +import java.util.function.Predicate; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.ResourceOwner; @@ -22,4 +24,12 @@ default Optional getSecondaryResource(P primary) { } Set getSecondaryResources(P primary); + + void setOnAddFilter(Predicate onAddFilter); + + void setOnUpdateFilter(BiPredicate onUpdateFilter); + + void setOnDeleteFilter(BiPredicate onDeleteFilter); + + void setGenericFilter(Predicate genericFilter); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java index 83afcc0be2..12d9d47fd5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java @@ -2,6 +2,7 @@ import java.util.Optional; import java.util.Set; +import java.util.function.BiPredicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -19,6 +20,9 @@ import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; +import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.onUpdateFinalizerNeededAndApplied; +import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.onUpdateGenerationAware; +import static io.javaoperatorsdk.operator.processing.event.source.controller.InternalEventFilters.onUpdateMarkedForDeletion; public class ControllerResourceEventSource extends ManagedInformerEventSource> @@ -27,22 +31,27 @@ public class ControllerResourceEventSource private static final Logger log = LoggerFactory.getLogger(ControllerResourceEventSource.class); private final Controller controller; - private final ResourceEventFilter filter; + private final ResourceEventFilter legacyFilters; @SuppressWarnings("unchecked") public ControllerResourceEventSource(Controller controller) { super(controller.getCRClient(), controller.getConfiguration()); this.controller = controller; - var filters = new ResourceEventFilter[] { - ResourceEventFilters.finalizerNeededAndApplied(), - ResourceEventFilters.markedForDeletion(), - ResourceEventFilters.generationAware(), - }; - if (controller.getConfiguration().getEventFilter() != null) { - filter = controller.getConfiguration().getEventFilter().and(ResourceEventFilters.or(filters)); - } else { - filter = ResourceEventFilters.or(filters); - } + + BiPredicate internalOnUpdateFilter = + (BiPredicate) onUpdateFinalizerNeededAndApplied(controller.useFinalizer(), + controller.getConfiguration().getFinalizerName()) + .or(onUpdateGenerationAware(controller.getConfiguration().isGenerationAware())) + .or(onUpdateMarkedForDeletion()); + + legacyFilters = controller.getConfiguration().getEventFilter(); + + // by default the on add should be processed in all cases regarding internal filters + controller.getConfiguration().onAddFilter().ifPresent(this::setOnAddFilter); + controller.getConfiguration().onUpdateFilter() + .ifPresentOrElse(filter -> setOnUpdateFilter(filter.and(internalOnUpdateFilter)), + () -> setOnUpdateFilter(internalOnUpdateFilter)); + controller.getConfiguration().genericFilter().ifPresent(this::setGenericFilter); } @Override @@ -60,7 +69,9 @@ public void eventReceived(ResourceAction action, T resource, T oldResource) { log.debug("Event received for resource: {}", getName(resource)); MDCUtils.addResourceInfo(resource); controller.getEventSourceManager().broadcastOnResourceEvent(action, resource, oldResource); - if (filter.acceptChange(controller, oldResource, resource)) { + if ((legacyFilters == null || + legacyFilters.acceptChange(controller, oldResource, resource)) + && isAcceptedByFilters(action, resource, oldResource)) { getEventHandler().handleEvent( new ResourceEvent(action, ResourceID.fromResource(resource), resource)); } else { @@ -72,6 +83,20 @@ public void eventReceived(ResourceAction action, T resource, T oldResource) { } } + private boolean isAcceptedByFilters(ResourceAction action, T resource, T oldResource) { + // delete event is filtered for generic filter only. + if (genericFilter != null && !genericFilter.test(resource)) { + return false; + } + switch (action) { + case ADDED: + return onAddFilter == null || onAddFilter.test(resource); + case UPDATED: + return onUpdateFilter.test(resource, oldResource); + } + return true; + } + @Override public void onAdd(T resource) { super.onAdd(resource); @@ -99,4 +124,10 @@ public Optional getSecondaryResource(T primary) { public Set getSecondaryResources(T primary) { throw new IllegalStateException("This method should not be called here. Primary: " + primary); } + + @Override + public void setOnDeleteFilter(BiPredicate onDeleteFilter) { + throw new IllegalStateException( + "onDeleteFilter is not supported for controller resource event source"); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/InternalEventFilters.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/InternalEventFilters.java new file mode 100644 index 0000000000..97da768979 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/InternalEventFilters.java @@ -0,0 +1,43 @@ +package io.javaoperatorsdk.operator.processing.event.source.controller; + +import java.util.function.BiPredicate; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public class InternalEventFilters { + + private InternalEventFilters() {} + + static BiPredicate onUpdateMarkedForDeletion() { + return (newResource, oldResource) -> newResource.isMarkedForDeletion(); + } + + static BiPredicate onUpdateGenerationAware( + boolean generationAware) { + + return (newResource, oldResource) -> { + if (!generationAware) { + return true; + } + return oldResource.getMetadata().getGeneration() < newResource + .getMetadata().getGeneration(); + }; + } + + static BiPredicate onUpdateFinalizerNeededAndApplied( + boolean useFinalizer, + String finalizerName) { + return (newResource, oldResource) -> { + if (useFinalizer) { + boolean oldFinalizer = oldResource.hasFinalizer(finalizerName); + boolean newFinalizer = newResource.hasFinalizer(finalizerName); + // accepts event if old did not have finalizer, since it was just added, so the event needs + // to + // be published. + return !newFinalizer || !oldFinalizer; + } else { + return false; + } + }; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilter.java index 17cb27fd71..08a86c92ae 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilter.java @@ -10,6 +10,7 @@ * * @param

the type of custom resources handled by this filter */ +@Deprecated(forRemoval = true) @FunctionalInterface public interface ResourceEventFilter

{ diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilters.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilters.java index 5a22cafac8..7024388b8b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilters.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ResourceEventFilters.java @@ -5,37 +5,12 @@ /** * Convenience implementations of, and utility methods for, {@link ResourceEventFilter}. */ +@Deprecated public final class ResourceEventFilters { - private static final ResourceEventFilter USE_FINALIZER = - (controller, oldResource, newResource) -> { - if (controller.useFinalizer()) { - final var finalizer = controller.getConfiguration().getFinalizerName(); - boolean oldFinalizer = oldResource == null || oldResource.hasFinalizer(finalizer); - boolean newFinalizer = newResource.hasFinalizer(finalizer); - - return !newFinalizer || !oldFinalizer; - } else { - return false; - } - }; - - private static final ResourceEventFilter GENERATION_AWARE = - (controller, oldResource, newResource) -> { - final var generationAware = controller.getConfiguration().isGenerationAware(); - return oldResource == null || !generationAware || - oldResource.getMetadata().getGeneration() < newResource.getMetadata().getGeneration(); - }; - private static final ResourceEventFilter PASSTHROUGH = (configuration, oldResource, newResource) -> true; - private static final ResourceEventFilter NONE = - (configuration, oldResource, newResource) -> false; - - private static final ResourceEventFilter MARKED_FOR_DELETION = - (configuration, oldResource, newResource) -> newResource.isMarkedForDeletion(); - private ResourceEventFilters() {} /** @@ -49,117 +24,4 @@ public static ResourceEventFilter passthrough() { return (ResourceEventFilter) PASSTHROUGH; } - /** - * Retrieves a filter that reject all events. - * - * @param the type of custom resource the filter should handle - * @return a filter that reject all events - */ - @SuppressWarnings("unchecked") - public static ResourceEventFilter none() { - return (ResourceEventFilter) NONE; - } - - /** - * Retrieves a filter that accepts all events if generation-aware processing is not activated but - * only changes that represent a generation increase otherwise. - * - * @param the type of custom resource the filter should handle - * @return a filter accepting changes based on generation information - */ - @SuppressWarnings("unchecked") - public static ResourceEventFilter generationAware() { - return (ResourceEventFilter) GENERATION_AWARE; - } - - /** - * Retrieves a filter that accepts changes if the target controller uses a finalizer and that - * finalizer hasn't already been applied, rejecting them otherwise. - * - * @param the type of custom resource the filter should handle - * @return a filter accepting changes based on whether the finalizer is needed and has been - * applied - */ - @SuppressWarnings("unchecked") - public static ResourceEventFilter finalizerNeededAndApplied() { - return (ResourceEventFilter) USE_FINALIZER; - } - - /** - * Retrieves a filter that accepts changes if the custom resource is marked for deletion. - * - * @param the type of custom resource the filter should handle - * @return a filter accepting changes based on whether the Custom Resource is marked for deletion. - */ - @SuppressWarnings("unchecked") - public static ResourceEventFilter markedForDeletion() { - return (ResourceEventFilter) MARKED_FOR_DELETION; - } - - /** - * Combines the provided, potentially {@code null} filters with an AND logic, i.e. the resulting - * filter will only accept the change if all filters accept it, reject it otherwise. - *

- * Note that the evaluation of filters is lazy: the result is returned as soon as possible without - * evaluating all filters if possible. - * - * @param items the filters to combine - * @param the type of custom resources the filters are supposed to handle - * @return a combined filter implementing the AND logic combination of the provided filters - */ - @SafeVarargs - public static ResourceEventFilter and( - ResourceEventFilter... items) { - if (items == null) { - return none(); - } - - return (configuration, oldResource, newResource) -> { - for (ResourceEventFilter item : items) { - if (item == null) { - continue; - } - - if (!item.acceptChange(configuration, oldResource, newResource)) { - return false; - } - } - - return true; - }; - } - - /** - * Combines the provided, potentially {@code null} filters with an OR logic, i.e. the resulting - * filter will accept the change if any of the filters accepts it, rejecting it only if all reject - * it. - *

- * Note that the evaluation of filters is lazy: the result is returned as soon as possible without - * evaluating all filters if possible. - * - * @param items the filters to combine - * @param the type of custom resources the filters are supposed to handle - * @return a combined filter implementing the OR logic combination of both provided filters - */ - @SafeVarargs - public static ResourceEventFilter or( - ResourceEventFilter... items) { - if (items == null) { - return none(); - } - - return (configuration, oldResource, newResource) -> { - for (ResourceEventFilter item : items) { - if (item == null) { - continue; - } - - if (item.acceptChange(configuration, oldResource, newResource)) { - return true; - } - } - - return false; - }; - } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidGenericFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidGenericFilter.java new file mode 100644 index 0000000000..c02af4e421 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidGenericFilter.java @@ -0,0 +1,12 @@ +package io.javaoperatorsdk.operator.processing.event.source.filter; + +import java.util.function.Predicate; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public class VoidGenericFilter implements Predicate { + @Override + public boolean test(HasMetadata hasMetadata) { + return true; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidOnAddFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidOnAddFilter.java new file mode 100644 index 0000000000..11c233e14c --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidOnAddFilter.java @@ -0,0 +1,12 @@ +package io.javaoperatorsdk.operator.processing.event.source.filter; + +import java.util.function.Predicate; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public class VoidOnAddFilter implements Predicate { + @Override + public boolean test(HasMetadata hasMetadata) { + return true; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidOnDeleteFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidOnDeleteFilter.java new file mode 100644 index 0000000000..7dba4566b4 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidOnDeleteFilter.java @@ -0,0 +1,12 @@ +package io.javaoperatorsdk.operator.processing.event.source.filter; + +import java.util.function.BiPredicate; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public class VoidOnDeleteFilter implements BiPredicate { + @Override + public boolean test(HasMetadata hasMetadata, Boolean aBoolean) { + return true; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidOnUpdateFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidOnUpdateFilter.java new file mode 100644 index 0000000000..333bff2455 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/filter/VoidOnUpdateFilter.java @@ -0,0 +1,12 @@ +package io.javaoperatorsdk.operator.processing.event.source.filter; + +import java.util.function.BiPredicate; + +import io.fabric8.kubernetes.api.model.HasMetadata; + +public class VoidOnUpdateFilter implements BiPredicate { + @Override + public boolean test(HasMetadata hasMetadata, HasMetadata hasMetadata2) { + return true; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index c630b85a9c..55300f8f28 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -92,17 +92,22 @@ public InformerEventSource(InformerConfiguration configuration, KubernetesCli } else { primaryToSecondaryIndex = NOOPPrimaryToSecondaryIndex.getInstance(); } + onAddFilter = configuration.onAddFilter().orElse(null); + onUpdateFilter = configuration.onUpdateFilter().orElse(null); + onDeleteFilter = configuration.onDeleteFilter().orElse(null); + genericFilter = configuration.genericFilter().orElse(null); } @Override - public void onAdd(R resource) { + public void onAdd(R newResource) { if (log.isDebugEnabled()) { log.debug("On add event received for resource id: {} type: {}", - ResourceID.fromResource(resource), + ResourceID.fromResource(newResource), resourceType().getSimpleName()); } - primaryToSecondaryIndex.onAddOrUpdate(resource); - onAddOrUpdate("add", resource, () -> InformerEventSource.super.onAdd(resource)); + primaryToSecondaryIndex.onAddOrUpdate(newResource); + onAddOrUpdate(Operation.ADD, newResource, null, + () -> InformerEventSource.super.onAdd(newResource)); } @Override @@ -113,7 +118,7 @@ public void onUpdate(R oldObject, R newObject) { resourceType().getSimpleName()); } primaryToSecondaryIndex.onAddOrUpdate(newObject); - onAddOrUpdate("update", newObject, + onAddOrUpdate(Operation.UPDATE, newObject, oldObject, () -> InformerEventSource.super.onUpdate(oldObject, newObject)); } @@ -126,10 +131,13 @@ public void onDelete(R resource, boolean b) { } primaryToSecondaryIndex.onDelete(resource); super.onDelete(resource, b); - propagateEvent(resource); + if (acceptedByDeleteFilters(resource, b)) { + propagateEvent(resource); + } } - private synchronized void onAddOrUpdate(String operation, R newObject, Runnable superOnOp) { + private synchronized void onAddOrUpdate(Operation operation, R newObject, R oldObject, + Runnable superOnOp) { var resourceID = ResourceID.fromResource(newObject); if (eventRecorder.isRecordingFor(resourceID)) { log.debug("Recording event for: {}", resourceID); @@ -144,11 +152,15 @@ private synchronized void onAddOrUpdate(String operation, R newObject, Runnable superOnOp.run(); } else { superOnOp.run(); - log.debug( - "Propagating event for {}, resource with same version not result of a reconciliation. Resource ID: {}", - operation, - resourceID); - propagateEvent(newObject); + if (eventAcceptedByFilter(operation, newObject, oldObject)) { + log.debug( + "Propagating event for {}, resource with same version not result of a reconciliation. Resource ID: {}", + operation, + resourceID); + propagateEvent(newObject); + } else { + log.debug("Event filtered out for operation: {}, resourceID: {}", operation, resourceID); + } } } @@ -205,20 +217,21 @@ public InformerConfiguration getConfiguration() { @Override public synchronized void handleRecentResourceUpdate(ResourceID resourceID, R resource, R previousVersionOfResource) { - handleRecentCreateOrUpdate(resource, + handleRecentCreateOrUpdate(Operation.UPDATE, resource, previousVersionOfResource, () -> super.handleRecentResourceUpdate(resourceID, resource, previousVersionOfResource)); } @Override public synchronized void handleRecentResourceCreate(ResourceID resourceID, R resource) { - handleRecentCreateOrUpdate(resource, + handleRecentCreateOrUpdate(Operation.ADD, resource, null, () -> super.handleRecentResourceCreate(resourceID, resource)); } - private void handleRecentCreateOrUpdate(R resource, Runnable runnable) { + private void handleRecentCreateOrUpdate(Operation operation, R resource, R oldResource, + Runnable runnable) { primaryToSecondaryIndex.onAddOrUpdate(resource); if (eventRecorder.isRecordingFor(ResourceID.fromResource(resource))) { - handleRecentResourceOperationAndStopEventRecording(resource); + handleRecentResourceOperationAndStopEventRecording(operation, resource, oldResource); } else { runnable.run(); } @@ -239,23 +252,26 @@ private void handleRecentCreateOrUpdate(R resource, Runnable runnable) { * an event needs to be propagated to compensate. * * - * @param resource just created or updated resource + * @param newResource just created or updated resource */ - private void handleRecentResourceOperationAndStopEventRecording(R resource) { - ResourceID resourceID = ResourceID.fromResource(resource); + private void handleRecentResourceOperationAndStopEventRecording(Operation operation, + R newResource, R oldResource) { + ResourceID resourceID = ResourceID.fromResource(newResource); try { if (!eventRecorder.containsEventWithResourceVersion( - resourceID, resource.getMetadata().getResourceVersion())) { + resourceID, newResource.getMetadata().getResourceVersion())) { log.debug( "Did not found event in buffer with target version and resource id: {}", resourceID); - temporaryResourceCache.unconditionallyCacheResource(resource); + temporaryResourceCache.unconditionallyCacheResource(newResource); } else if (eventRecorder.containsEventWithVersionButItsNotLastOne( - resourceID, resource.getMetadata().getResourceVersion())) { + resourceID, newResource.getMetadata().getResourceVersion())) { R lastEvent = eventRecorder.getLastEvent(resourceID); log.debug( "Found events in event buffer but the target event is not last for id: {}. Propagating event.", resourceID); - propagateEvent(lastEvent); + if (eventAcceptedByFilter(operation, newResource, oldResource)) { + propagateEvent(lastEvent); + } } } finally { eventRecorder.stopEventRecording(resourceID); @@ -289,4 +305,25 @@ public synchronized void cleanupOnCreateOrUpdateEventFiltering(ResourceID resour public boolean allowsNamespaceChanges() { return getConfiguration().followControllerNamespaceChanges(); } + + + private boolean eventAcceptedByFilter(Operation operation, R newObject, R oldObject) { + if (genericFilter != null && !genericFilter.test(newObject)) { + return false; + } + if (operation == Operation.ADD) { + return onAddFilter == null || onAddFilter.test(newObject); + } else { + return onUpdateFilter == null || onUpdateFilter.test(newObject, oldObject); + } + } + + private enum Operation { + ADD, UPDATE + } + + private boolean acceptedByDeleteFilters(R resource, boolean b) { + return (onDeleteFilter == null || onDeleteFilter.test(resource, b)) && + (genericFilter == null || genericFilter.test(resource)); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 98e445032c..6ebd63a7eb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -20,18 +20,18 @@ import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource; -import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; -import io.javaoperatorsdk.operator.processing.event.source.UpdatableCache; +import io.javaoperatorsdk.operator.processing.event.source.*; public abstract class ManagedInformerEventSource> - extends CachingEventSource - implements ResourceEventHandler, IndexerResourceCache, RecentOperationCacheFiller, + extends AbstractResourceEventSource + implements ResourceEventHandler, Cache, IndexerResourceCache, + RecentOperationCacheFiller, NamespaceChangeable { private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class); protected TemporaryResourceCache temporaryResourceCache = new TemporaryResourceCache<>(this); + protected InformerManager cache = new InformerManager<>(); protected ManagedInformerEventSource( MixedOperation, Resource> client, C configuration) { @@ -54,13 +54,8 @@ public void onDelete(R obj, boolean deletedFinalStateUnknown) { temporaryResourceCache.removeResourceFromCache(obj); } - @Override - protected UpdatableCache initCache() { - return new InformerManager<>(); - } - protected InformerManager manager() { - return (InformerManager) cache; + return cache; } @Override @@ -103,11 +98,10 @@ public Optional get(ResourceID resourceID) { } else { log.debug("Resource not found in temporal cache reading it from informer cache," + " for Resource ID: {}", resourceID); - return super.get(resourceID); + return cache.get(resourceID); } } - @Override public Optional getCachedValue(ResourceID resourceID) { return get(resourceID); } @@ -128,4 +122,15 @@ public void addIndexers(Map>> indexers) { public List byIndex(String indexName, String indexKey) { return manager().byIndex(indexName, indexKey); } + + @Override + public Stream keys() { + return cache.keys(); + } + + @Override + public Stream list(Predicate predicate) { + return cache.list(predicate); + } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java index f55e7dd05e..44bab7a624 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java @@ -12,7 +12,6 @@ import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; -import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource; import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource; import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware; @@ -22,7 +21,7 @@ * if there is no registerPredicate provided. If register predicate provided it is evaluated on * resource create and/or update to register polling for the event source. *

- * For other behavior see {@link CachingEventSource} + * For other behavior see {@link ExternalResourceCachingEventSource} * * @param the resource polled by the event source * @param

related custom resource diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java index 09ff2e8b0e..94efbf25aa 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java @@ -1,9 +1,6 @@ package io.javaoperatorsdk.operator.processing.event.source.polling; -import java.util.Map; -import java.util.Set; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java index ca02094d46..6c7358d86e 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/ControllerManagerTest.java @@ -63,7 +63,7 @@ private static class TestControllerConfiguration public TestControllerConfiguration(Reconciler controller, Class crClass) { super(null, getControllerName(controller), CustomResource.getCRDName(crClass), null, false, null, null, null, null, crClass, - null, null); + null, null, null, null, null); this.controller = controller; } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java index a7b6b46d17..3b2c5354f5 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java @@ -49,8 +49,9 @@ public static TestCustomResource testCustomResource(ResourceID id) { return resource; } - public static void markForDeletion(HasMetadata customResource) { + public static T markForDeletion(T customResource) { customResource.getMetadata().setDeletionTimestamp("2019-8-10"); + return customResource; } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java index 821efbd16a..779fe032f9 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java @@ -11,10 +11,10 @@ import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.processing.Controller; -import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource; import io.javaoperatorsdk.operator.processing.event.source.timer.TimerEventSource; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; @@ -79,7 +79,7 @@ void retrievingEventSourceForClassShouldWork() { assertThatExceptionOfType(IllegalArgumentException.class) .isThrownBy(() -> manager.getResourceEventSourceFor(HasMetadata.class, "unknown_name")); - CachingEventSource eventSource = mock(CachingEventSource.class); + ManagedInformerEventSource eventSource = mock(ManagedInformerEventSource.class); when(eventSource.resourceType()).thenReturn(String.class); manager.registerEventSource(eventSource); @@ -93,11 +93,11 @@ void shouldNotBePossibleToAddEventSourcesForSameTypeAndName() { EventSourceManager manager = initManager(); final var name = "name1"; - CachingEventSource eventSource = mock(CachingEventSource.class); + ManagedInformerEventSource eventSource = mock(ManagedInformerEventSource.class); when(eventSource.resourceType()).thenReturn(TestCustomResource.class); manager.registerEventSource(name, eventSource); - eventSource = mock(CachingEventSource.class); + eventSource = mock(ManagedInformerEventSource.class); when(eventSource.resourceType()).thenReturn(TestCustomResource.class); final var source = eventSource; @@ -114,11 +114,11 @@ void shouldNotBePossibleToAddEventSourcesForSameTypeAndName() { void retrievingAnEventSourceWhenMultipleAreRegisteredForATypeShouldRequireAQualifier() { EventSourceManager manager = initManager(); - CachingEventSource eventSource = mock(CachingEventSource.class); + ManagedInformerEventSource eventSource = mock(ManagedInformerEventSource.class); when(eventSource.resourceType()).thenReturn(TestCustomResource.class); manager.registerEventSource("name1", eventSource); - CachingEventSource eventSource2 = mock(CachingEventSource.class); + ManagedInformerEventSource eventSource2 = mock(ManagedInformerEventSource.class); when(eventSource2.resourceType()).thenReturn(TestCustomResource.class); manager.registerEventSource("name2", eventSource2); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java index 3de6578515..bee750a324 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/CustomResourceSelectorTest.java @@ -136,7 +136,8 @@ public static class MyConfiguration extends DefaultControllerConfiguration false); + setUpSource(delFilteringEventSource); + // try without any resources added + source.handleDeletes(primaryID1(), Set.of(testResource1(), testResource2())); + source.handleResources(primaryID1(), Set.of(testResource1(), testResource2())); + // handling the add event + verify(eventHandler, times(1)).handleEvent(any()); + + source.handleDeletes(primaryID1(), Set.of(testResource1(), testResource2())); + + // no more invocation + verify(eventHandler, times(1)).handleEvent(any()); + } + + @Test + void filtersAddEvents() { + TestExternalCachingEventSource delFilteringEventSource = new TestExternalCachingEventSource(); + delFilteringEventSource.setOnAddFilter((res) -> false); + setUpSource(delFilteringEventSource); + + source.handleResources(primaryID1(), Set.of(testResource1())); + verify(eventHandler, times(0)).handleEvent(any()); + + source.handleResources(primaryID1(), Set.of(testResource1(), testResource2())); + verify(eventHandler, times(0)).handleEvent(any()); + } + + @Test + void filtersUpdateEvents() { + TestExternalCachingEventSource delFilteringEventSource = new TestExternalCachingEventSource(); + delFilteringEventSource.setOnUpdateFilter((res, res2) -> false); + setUpSource(delFilteringEventSource); + source.handleResources(primaryID1(), Set.of(testResource1())); + verify(eventHandler, times(1)).handleEvent(any()); + + var resource = testResource1(); + resource.setValue("changed value"); + source.handleResources(primaryID1(), Set.of(resource)); + + verify(eventHandler, times(1)).handleEvent(any()); + } + + @Test + void filtersImplicitDeleteEvents() { + TestExternalCachingEventSource delFilteringEventSource = new TestExternalCachingEventSource(); + delFilteringEventSource.setOnDeleteFilter((res, b) -> false); + setUpSource(delFilteringEventSource); + + source.handleResources(primaryID1(), Set.of(testResource1(), testResource2())); + verify(eventHandler, times(1)).handleEvent(any()); + + source.handleResources(primaryID1(), Set.of(testResource1())); + verify(eventHandler, times(1)).handleEvent(any()); + } + + @Test + void genericFilteringEvents() { + TestExternalCachingEventSource delFilteringEventSource = new TestExternalCachingEventSource(); + delFilteringEventSource.setGenericFilter(res -> false); + setUpSource(delFilteringEventSource); + + source.handleResources(primaryID1(), Set.of(testResource1())); + verify(eventHandler, times(0)).handleEvent(any()); + + source.handleResources(primaryID1(), Set.of(testResource1(), testResource2())); + verify(eventHandler, times(0)).handleEvent(any()); + + source.handleResources(primaryID1(), Set.of(testResource2())); + verify(eventHandler, times(0)).handleEvent(any()); + } + public static class TestExternalCachingEventSource extends ExternalResourceCachingEventSource { public TestExternalCachingEventSource() { - super(SampleExternalResource.class, (r) -> r.getName() + "#" + r.getValue()); + super(SampleExternalResource.class, (r) -> r.getName()); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java index 1c4c2bcce1..29af7b8428 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/ResourceEventFilterTest.java @@ -56,7 +56,12 @@ public void eventFilteredByCustomPredicate() { cr.getMetadata().setGeneration(1L); cr.getStatus().setConfigMapStatus("1"); - eventSource.eventReceived(ResourceAction.UPDATED, cr, null); + TestCustomResource cr2 = TestUtils.testCustomResource(); + cr.getMetadata().setFinalizers(List.of(FINALIZER)); + cr.getMetadata().setGeneration(1L); + cr.getStatus().setConfigMapStatus("2"); + + eventSource.eventReceived(ResourceAction.UPDATED, cr, cr2); verify(eventHandler, times(1)).handleEvent(any()); cr.getMetadata().setGeneration(1L); @@ -140,7 +145,7 @@ public ControllerConfig(String finalizer, boolean generationAware, eventFilter, customResourceClass, null, - null); + null, null, null, null); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java index 1c467e0490..062298754d 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java @@ -2,6 +2,8 @@ import java.time.LocalDateTime; import java.util.List; +import java.util.function.BiPredicate; +import java.util.function.Predicate; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -16,10 +18,7 @@ import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.*; class ControllerResourceEventSourceTest extends AbstractEventSourceTestBase, EventHandler> { @@ -90,7 +89,7 @@ void handlesAllEventIfNotGenerationAware() { } @Test - public void eventWithNoGenerationProcessedIfNoFinalizer() { + void eventWithNoGenerationProcessedIfNoFinalizer() { TestCustomResource customResource1 = TestUtils.testCustomResource(); source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); @@ -99,7 +98,7 @@ public void eventWithNoGenerationProcessedIfNoFinalizer() { } @Test - public void callsBroadcastsOnResourceEvents() { + void callsBroadcastsOnResourceEvents() { TestCustomResource customResource1 = TestUtils.testCustomResource(); source.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); @@ -109,14 +108,53 @@ public void callsBroadcastsOnResourceEvents() { eq(customResource1)); } + @Test + void filtersOutEventsOnAddAndUpdate() { + TestCustomResource cr = TestUtils.testCustomResource(); + + Predicate onAddPredicate = (res) -> false; + BiPredicate onUpdatePredicate = (res, res2) -> false; + source = + new ControllerResourceEventSource<>( + new TestController(onAddPredicate, onUpdatePredicate, null)); + setUpSource(source); + + source.eventReceived(ResourceAction.ADDED, cr, null); + source.eventReceived(ResourceAction.UPDATED, cr, cr); + + verify(eventHandler, never()).handleEvent(any()); + } + + @Test + void genericFilterFiltersOutAddUpdateAndDeleteEvents() { + TestCustomResource cr = TestUtils.testCustomResource(); + + source = + new ControllerResourceEventSource<>(new TestController(null, null, res -> false)); + setUpSource(source); + + source.eventReceived(ResourceAction.ADDED, cr, null); + source.eventReceived(ResourceAction.UPDATED, cr, cr); + source.eventReceived(ResourceAction.DELETED, cr, cr); + + verify(eventHandler, never()).handleEvent(any()); + } + @SuppressWarnings("unchecked") private static class TestController extends Controller { private final EventSourceManager eventSourceManager = mock(EventSourceManager.class); + public TestController(Predicate onAddFilter, + BiPredicate onUpdateFilter, + Predicate genericFilter) { + super(null, new TestConfiguration(true, onAddFilter, onUpdateFilter, genericFilter), + MockKubernetesClient.client(TestCustomResource.class)); + } + public TestController(boolean generationAware) { - super(null, new TestConfiguration(generationAware), + super(null, new TestConfiguration(generationAware, null, null, null), MockKubernetesClient.client(TestCustomResource.class)); } @@ -134,7 +172,9 @@ public boolean useFinalizer() { private static class TestConfiguration extends DefaultControllerConfiguration { - public TestConfiguration(boolean generationAware) { + public TestConfiguration(boolean generationAware, Predicate onAddFilter, + BiPredicate onUpdateFilter, + Predicate genericFilter) { super( null, null, @@ -147,7 +187,7 @@ public TestConfiguration(boolean generationAware) { null, TestCustomResource.class, null, - null); + onAddFilter, onUpdateFilter, genericFilter, null); } } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/InternalEventFiltersTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/InternalEventFiltersTest.java new file mode 100644 index 0000000000..2f1fec6917 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/InternalEventFiltersTest.java @@ -0,0 +1,60 @@ +package io.javaoperatorsdk.operator.processing.event.source.controller; + +import java.util.List; + +import org.junit.jupiter.api.Test; + +import io.javaoperatorsdk.operator.TestUtils; + +import static io.javaoperatorsdk.operator.TestUtils.markForDeletion; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.*; + +class InternalEventFiltersTest { + + public static final String FINALIZER = "finalizer"; + + @Test + void onUpdateMarkedForDeletion() { + var res = markForDeletion(TestUtils.testCustomResource()); + assertThat(InternalEventFilters.onUpdateMarkedForDeletion().test(res, res)).isTrue(); + } + + @Test + void generationAware() { + var res = TestUtils.testCustomResource1(); + var res2 = TestUtils.testCustomResource1(); + res2.getMetadata().setGeneration(2L); + + assertThat(InternalEventFilters.onUpdateGenerationAware(true).test(res2, res)).isTrue(); + assertThat(InternalEventFilters.onUpdateGenerationAware(true).test(res, res)).isFalse(); + assertThat(InternalEventFilters.onUpdateGenerationAware(false).test(res, res)).isTrue(); + } + + @Test + void finalizerCheckedIfConfigured() { + assertThat(InternalEventFilters.onUpdateFinalizerNeededAndApplied(true, FINALIZER) + .test(TestUtils.testCustomResource1(), TestUtils.testCustomResource1())).isTrue(); + + var res = TestUtils.testCustomResource1(); + res.getMetadata().setFinalizers(List.of(FINALIZER)); + + assertThat(InternalEventFilters.onUpdateFinalizerNeededAndApplied(true, FINALIZER) + .test(res, res)).isFalse(); + } + + @Test + void acceptsIfFinalizerWasJustAdded() { + var res = TestUtils.testCustomResource1(); + res.getMetadata().setFinalizers(List.of(FINALIZER)); + + assertThat(InternalEventFilters.onUpdateFinalizerNeededAndApplied(true, "finalizer") + .test(res, TestUtils.testCustomResource1())).isTrue(); + } + + @Test + void dontAcceptIfFinalizerNotUsed() { + assertThat(InternalEventFilters.onUpdateFinalizerNeededAndApplied(false, FINALIZER) + .test(TestUtils.testCustomResource1(), TestUtils.testCustomResource1())).isFalse(); + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index b30dc32f8f..623705b609 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -206,6 +206,52 @@ void putsResourceOnTempCacheIfNoEventRecordedWithSameResourceVersion() { verify(temporaryResourceCacheMock, times(1)).unconditionallyCacheResource(any()); } + @Test + void genericFilterForEvents() { + informerEventSource.setGenericFilter(r -> false); + when(temporaryResourceCacheMock.getResourceFromCache(any())) + .thenReturn(Optional.empty()); + + informerEventSource.onAdd(testDeployment()); + informerEventSource.onUpdate(testDeployment(), testDeployment()); + informerEventSource.onDelete(testDeployment(), true); + + verify(eventHandlerMock, never()).handleEvent(any()); + } + + @Test + void filtersOnAddEvents() { + informerEventSource.setOnAddFilter(r -> false); + when(temporaryResourceCacheMock.getResourceFromCache(any())) + .thenReturn(Optional.empty()); + + informerEventSource.onAdd(testDeployment()); + + verify(eventHandlerMock, never()).handleEvent(any()); + } + + @Test + void filtersOnUpdateEvents() { + informerEventSource.setOnUpdateFilter((r1, r2) -> false); + when(temporaryResourceCacheMock.getResourceFromCache(any())) + .thenReturn(Optional.empty()); + + informerEventSource.onUpdate(testDeployment(), testDeployment()); + + verify(eventHandlerMock, never()).handleEvent(any()); + } + + @Test + void filtersOnDeleteEvents() { + informerEventSource.setOnDeleteFilter((r, b) -> false); + when(temporaryResourceCacheMock.getResourceFromCache(any())) + .thenReturn(Optional.empty()); + + informerEventSource.onDelete(testDeployment(), true); + + verify(eventHandlerMock, never()).handleEvent(any()); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta()); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndexTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java similarity index 96% rename from operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndexTest.java rename to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java index da2d1b7cf0..4baa531274 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndexTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java @@ -15,11 +15,11 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -class DefaultPrimaryToSecondaryIndexTest { +class PrimaryToSecondaryIndexTest { private SecondaryToPrimaryMapper secondaryToPrimaryMapperMock = mock(SecondaryToPrimaryMapper.class); - private DefaultPrimaryToSecondaryIndex primaryToSecondaryIndex = + private PrimaryToSecondaryIndex primaryToSecondaryIndex = new DefaultPrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock); private ResourceID primaryID1 = new ResourceID("id1", "default"); diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java index 74418d008d..e2f4234453 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java @@ -237,13 +237,13 @@ public Builder withPortForward(String namespace, String labelKey, String labelVa return this; } + public Builder withAdditionalCustomResourceDefinition( Class customResource) { additionalCustomResourceDefinitions.add(customResource); return this; } - public LocallyRunOperatorExtension build() { return new LocallyRunOperatorExtension( configurationService, diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/DependentFilterIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/DependentFilterIT.java new file mode 100644 index 0000000000..a900d0645d --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/DependentFilterIT.java @@ -0,0 +1,60 @@ +package io.javaoperatorsdk.operator; + +import java.time.Duration; +import java.util.Map; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.dependentfilter.DependentFilterTestCustomResource; +import io.javaoperatorsdk.operator.sample.dependentfilter.DependentFilterTestReconciler; +import io.javaoperatorsdk.operator.sample.dependentfilter.DependentFilterTestResourceSpec; + +import static io.javaoperatorsdk.operator.sample.dependentfilter.DependentFilterTestReconciler.CM_VALUE_KEY; +import static io.javaoperatorsdk.operator.sample.dependentfilter.DependentFilterTestReconciler.CONFIG_MAP_FILTER_VALUE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class DependentFilterIT { + + public static final String RESOURCE_NAME = "test1"; + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder().withReconciler(DependentFilterTestReconciler.class) + .build(); + + @Test + void filtersUpdateOnConfigMap() { + var resource = createResource(); + operator.create(DependentFilterTestCustomResource.class, resource); + + await().pollDelay(Duration.ofMillis(150)).untilAsserted(() -> { + assertThat(operator.getReconcilerOfType(DependentFilterTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(1); + }); + + var configMap = operator.get(ConfigMap.class, RESOURCE_NAME); + configMap.setData(Map.of(CM_VALUE_KEY, CONFIG_MAP_FILTER_VALUE)); + operator.replace(ConfigMap.class, configMap); + + await().pollDelay(Duration.ofMillis(150)).untilAsserted(() -> { + assertThat(operator.getReconcilerOfType(DependentFilterTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(1); + }); + } + + DependentFilterTestCustomResource createResource() { + DependentFilterTestCustomResource resource = new DependentFilterTestCustomResource(); + resource.setMetadata(new ObjectMetaBuilder() + .withName(RESOURCE_NAME) + .build()); + resource.setSpec(new DependentFilterTestResourceSpec()); + resource.getSpec().setValue("value1"); + return resource; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/FilterIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/FilterIT.java new file mode 100644 index 0000000000..2dea399448 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/FilterIT.java @@ -0,0 +1,73 @@ +package io.javaoperatorsdk.operator; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.filter.FilterTestCustomResource; +import io.javaoperatorsdk.operator.sample.filter.FilterTestReconciler; +import io.javaoperatorsdk.operator.sample.filter.FilterTestResourceSpec; + +import static io.javaoperatorsdk.operator.sample.filter.FilterTestReconciler.CONFIG_MAP_FILTER_VALUE; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class FilterIT { + + public static final String RESOURCE_NAME = "test1"; + public static final int POLL_DELAY = 150; + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder().withReconciler(FilterTestReconciler.class) + .build(); + + @Test + void filtersControllerResourceUpdate() { + var res = operator.create(FilterTestCustomResource.class, createResource()); + // One for CR create event other for ConfigMap event + await().pollDelay(Duration.ofMillis(POLL_DELAY)) + .untilAsserted(() -> assertThat(operator.getReconcilerOfType(FilterTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(2)); + + res.getSpec().setValue(FilterTestReconciler.CUSTOM_RESOURCE_FILTER_VALUE); + operator.replace(FilterTestCustomResource.class, res); + + // not more reconciliation with the filtered value + await().pollDelay(Duration.ofMillis(POLL_DELAY)) + .untilAsserted(() -> assertThat(operator.getReconcilerOfType(FilterTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(2)); + } + + @Test + void filtersSecondaryResourceUpdate() { + var res = operator.create(FilterTestCustomResource.class, createResource()); + // One for CR create event other for ConfigMap event + await().pollDelay(Duration.ofMillis(POLL_DELAY)) + .untilAsserted(() -> assertThat(operator.getReconcilerOfType(FilterTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(2)); + + res.getSpec().setValue(CONFIG_MAP_FILTER_VALUE); + operator.replace(FilterTestCustomResource.class, res); + + // the CM event filtered out + await().pollDelay(Duration.ofMillis(POLL_DELAY)) + .untilAsserted(() -> assertThat(operator.getReconcilerOfType(FilterTestReconciler.class) + .getNumberOfExecutions()).isEqualTo(3)); + } + + + FilterTestCustomResource createResource() { + FilterTestCustomResource resource = new FilterTestCustomResource(); + resource.setMetadata(new ObjectMetaBuilder() + .withName(RESOURCE_NAME) + .build()); + resource.setSpec(new FilterTestResourceSpec()); + resource.getSpec().setValue("value1"); + return resource; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java index 256d861791..988b1466c1 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/createupdateeventfilter/CreateUpdateEventFilterTestReconciler.java @@ -105,7 +105,8 @@ public Map prepareEventSources( InformerConfiguration.from(ConfigMap.class) .withLabelSelector("integrationtest = " + this.getClass().getSimpleName()) .build(); - informerEventSource = new InformerEventSource<>(informerConfiguration, client); + informerEventSource = + new InformerEventSource<>(informerConfiguration, client); return EventSourceInitializer.nameEventSources(informerEventSource); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestCustomResource.java new file mode 100644 index 0000000000..0930c29774 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestCustomResource.java @@ -0,0 +1,19 @@ +package io.javaoperatorsdk.operator.sample.dependentfilter; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("dft") +public class DependentFilterTestCustomResource + extends CustomResource + implements Namespaced { + + public String getConfigMapName(int id) { + return "configmap" + id; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestReconciler.java new file mode 100644 index 0000000000..114491d9b9 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestReconciler.java @@ -0,0 +1,29 @@ +package io.javaoperatorsdk.operator.sample.dependentfilter; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent; + +@ControllerConfiguration(onUpdateFilter = UpdateFilter.class, + dependents = {@Dependent(type = FilteredDependentConfigMap.class)}) +public class DependentFilterTestReconciler + implements Reconciler { + + public static final String CONFIG_MAP_FILTER_VALUE = "config_map_skip_this"; + public static final String CM_VALUE_KEY = "value"; + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public UpdateControl reconcile( + DependentFilterTestCustomResource resource, + Context context) { + numberOfExecutions.addAndGet(1); + return UpdateControl.noUpdate(); + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestResourceSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestResourceSpec.java new file mode 100644 index 0000000000..cf6b02e936 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestResourceSpec.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.dependentfilter; + +public class DependentFilterTestResourceSpec { + + private String value; + + public String getValue() { + return value; + } + + public DependentFilterTestResourceSpec setValue(String value) { + this.value = value; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestResourceStatus.java new file mode 100644 index 0000000000..99e8d54514 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/DependentFilterTestResourceStatus.java @@ -0,0 +1,5 @@ +package io.javaoperatorsdk.operator.sample.dependentfilter; + +public class DependentFilterTestResourceStatus { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/FilteredDependentConfigMap.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/FilteredDependentConfigMap.java new file mode 100644 index 0000000000..ee4bd4cde7 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/FilteredDependentConfigMap.java @@ -0,0 +1,32 @@ +package io.javaoperatorsdk.operator.sample.dependentfilter; + +import java.util.Map; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDKubernetesDependentResource; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent; + +import static io.javaoperatorsdk.operator.sample.dependentfilter.DependentFilterTestReconciler.CM_VALUE_KEY; + +@KubernetesDependent(onUpdateFilter = UpdateFilter.class) +public class FilteredDependentConfigMap + extends CRUDKubernetesDependentResource { + + public FilteredDependentConfigMap() { + super(ConfigMap.class); + } + + @Override + protected ConfigMap desired(DependentFilterTestCustomResource primary, + Context context) { + ConfigMap configMap = new ConfigMap(); + configMap.setMetadata(new ObjectMetaBuilder() + .withName(primary.getMetadata().getName()) + .withNamespace(primary.getMetadata().getNamespace()) + .build()); + configMap.setData(Map.of(CM_VALUE_KEY, primary.getSpec().getValue())); + return configMap; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/UpdateFilter.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/UpdateFilter.java new file mode 100644 index 0000000000..60dc00ce8e --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/dependentfilter/UpdateFilter.java @@ -0,0 +1,16 @@ +package io.javaoperatorsdk.operator.sample.dependentfilter; + +import java.util.function.BiPredicate; + +import io.fabric8.kubernetes.api.model.ConfigMap; + +import static io.javaoperatorsdk.operator.sample.dependentfilter.DependentFilterTestReconciler.CM_VALUE_KEY; +import static io.javaoperatorsdk.operator.sample.dependentfilter.DependentFilterTestReconciler.CONFIG_MAP_FILTER_VALUE; + +public class UpdateFilter + implements BiPredicate { + @Override + public boolean test(ConfigMap resource, ConfigMap oldResource) { + return !resource.getData().get(CM_VALUE_KEY).equals(CONFIG_MAP_FILTER_VALUE); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestCustomResource.java new file mode 100644 index 0000000000..3314861ee5 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestCustomResource.java @@ -0,0 +1,19 @@ +package io.javaoperatorsdk.operator.sample.filter; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("ftc") +public class FilterTestCustomResource + extends CustomResource + implements Namespaced { + + public String getConfigMapName(int id) { + return "configmap" + id; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestReconciler.java new file mode 100644 index 0000000000..7189b10c7f --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestReconciler.java @@ -0,0 +1,77 @@ +package io.javaoperatorsdk.operator.sample.filter; + +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.junit.KubernetesClientAware; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; + +@ControllerConfiguration(onUpdateFilter = UpdateFilter.class) +public class FilterTestReconciler + implements Reconciler, + EventSourceInitializer, + KubernetesClientAware { + + public static final String CONFIG_MAP_FILTER_VALUE = "config_map_skip_this"; + public static final String CUSTOM_RESOURCE_FILTER_VALUE = "custom_resource_skip_this"; + + public static final String CM_VALUE_KEY = "value"; + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + private KubernetesClient client; + + @Override + public UpdateControl reconcile( + FilterTestCustomResource resource, + Context context) { + numberOfExecutions.addAndGet(1); + client.configMaps().inNamespace(resource.getMetadata().getNamespace()) + .createOrReplace(createConfigMap(resource)); + return UpdateControl.noUpdate(); + } + + private ConfigMap createConfigMap(FilterTestCustomResource resource) { + ConfigMap configMap = new ConfigMap(); + configMap.setMetadata(new ObjectMetaBuilder() + .withName(resource.getMetadata().getName()) + .withNamespace(resource.getMetadata().getNamespace()) + .build()); + configMap.addOwnerReference(resource); + configMap.setData(Map.of(CM_VALUE_KEY, resource.getSpec().getValue())); + return configMap; + } + + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } + + @Override + public Map prepareEventSources( + EventSourceContext context) { + + InformerEventSource configMapES = + new InformerEventSource<>(InformerConfiguration + .from(ConfigMap.class, context) + .withOnUpdateFilter((newCM, oldCM) -> !newCM.getData().get(CM_VALUE_KEY) + .equals(CONFIG_MAP_FILTER_VALUE)) + .build(), context); + + return EventSourceInitializer.nameEventSources(configMapES); + } + + @Override + public void setKubernetesClient(KubernetesClient kubernetesClient) { + this.client = kubernetesClient; + } + + @Override + public KubernetesClient getKubernetesClient() { + return client; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestResourceSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestResourceSpec.java new file mode 100644 index 0000000000..044b0ea883 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestResourceSpec.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.filter; + +public class FilterTestResourceSpec { + + private String value; + + public String getValue() { + return value; + } + + public FilterTestResourceSpec setValue(String value) { + this.value = value; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestResourceStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestResourceStatus.java new file mode 100644 index 0000000000..cf0d24aa2c --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/FilterTestResourceStatus.java @@ -0,0 +1,5 @@ +package io.javaoperatorsdk.operator.sample.filter; + +public class FilterTestResourceStatus { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/UpdateFilter.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/UpdateFilter.java new file mode 100644 index 0000000000..6b8a91f2c7 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/filter/UpdateFilter.java @@ -0,0 +1,13 @@ +package io.javaoperatorsdk.operator.sample.filter; + +import java.util.function.BiPredicate; + +import static io.javaoperatorsdk.operator.sample.filter.FilterTestReconciler.CUSTOM_RESOURCE_FILTER_VALUE; + +public class UpdateFilter + implements BiPredicate { + @Override + public boolean test(FilterTestCustomResource resource, FilterTestCustomResource oldResource) { + return !resource.getSpec().getValue().equals(CUSTOM_RESOURCE_FILTER_VALUE); + } +} diff --git a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/dependent/SchemaDependentResource.java b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/dependent/SchemaDependentResource.java index 5bef4adb04..48e3f37abe 100644 --- a/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/dependent/SchemaDependentResource.java +++ b/sample-operators/mysql-schema/src/main/java/io/javaoperatorsdk/operator/sample/dependent/SchemaDependentResource.java @@ -29,6 +29,7 @@ public class SchemaDependentResource implements EventSourceProvider, DependentResourceConfigurator, Creator, Deleter { + public static final String NAME = "schema"; private static final Logger log = LoggerFactory.getLogger(SchemaDependentResource.class);