diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java index 44aa430ec2..d92c58b366 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java @@ -8,6 +8,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.processing.Controller; /** @@ -32,20 +33,27 @@ public synchronized void shouldStart() { } public synchronized void start(boolean startEventProcessor) { - controllers().parallelStream().forEach(c -> c.start(startEventProcessor)); + ExecutorServiceManager.executeAndWaitForAllToComplete(controllers().stream(), c -> { + c.start(startEventProcessor); + return null; + }, c -> "Controller Starter for: " + c.getConfiguration().getName()); started = true; } public synchronized void stop() { - controllers().parallelStream().forEach(closeable -> { - log.debug("closing {}", closeable); - closeable.stop(); - }); + ExecutorServiceManager.executeAndWaitForAllToComplete(controllers().stream(), c -> { + log.debug("closing {}", c); + c.stop(); + return null; + }, c -> "Controller Stopper for: " + c.getConfiguration().getName()); started = false; } public synchronized void startEventProcessing() { - controllers().parallelStream().forEach(Controller::startEventProcessing); + ExecutorServiceManager.executeAndWaitForAllToComplete(controllers().stream(), c -> { + c.startEventProcessing(); + return null; + }, c -> "Event processor starter for: " + c.getConfiguration().getName()); } @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 5dcba5975d..06eda871f2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -72,7 +72,7 @@ public static void executeAndWaitForAllToComplete(Stream stream, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); try { - instrumented.invokeAll(stream.parallel().map(item -> (Callable) () -> { + instrumented.invokeAll(stream.map(item -> (Callable) () -> { // change thread name for easier debugging final var thread = Thread.currentThread(); final var name = thread.getName(); @@ -91,11 +91,13 @@ public static void executeAndWaitForAllToComplete(Stream stream, } catch (ExecutionException e) { throw new OperatorException(e.getCause()); } catch (InterruptedException e) { + log.warn("Interrupted.", e); Thread.currentThread().interrupt(); } }); shutdown(instrumented); } catch (InterruptedException e) { + log.warn("Interrupted.", e); Thread.currentThread().interrupt(); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 6d161aca9e..d8b2312ebb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -86,6 +86,10 @@ private static Function getThreadNamer(String stage) { }; } + private static Function getEventSourceThreadNamer(String stage) { + return es -> stage + " -> " + es; + } + @Override public synchronized void stop() { stopEventSource(eventSources.namedControllerResourceEventSource()); @@ -179,13 +183,15 @@ public void broadcastOnResourceEvent(ResourceAction action, P resource, P oldRes public void changeNamespaces(Set namespaces) { eventSources.controllerResourceEventSource() .changeNamespaces(namespaces); - eventSources + ExecutorServiceManager.executeAndWaitForAllToComplete(eventSources .additionalEventSources() .filter(NamespaceChangeable.class::isInstance) .map(NamespaceChangeable.class::cast) - .filter(NamespaceChangeable::allowsNamespaceChanges) - .parallel() - .forEach(ies -> ies.changeNamespaces(namespaces)); + .filter(NamespaceChangeable::allowsNamespaceChanges), e -> { + e.changeNamespaces(namespaces); + return null; + }, + getEventSourceThreadNamer("changeNamespace")); } public Set getRegisteredEventSources() { diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java index 2126f129c7..d00fc48489 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java @@ -132,7 +132,7 @@ void invalidEventsShouldStopInformerAndCallInformerStoppedHandler() { operator.create(v1res); await() - .atMost(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(2)) .pollInterval(Duration.ofMillis(50)) .untilAsserted(() -> { // v1 is the stored version so trying to create a v2 version should fail because we cannot