From b046690a109854f385381d6964fd40d6b04b36e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 12 Nov 2024 20:54:24 +0100 Subject: [PATCH 1/2] fix: concurrent modification exception MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../processing/event/EventSourcesTest.java | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java index 3d0d92da4f..f60c717536 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java @@ -1,5 +1,10 @@ package io.javaoperatorsdk.operator.processing.event; +import java.util.ConcurrentModificationException; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.IntStream; + import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.ConfigMap; @@ -177,7 +182,41 @@ void getEventSourcesShouldWork() { assertThat(eventSources.getEventSources(Service.class)).isEmpty(); } - + @Test + void testConcurrentAddRemoveAndGet() throws InterruptedException { + + final var concurrentExceptionFound = new AtomicBoolean(false); + + for (int i = 0; i < 1000 && !concurrentExceptionFound.get(); i++) { + final var eventSources = new EventSources(); + var eventSourceList = + IntStream.range(1, 20).mapToObj(n -> eventSourceMockWithName(EventSource.class, + "name" + n, HasMetadata.class)).toList(); + + IntStream.range(1, 10).forEach(n -> eventSources.add(eventSourceList.get(n - 1))); + + var phaser = new Phaser(2); + + var t1 = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + IntStream.range(11, 20).forEach(n -> eventSources.add(eventSourceList.get(n - 1))); + }); + var t2 = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + try { + eventSources.getEventSources(eventSourceList.get(0).resourceType()); + } catch (ConcurrentModificationException e) { + concurrentExceptionFound.set(true); + } + }); + t1.start(); + t2.start(); + t1.join(); + t2.join(); + } + + assertThat(concurrentExceptionFound).isFalse(); + } EventSource eventSourceMockWithName(Class clazz, String name, Class resourceType) { From f8cb01ce6abcbd0e2ee5aeb8eb95d25c71175d4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 12 Nov 2024 21:11:43 +0100 Subject: [PATCH 2/2] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../operator/processing/event/EventSources.java | 6 ++++-- .../operator/processing/event/EventSourceManagerTest.java | 2 ++ .../operator/processing/event/EventSourcesTest.java | 4 +++- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java index 1a49add3cb..c21509f41c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java @@ -1,10 +1,12 @@ package io.javaoperatorsdk.operator.processing.event; + import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.stream.Stream; @@ -33,7 +35,8 @@ public void add(EventSource eventSource) { + " is already registered with name: " + name); } sourceByName.put(name, eventSource); - sources.computeIfAbsent(keyFor(eventSource), k -> new HashMap<>()).put(name, eventSource); + sources.computeIfAbsent(keyFor(eventSource), k -> new ConcurrentHashMap<>()).put(name, + eventSource); } public EventSource remove(String name) { @@ -144,7 +147,6 @@ public List> getEventSources(Class dependentType) { if (sourcesForType == null) { return Collections.emptyList(); } - return sourcesForType.values().stream() .map(es -> (EventSource) es).toList(); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java index 6c68916e98..9ddb877f07 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java @@ -36,6 +36,7 @@ class EventSourceManagerTest { public void registersEventSource() { EventSource eventSource = mock(EventSource.class); when(eventSource.resourceType()).thenReturn(EventSource.class); + when(eventSource.name()).thenReturn("name1"); eventSourceManager.registerEventSource(eventSource); @@ -95,6 +96,7 @@ void retrievingEventSourceForClassShouldWork() { ManagedInformerEventSource eventSource = mock(ManagedInformerEventSource.class); when(eventSource.resourceType()).thenReturn(String.class); + when(eventSource.name()).thenReturn("name1"); manager.registerEventSource(eventSource); var source = manager.getEventSourceFor(String.class); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java index f60c717536..12f0cc7331 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java @@ -215,7 +215,9 @@ void testConcurrentAddRemoveAndGet() throws InterruptedException { t2.join(); } - assertThat(concurrentExceptionFound).isFalse(); + assertThat(concurrentExceptionFound) + .withFailMessage("ConcurrentModificationException thrown") + .isFalse(); } EventSource eventSourceMockWithName(Class clazz, String name,