diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java index cb7f4ae63b..c95d5e3d03 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java @@ -34,7 +34,7 @@ public Optional getRetryInfo() { @Override public Set getSecondaryResources(Class expectedType) { - return controller.getEventSourceManager().getEventSourcesFor(expectedType).stream() + return controller.getEventSourceManager().getResourceEventSourcesFor(expectedType).stream() .map(es -> es.getSecondaryResources(primaryResource)) .flatMap(Set::stream) .collect(Collectors.toSet()); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 1b6542b8eb..a34c4b38f1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -83,9 +83,14 @@ public Controller(Reconciler

reconciler, isCleaner = reconciler instanceof Cleaner; managedWorkflow = ManagedWorkflow.workflowFor(kubernetesClient, configuration.getDependentResources()); + eventSourceManager = new EventSourceManager<>(this); eventProcessor = new EventProcessor<>(eventSourceManager); eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer(); + + final var context = new EventSourceContext<>( + eventSourceManager.getControllerResourceEventSource(), configuration, kubernetesClient); + initAndRegisterEventSources(context); } @Override @@ -241,8 +246,7 @@ public void initAndRegisterEventSources(EventSourceContext

context) { .map(EventSourceReferencer.class::cast) .forEach(dr -> { try { - ((EventSourceReferencer

) dr) - .resolveEventSource(eventSourceManager); + ((EventSourceReferencer

) dr).resolveEventSource(eventSourceManager); } catch (EventSourceNotFoundException e) { unresolvable.computeIfAbsent(e.getEventSourceName(), s -> new ArrayList<>()).add(dr); } @@ -318,10 +322,6 @@ public synchronized void start(boolean startEventProcessor) throws OperatorExcep try { // check that the custom resource is known by the cluster if configured that way validateCRDWithLocalModelIfRequired(resClass, controllerName, crdName, specVersion); - final var context = new EventSourceContext<>( - eventSourceManager.getControllerResourceEventSource(), configuration, kubernetesClient); - - initAndRegisterEventSources(context); eventSourceManager.start(); if (startEventProcessor) { eventProcessor.start(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index d091d442f6..a85633bb00 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -5,6 +5,7 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -126,8 +127,9 @@ public final synchronized void registerEventSource(String name, EventSource even if (name == null || name.isBlank()) { name = EventSourceInitializer.generateNameFor(eventSource); } - eventSources.add(name, eventSource); - eventSource.setEventHandler(controller.getEventProcessor()); + final var named = new NamedEventSource(eventSource, name); + eventSources.add(named); + named.setEventHandler(controller.getEventProcessor()); } catch (IllegalStateException | MissingCRDException e) { throw e; // leave untouched } catch (Exception e) { @@ -138,22 +140,24 @@ public final synchronized void registerEventSource(String name, EventSource even @SuppressWarnings("unchecked") public void broadcastOnResourceEvent(ResourceAction action, P resource, P oldResource) { - eventSources.additionalNamedEventSources().forEach(eventSource -> { - if (eventSource.original() instanceof ResourceEventAware) { - var lifecycleAwareES = ((ResourceEventAware

) eventSource.original()); - switch (action) { - case ADDED: - lifecycleAwareES.onResourceCreated(resource); - break; - case UPDATED: - lifecycleAwareES.onResourceUpdated(resource, oldResource); - break; - case DELETED: - lifecycleAwareES.onResourceDeleted(resource); - break; - } - } - }); + eventSources.additionalNamedEventSources() + .map(NamedEventSource::original) + .forEach(source -> { + if (source instanceof ResourceEventAware) { + var lifecycleAwareES = ((ResourceEventAware

) source); + switch (action) { + case ADDED: + lifecycleAwareES.onResourceCreated(resource); + break; + case UPDATED: + lifecycleAwareES.onResourceUpdated(resource, oldResource); + break; + case DELETED: + lifecycleAwareES.onResourceDeleted(resource); + break; + } + } + }); } public void changeNamespaces(Set namespaces) { @@ -174,6 +178,11 @@ public Set getRegisteredEventSources() { .collect(Collectors.toCollection(LinkedHashSet::new)); } + @SuppressWarnings("unused") + public Stream getNamedEventSourcesStream() { + return eventSources.flatMappedSources(); + } + public ControllerResourceEventSource

getControllerResourceEventSource() { return eventSources.controllerResourceEventSource(); } @@ -187,9 +196,12 @@ public List> getResourceEventSourcesFor(Class d return eventSources.getEventSources(dependentType); } + /** + * @deprecated Use {@link #getResourceEventSourceFor(Class)} instead + */ @Deprecated public List> getEventSourcesFor(Class dependentType) { - return eventSources.getEventSources(dependentType); + return getResourceEventSourcesFor(dependentType); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceMetadata.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceMetadata.java new file mode 100644 index 0000000000..2fd913d481 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceMetadata.java @@ -0,0 +1,13 @@ +package io.javaoperatorsdk.operator.processing.event; + +import java.util.Optional; + +public interface EventSourceMetadata { + String name(); + + Class type(); + + Optional> resourceType(); + + Optional configuration(); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java index e4fabe7ff8..c17843de2f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java @@ -1,6 +1,10 @@ package io.javaoperatorsdk.operator.processing.event; -import java.util.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.stream.Collectors; @@ -20,15 +24,14 @@ class EventSources { public static final String RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME = "RetryAndRescheduleTimerEventSource"; - private final ConcurrentNavigableMap> sources = + private final ConcurrentNavigableMap> sources = new ConcurrentSkipListMap<>(); private final TimerEventSource retryAndRescheduleTimerEventSource = new TimerEventSource<>(); private ControllerResourceEventSource controllerResourceEventSource; - ControllerResourceEventSource initControllerEventSource(Controller controller) { + void initControllerEventSource(Controller controller) { controllerResourceEventSource = new ControllerResourceEventSource<>(controller); - return controllerResourceEventSource; } ControllerResourceEventSource controllerResourceEventSource() { @@ -49,7 +52,7 @@ public Stream additionalNamedEventSources() { Stream additionalEventSources() { return Stream.concat( Stream.of(retryEventSource()).filter(Objects::nonNull), - sources.values().stream().flatMap(c -> c.values().stream())); + flatMappedSources().map(NamedEventSource::original)); } NamedEventSource namedControllerResourceEventSource() { @@ -58,29 +61,32 @@ NamedEventSource namedControllerResourceEventSource() { } Stream flatMappedSources() { - return sources.values().stream().flatMap(c -> c.entrySet().stream() - .map(esEntry -> new NamedEventSource(esEntry.getValue(), esEntry.getKey()))); + return sources.values().stream().flatMap(c -> c.values().stream()); } public void clear() { sources.clear(); } - public boolean contains(String name, EventSource source) { + private NamedEventSource existing(String name, EventSource source) { final var eventSources = sources.get(keyFor(source)); if (eventSources == null || eventSources.isEmpty()) { - return false; + return null; } - return eventSources.containsKey(name); + return eventSources.get(name); } - public void add(String name, EventSource eventSource) { - if (contains(name, eventSource)) { - throw new IllegalArgumentException("An event source is already registered for the " - + keyAsString(getResourceType(eventSource), name) + public void add(NamedEventSource eventSource) { + final var name = eventSource.name(); + final var original = eventSource.original(); + final var existing = existing(name, original); + if (existing != null && !eventSource.equals(existing)) { + throw new IllegalArgumentException("Event source " + existing.original() + + " is already registered for the " + + keyAsString(getResourceType(original), name) + " class/name combination"); } - sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource); + sources.computeIfAbsent(keyFor(original), k -> new HashMap<>()).put(name, eventSource); } @SuppressWarnings("rawtypes") @@ -91,6 +97,10 @@ private Class getResourceType(EventSource source) { } private String keyFor(EventSource source) { + if (source instanceof NamedEventSource) { + source = ((NamedEventSource) source).original(); + } + return keyFor(getResourceType(source)); } @@ -100,6 +110,10 @@ private String keyFor(Class dependentType) { @SuppressWarnings("unchecked") public ResourceEventSource get(Class dependentType, String name) { + if (dependentType == null) { + throw new IllegalArgumentException("Must pass a dependent type to retrieve event sources"); + } + final var sourcesForType = sources.get(keyFor(dependentType)); if (sourcesForType == null || sourcesForType.isEmpty()) { throw new IllegalArgumentException( @@ -107,9 +121,9 @@ public ResourceEventSource get(Class dependentType, String name) { } final var size = sourcesForType.size(); - final EventSource source; + NamedEventSource source; if (size == 1 && name == null) { - source = sourcesForType.values().stream().findFirst().orElse(null); + source = sourcesForType.values().stream().findFirst().orElseThrow(); } else { if (name == null || name.isBlank()) { throw new IllegalArgumentException("There are multiple EventSources registered for type " @@ -125,15 +139,16 @@ public ResourceEventSource get(Class dependentType, String name) { } } - if (!(source instanceof ResourceEventSource)) { + EventSource original = source.original(); + if (!(original instanceof ResourceEventSource)) { throw new IllegalArgumentException(source + " associated with " + keyAsString(dependentType, name) + " is not a " + ResourceEventSource.class.getSimpleName()); } - final var res = (ResourceEventSource) source; + final var res = (ResourceEventSource) original; final var resourceClass = res.resourceType(); if (!resourceClass.isAssignableFrom(dependentType)) { - throw new IllegalArgumentException(source + " associated with " + throw new IllegalArgumentException(original + " associated with " + keyAsString(dependentType, name) + " is handling " + resourceClass.getName() + " resources but asked for " + dependentType.getName()); @@ -151,7 +166,12 @@ private String keyAsString(Class dependentType, String name) { @SuppressWarnings("unchecked") public List> getEventSources(Class dependentType) { final var sourcesForType = sources.get(keyFor(dependentType)); + if (sourcesForType == null) { + return Collections.emptyList(); + } + return sourcesForType.values().stream() + .map(NamedEventSource::original) .filter(ResourceEventSource.class::isInstance) .map(es -> (ResourceEventSource) es) .collect(Collectors.toList()); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java index 5822711cc5..13d5a10323 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java @@ -1,12 +1,15 @@ package io.javaoperatorsdk.operator.processing.event; import java.util.Objects; +import java.util.Optional; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.processing.event.source.Configurable; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority; +import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource; -class NamedEventSource implements EventSource { +class NamedEventSource implements EventSource, EventSourceMetadata { private final EventSource original; private final String name; @@ -35,6 +38,35 @@ public String name() { return name; } + @Override + public Class type() { + return original.getClass(); + } + + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public Optional> resourceType() { + if (original instanceof ResourceEventSource) { + ResourceEventSource resourceEventSource = (ResourceEventSource) original; + return Optional.of(resourceEventSource.resourceType()); + } + return Optional.empty(); + } + + @Override + @SuppressWarnings("rawtypes") + public Optional configuration() { + if (original instanceof Configurable) { + Configurable configurable = (Configurable) original; + return Optional.ofNullable(configurable.configuration()); + } + return Optional.empty(); + } + + public EventSource eventSource() { + return original; + } + @Override public String toString() { return original + " named: '" + name + "'}"; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Configurable.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Configurable.java new file mode 100644 index 0000000000..ecff7d11c9 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Configurable.java @@ -0,0 +1,5 @@ +package io.javaoperatorsdk.operator.processing.event.source; + +public interface Configurable { + C configuration(); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index f5f52d1c0e..0d22ccdef0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -1,6 +1,10 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; -import java.util.*; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; import java.util.function.Predicate; @@ -72,6 +76,10 @@ void initSources(MixedOperation, Resource> clien } } + C configuration() { + return configuration; + } + public void changeNamespaces(Set namespaces) { var sourcesToRemove = sources.keySet().stream() .filter(k -> !namespaces.contains(k)).collect(Collectors.toSet()); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index cc9af59094..7fdf3c5e56 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -20,13 +20,16 @@ import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.*; +import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource; +import io.javaoperatorsdk.operator.processing.event.source.Cache; +import io.javaoperatorsdk.operator.processing.event.source.Configurable; +import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; public abstract class ManagedInformerEventSource> extends AbstractResourceEventSource implements ResourceEventHandler, Cache, IndexerResourceCache, RecentOperationCacheFiller, - NamespaceChangeable { + NamespaceChangeable, Configurable { private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class); @@ -133,4 +136,8 @@ public Stream list(Predicate predicate) { return cache.list(predicate); } + @Override + public C configuration() { + return manager().configuration(); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java index aa9bccde09..ab345145da 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java @@ -24,7 +24,13 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @SuppressWarnings({"rawtypes", "unchecked"}) class EventSourceManagerTest { @@ -109,7 +115,7 @@ void shouldNotBePossibleToAddEventSourcesForSameTypeAndName() { final var cause = exception.getCause(); assertTrue(cause instanceof IllegalArgumentException); assertThat(cause.getMessage()).contains( - "An event source is already registered for the (io.javaoperatorsdk.operator.sample.simple.TestCustomResource, " + "is already registered for the (io.javaoperatorsdk.operator.sample.simple.TestCustomResource, " + name + ") class/name combination"); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java index 7e1fc36967..b5438cdd04 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java @@ -2,50 +2,184 @@ import org.junit.jupiter.api.Test; +import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.Service; import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.api.config.MockControllerConfiguration; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource; import static io.javaoperatorsdk.operator.processing.event.EventSources.RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME; import static org.assertj.core.api.Assertions.assertThat; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; @SuppressWarnings({"unchecked", "rawtypes"}) class EventSourcesTest { public static final String EVENT_SOURCE_NAME = "foo"; - EventSources eventSources = new EventSources(); @Test - void cannotAddTwoEventSourcesWithSameName() { + void cannotAddTwoDifferentEventSourcesWithSameName() { + final var eventSources = new EventSources(); assertThrows(IllegalArgumentException.class, () -> { - eventSources.add("name", mock(EventSource.class)); - eventSources.add("name", mock(EventSource.class)); + eventSources.add(new NamedEventSource(mock(EventSource.class), "name")); + eventSources.add(new NamedEventSource(mock(EventSource.class), "name")); }); } + @Test + void cannotAddTwoEventSourcesWithSameNameUnlessTheyAreEqual() { + final var eventSources = new EventSources(); + final var source = mock(EventSource.class); + eventSources.add(new NamedEventSource(source, "name")); + eventSources.add(new NamedEventSource(source, "name")); + assertThat(eventSources.flatMappedSources()) + .containsExactly(new NamedEventSource(source, "name")); + } + @Test void eventSourcesStreamShouldNotReturnControllerEventSource() { - initControllerEventSource(); + final var eventSources = new EventSources(); final var source = mock(EventSource.class); - eventSources.add(EVENT_SOURCE_NAME, source); + final var namedEventSource = new NamedEventSource(source, EVENT_SOURCE_NAME); + eventSources.add(namedEventSource); assertThat(eventSources.additionalNamedEventSources()).containsExactly( new NamedEventSource(eventSources.retryEventSource(), RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME), - new NamedEventSource(source, EVENT_SOURCE_NAME)); + namedEventSource); + } + + @Test + void additionalEventSourcesShouldNotContainNamedEventSources() { + final var eventSources = new EventSources(); + final var source = mock(EventSource.class); + final var namedEventSource = new NamedEventSource(source, EVENT_SOURCE_NAME); + eventSources.add(namedEventSource); + + assertThat(eventSources.additionalEventSources()).containsExactly( + eventSources.retryEventSource(), source); } - private void initControllerEventSource() { + @Test + void checkControllerResourceEventSource() { + final var eventSources = new EventSources(); final var configuration = MockControllerConfiguration.forResource(HasMetadata.class); final var controller = new Controller(mock(Reconciler.class), configuration, MockKubernetesClient.client(HasMetadata.class)); eventSources.initControllerEventSource(controller); + final var controllerResourceEventSource = eventSources.controllerResourceEventSource(); + assertNotNull(controllerResourceEventSource); + assertEquals(HasMetadata.class, controllerResourceEventSource.resourceType()); + + assertEquals(controllerResourceEventSource, + eventSources.namedControllerResourceEventSource().eventSource()); + } + + @Test + void flatMappedSourcesShouldReturnOnlyUserRegisteredEventSources() { + final var eventSources = new EventSources(); + final var mock1 = mock(ResourceEventSource.class); + when(mock1.resourceType()).thenReturn(HasMetadata.class); + final var mock2 = mock(ResourceEventSource.class); + when(mock2.resourceType()).thenReturn(HasMetadata.class); + final var mock3 = mock(ResourceEventSource.class); + when(mock3.resourceType()).thenReturn(ConfigMap.class); + + final var named1 = new NamedEventSource(mock1, "name1"); + final var named2 = new NamedEventSource(mock2, "name2"); + final var named3 = new NamedEventSource(mock3, "name2"); + eventSources.add(named1); + eventSources.add(named2); + eventSources.add(named3); + + assertThat(eventSources.flatMappedSources()).contains(named1, named2, named3); + } + + @Test + void clearShouldWork() { + final var eventSources = new EventSources(); + final var mock1 = mock(ResourceEventSource.class); + when(mock1.resourceType()).thenReturn(HasMetadata.class); + final var mock2 = mock(ResourceEventSource.class); + when(mock2.resourceType()).thenReturn(HasMetadata.class); + final var mock3 = mock(ResourceEventSource.class); + when(mock3.resourceType()).thenReturn(ConfigMap.class); + + final var named1 = new NamedEventSource(mock1, "name1"); + final var named2 = new NamedEventSource(mock2, "name2"); + final var named3 = new NamedEventSource(mock3, "name2"); + eventSources.add(named1); + eventSources.add(named2); + eventSources.add(named3); + + eventSources.clear(); + assertThat(eventSources.flatMappedSources()).isEmpty(); } + @Test + void getShouldWork() { + final var eventSources = new EventSources(); + final var mock1 = mock(ResourceEventSource.class); + when(mock1.resourceType()).thenReturn(HasMetadata.class); + final var mock2 = mock(ResourceEventSource.class); + when(mock2.resourceType()).thenReturn(HasMetadata.class); + final var mock3 = mock(ResourceEventSource.class); + when(mock3.resourceType()).thenReturn(ConfigMap.class); + + final var named1 = new NamedEventSource(mock1, "name1"); + final var named2 = new NamedEventSource(mock2, "name2"); + final var named3 = new NamedEventSource(mock3, "name2"); + eventSources.add(named1); + eventSources.add(named2); + eventSources.add(named3); + + assertEquals(mock1, eventSources.get(HasMetadata.class, "name1")); + assertEquals(mock2, eventSources.get(HasMetadata.class, "name2")); + assertEquals(mock3, eventSources.get(ConfigMap.class, "name2")); + assertEquals(mock3, eventSources.get(ConfigMap.class, null)); + + + assertThrows(IllegalArgumentException.class, () -> eventSources.get(HasMetadata.class, null)); + assertThrows(IllegalArgumentException.class, + () -> eventSources.get(ConfigMap.class, "unknown")); + assertThrows(IllegalArgumentException.class, () -> eventSources.get(null, null)); + assertThrows(IllegalArgumentException.class, () -> eventSources.get(HasMetadata.class, null)); + } + + @Test + void getEventSourcesShouldWork() { + final var eventSources = new EventSources(); + final var mock1 = mock(ResourceEventSource.class); + when(mock1.resourceType()).thenReturn(HasMetadata.class); + final var mock2 = mock(ResourceEventSource.class); + when(mock2.resourceType()).thenReturn(HasMetadata.class); + final var mock3 = mock(ResourceEventSource.class); + when(mock3.resourceType()).thenReturn(ConfigMap.class); + + final var named1 = new NamedEventSource(mock1, "name1"); + final var named2 = new NamedEventSource(mock2, "name2"); + final var named3 = new NamedEventSource(mock3, "name2"); + eventSources.add(named1); + eventSources.add(named2); + eventSources.add(named3); + + var sources = eventSources.getEventSources(HasMetadata.class); + assertThat(sources.size()).isEqualTo(2); + assertThat(sources).contains(mock1, mock2); + + sources = eventSources.getEventSources(ConfigMap.class); + assertThat(sources.size()).isEqualTo(1); + assertThat(sources).contains(mock3); + + assertThat(eventSources.getEventSources(Service.class)).isEmpty(); + } }