diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java index 1a49add3cb..c21509f41c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java @@ -1,10 +1,12 @@ package io.javaoperatorsdk.operator.processing.event; + import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.stream.Stream; @@ -33,7 +35,8 @@ public void add(EventSource eventSource) { + " is already registered with name: " + name); } sourceByName.put(name, eventSource); - sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource); + sources.computeIfAbsent(keyFor(eventSource), k -> new ConcurrentHashMap<>()).put(name, + eventSource); } public EventSource remove(String name) { @@ -144,7 +147,6 @@ public List> getEventSources(Class dependentType) { if (sourcesForType == null) { return Collections.emptyList(); } - return sourcesForType.values().stream() .map(es -> (EventSource) es).toList(); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java index 6c68916e98..9ddb877f07 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java @@ -36,6 +36,7 @@ class EventSourceManagerTest { public void registersEventSource() { EventSource eventSource = mock(EventSource.class); when(eventSource.resourceType()).thenReturn(EventSource.class); + when(eventSource.name()).thenReturn("name1"); eventSourceManager.registerEventSource(eventSource); @@ -95,6 +96,7 @@ void retrievingEventSourceForClassShouldWork() { ManagedInformerEventSource eventSource = mock(ManagedInformerEventSource.class); when(eventSource.resourceType()).thenReturn(String.class); + when(eventSource.name()).thenReturn("name1"); manager.registerEventSource(eventSource); var source = manager.getEventSourceFor(String.class); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java index 3d0d92da4f..12f0cc7331 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java @@ -1,5 +1,10 @@ package io.javaoperatorsdk.operator.processing.event; +import java.util.ConcurrentModificationException; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; + import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.ConfigMap; @@ -177,7 +182,43 @@ void getEventSourcesShouldWork() { assertThat(eventSources.getEventSources(Service.class)).isEmpty(); } - + @Test + void testConcurrentAddRemoveAndGet() throws InterruptedException { + + final var concurrentExceptionFound = new AtomicBoolean(false); + + for (int i = 0; i < 1000 && !concurrentExceptionFound.get(); i++) { + final var eventSources = new EventSources(); + var eventSourceList = + IntStream.range(1, 20).mapToObj(n -> eventSourceMockWithName(EventSource.class, + "name" + n, HasMetadata.class)).toList(); + + IntStream.range(1, 10).forEach(n -> eventSources.add(eventSourceList.get(n - 1))); + + var phaser = new Phaser(2); + + var t1 = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + IntStream.range(11, 20).forEach(n -> eventSources.add(eventSourceList.get(n - 1))); + }); + var t2 = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + try { + eventSources.getEventSources(eventSourceList.get(0).resourceType()); + } catch (ConcurrentModificationException e) { + concurrentExceptionFound.set(true); + } + }); + t1.start(); + t2.start(); + t1.join(); + t2.join(); + } + + assertThat(concurrentExceptionFound) + .withFailMessage("ConcurrentModificationException thrown") + .isFalse(); + } EventSource eventSourceMockWithName(Class clazz, String name, Class resourceType) {