diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 3990c8c300..29df2c6b8a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -39,13 +39,15 @@ public EventSourceManager(Controller controller) { this.eventSources = eventSources; this.controller = controller; // controller event source needs to be available before we create the event processor - final var controllerEventSource = eventSources.initControllerEventSource(controller); + eventSources.initControllerEventSource(controller); this.eventProcessor = new EventProcessor<>(this); - // sources need to be registered after the event processor is created since it's set on the - // event source - registerEventSource(eventSources.retryEventSource()); - registerEventSource(controllerEventSource); + postProcessDefaultEventSources(); + } + + private void postProcessDefaultEventSources() { + eventSources.controllerResourceEventSource().setEventHandler(eventProcessor); + eventSources.retryEventSource().setEventHandler(eventProcessor); } /** @@ -146,7 +148,7 @@ public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldRes public void changeNamespaces(Set namespaces) { eventProcessor.stop(); eventSources - .allEventSources() + .eventSources() .filter(NamespaceChangeable.class::isInstance) .map(NamespaceChangeable.class::cast) .filter(NamespaceChangeable::allowsNamespaceChanges) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java index 42f0638968..aebe6caecb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java @@ -16,7 +16,11 @@ class EventSources implements Iterable { - private static final String CONTROLLER_EVENT_SOURCE_KEY = "0"; + public static final String CONTROLLER_RESOURCE_EVENT_SOURCE_NAME = + "ControllerResourceEventSource"; + public static final String RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME = + "RetryAndRescheduleTimerEventSource"; + private final ConcurrentNavigableMap> sources = new ConcurrentSkipListMap<>(); private final TimerEventSource retryAndRescheduleTimerEventSource = new TimerEventSource<>(); @@ -38,7 +42,11 @@ TimerEventSource retryEventSource() { @Override public Iterator iterator() { - return flatMappedSources().iterator(); + return Stream.concat(Stream.of( + new NamedEventSource(controllerResourceEventSource, CONTROLLER_RESOURCE_EVENT_SOURCE_NAME), + new NamedEventSource(retryAndRescheduleTimerEventSource, + RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)), + flatMappedSources()).iterator(); } Stream flatMappedSources() { @@ -46,9 +54,9 @@ Stream flatMappedSources() { .map(esEntry -> new NamedEventSource(esEntry.getValue(), esEntry.getKey()))); } - Stream allEventSources() { + Stream eventSources() { return Stream.concat( - Stream.of(retryEventSource(), controllerResourceEventSource()).filter(Objects::nonNull), + Stream.of(controllerResourceEventSource(), retryEventSource()).filter(Objects::nonNull), sources.values().stream().flatMap(c -> c.values().stream())); } @@ -81,9 +89,6 @@ private Class getResourceType(EventSource source) { } private String keyFor(EventSource source) { - if (source instanceof ControllerResourceEventSource) { - return CONTROLLER_EVENT_SOURCE_KEY; - } return keyFor(getResourceType(source)); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java index 38d262f254..92d1f40096 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java @@ -1,5 +1,7 @@ package io.javaoperatorsdk.operator.processing.event; +import java.util.Objects; + import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.processing.event.source.EventSource; @@ -40,4 +42,19 @@ public String toString() { public EventSource original() { return original; } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + NamedEventSource that = (NamedEventSource) o; + return Objects.equals(original, that.original) && Objects.equals(name, that.name); + } + + @Override + public int hashCode() { + return Objects.hash(original, name); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java index 3bc223653f..ecc5c12079 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java @@ -1,8 +1,5 @@ package io.javaoperatorsdk.operator.processing.event; -import java.util.Set; -import java.util.stream.Collectors; - import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.HasMetadata; @@ -12,12 +9,16 @@ import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import static io.javaoperatorsdk.operator.processing.event.EventSources.CONTROLLER_RESOURCE_EVENT_SOURCE_NAME; +import static io.javaoperatorsdk.operator.processing.event.EventSources.RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME; +import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.*; import static org.mockito.Mockito.mock; @SuppressWarnings({"unchecked", "rawtypes"}) class EventSourcesTest { + public static final String EVENT_SOURCE_NAME = "foo"; EventSources eventSources = new EventSources(); @Test @@ -31,20 +32,41 @@ void cannotAddTwoEventSourcesWithSameName() { @Test void allEventSourcesShouldReturnAll() { // initial state doesn't have ControllerResourceEventSource - assertEquals(Set.of(eventSources.retryEventSource()), eventSources.allEventSources().collect( - Collectors.toSet())); + assertThat(eventSources.eventSources()).containsExactly(eventSources.retryEventSource()); + + initControllerEventSource(); + + assertThat(eventSources.eventSources()).containsExactly( + eventSources.controllerResourceEventSource(), + eventSources.retryEventSource()); + + final var source = mock(EventSource.class); + eventSources.add(EVENT_SOURCE_NAME, source); + // order matters + assertThat(eventSources.eventSources()) + .containsExactly(eventSources.controllerResourceEventSource(), + eventSources.retryEventSource(), source); + } + + @Test + void eventSourcesIteratorShouldReturnControllerEventSourceAsFirst() { + initControllerEventSource(); + final var source = mock(EventSource.class); + eventSources.add(EVENT_SOURCE_NAME, source); + + assertThat(eventSources.iterator()).toIterable().containsExactly( + new NamedEventSource(eventSources.controllerResourceEventSource(), + CONTROLLER_RESOURCE_EVENT_SOURCE_NAME), + new NamedEventSource(eventSources.retryEventSource(), + RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME), + new NamedEventSource(source, EVENT_SOURCE_NAME)); + } + + private void initControllerEventSource() { final var configuration = MockControllerConfiguration.forResource(HasMetadata.class); final var controller = new Controller(mock(Reconciler.class), configuration, MockKubernetesClient.client(HasMetadata.class)); eventSources.initControllerEventSource(controller); - assertEquals( - Set.of(eventSources.retryEventSource(), eventSources.controllerResourceEventSource()), - eventSources.allEventSources().collect(Collectors.toSet())); - final var source = mock(EventSource.class); - eventSources.add("foo", source); - assertEquals(Set.of(eventSources.retryEventSource(), - eventSources.controllerResourceEventSource(), source), - eventSources.allEventSources().collect(Collectors.toSet())); } }