diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index ade840ff4b..3332364410 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -134,8 +134,8 @@ public final synchronized void registerEventSource(String name, EventSource even @SuppressWarnings("unchecked") public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldResource) { eventSources.additionalNamedEventSources().forEach(eventSource -> { - if (eventSource instanceof ResourceEventAware) { - var lifecycleAwareES = ((ResourceEventAware) eventSource); + if (eventSource.original() instanceof ResourceEventAware) { + var lifecycleAwareES = ((ResourceEventAware) eventSource.original()); switch (action) { case ADDED: lifecycleAwareES.onResourceCreated(resource); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/PerResourcePollingEventSourceIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/PerResourcePollingEventSourceIT.java new file mode 100644 index 0000000000..a4b850b9d4 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/PerResourcePollingEventSourceIT.java @@ -0,0 +1,52 @@ +package io.javaoperatorsdk.operator; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.perresourceeventsource.PerResourceEventSourceCustomResource; +import io.javaoperatorsdk.operator.sample.perresourceeventsource.PerResourcePollingEventSourceTestReconciler; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class PerResourcePollingEventSourceIT { + + public static final String NAME_1 = "name1"; + public static final String NAME_2 = "name2"; + + @RegisterExtension + LocallyRunOperatorExtension operator = + LocallyRunOperatorExtension.builder() + .withReconciler(new PerResourcePollingEventSourceTestReconciler()) + .build(); + + /** + * This is kinda some test to verify that the implementation of PerResourcePollingEventSource + * works with the underling mechanisms in event source manager and other parts of the system. + **/ + @Test + void fetchedAndReconciledMultipleTimes() { + operator.create(PerResourceEventSourceCustomResource.class, resource(NAME_1)); + operator.create(PerResourceEventSourceCustomResource.class, resource(NAME_2)); + + var reconciler = + operator.getReconcilerOfType(PerResourcePollingEventSourceTestReconciler.class); + await().untilAsserted(() -> { + assertThat(reconciler.getNumberOfExecutions(NAME_1)).isGreaterThan(2); + assertThat(reconciler.getNumberOfFetchExecution(NAME_1)).isGreaterThan(2); + assertThat(reconciler.getNumberOfExecutions(NAME_2)).isGreaterThan(2); + assertThat(reconciler.getNumberOfFetchExecution(NAME_2)).isGreaterThan(2); + }); + } + + private PerResourceEventSourceCustomResource resource(String name) { + var res = new PerResourceEventSourceCustomResource(); + res.setMetadata(new ObjectMetaBuilder() + .withName(name) + .build()); + return res; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/perresourceeventsource/PerResourceEventSourceCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/perresourceeventsource/PerResourceEventSourceCustomResource.java new file mode 100644 index 0000000000..381e324aae --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/perresourceeventsource/PerResourceEventSourceCustomResource.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.perresourceeventsource; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("pres") +public class PerResourceEventSourceCustomResource + extends CustomResource + implements Namespaced { +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/perresourceeventsource/PerResourcePollingEventSourceTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/perresourceeventsource/PerResourcePollingEventSourceTestReconciler.java new file mode 100644 index 0000000000..dec730d536 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/perresourceeventsource/PerResourcePollingEventSourceTestReconciler.java @@ -0,0 +1,67 @@ +package io.javaoperatorsdk.operator.sample.perresourceeventsource; + +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.junit.KubernetesClientAware; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource; + +@ControllerConfiguration +public class PerResourcePollingEventSourceTestReconciler + implements Reconciler, + EventSourceInitializer, + KubernetesClientAware { + + public static final int POLL_PERIOD = 100; + private final Map numberOfExecutions = new ConcurrentHashMap<>(); + private final Map numberOfFetchExecutions = new ConcurrentHashMap<>(); + + private KubernetesClient client; + private PerResourcePollingEventSource eventSource; + + @Override + public UpdateControl reconcile( + PerResourceEventSourceCustomResource resource, + Context context) throws Exception { + numberOfExecutions.putIfAbsent(resource.getMetadata().getName(), 0); + numberOfExecutions.compute(resource.getMetadata().getName(), (s, v) -> v + 1); + return UpdateControl.noUpdate(); + } + + @Override + public Map prepareEventSources( + EventSourceContext context) { + this.eventSource = + new PerResourcePollingEventSource<>(resource -> { + numberOfFetchExecutions.putIfAbsent(resource.getMetadata().getName(), 0); + numberOfFetchExecutions.compute(resource.getMetadata().getName(), (s, v) -> v + 1); + return Set.of(UUID.randomUUID().toString()); + }, + context.getPrimaryCache(), POLL_PERIOD, String.class); + return EventSourceInitializer.nameEventSources(eventSource); + } + + @Override + public KubernetesClient getKubernetesClient() { + return client; + } + + @Override + public void setKubernetesClient(KubernetesClient kubernetesClient) { + this.client = kubernetesClient; + } + + public int getNumberOfExecutions(String name) { + return numberOfExecutions.get(name); + } + + public int getNumberOfFetchExecution(String name) { + return numberOfFetchExecutions.get(name); + } + +}