diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java index 44aa430ec2..c9732892b9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java @@ -8,6 +8,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.processing.Controller; /** @@ -32,20 +33,27 @@ public synchronized void shouldStart() { } public synchronized void start(boolean startEventProcessor) { - controllers().parallelStream().forEach(c -> c.start(startEventProcessor)); + ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> { + c.start(startEventProcessor); + return null; + }, c -> "Controller Starter for: " + c.getConfiguration().getName()); started = true; } public synchronized void stop() { - controllers().parallelStream().forEach(closeable -> { - log.debug("closing {}", closeable); - closeable.stop(); - }); + ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> { + log.debug("closing {}", c); + c.stop(); + return null; + }, c -> "Controller Stopper for: " + c.getConfiguration().getName()); started = false; } public synchronized void startEventProcessing() { - controllers().parallelStream().forEach(Controller::startEventProcessing); + ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(controllers().stream(), c -> { + c.startEventProcessing(); + return null; + }, c -> "Event processor starter for: " + c.getConfiguration().getName()); } @SuppressWarnings({"unchecked", "rawtypes"}) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index b7c2f5e108..613ea622b5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -104,6 +104,7 @@ public synchronized void start() { if (started) { return; } + ExecutorServiceManager.init(); controllerManager.shouldStart(); final var version = ConfigurationServiceProvider.instance().getVersion(); log.info( @@ -114,7 +115,6 @@ public synchronized void start() { final var clientVersion = Version.clientVersion(); log.info("Client version: {}", clientVersion); - ExecutorServiceManager.init(); // first start the controller manager before leader election, // the leader election would start subsequently the processor if on controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled()); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 5dcba5975d..3261651f3d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -23,10 +23,12 @@ public class ExecutorServiceManager { private static ExecutorServiceManager instance; private final ExecutorService executor; private final ExecutorService workflowExecutor; + private final ExecutorService cachingExecutorService; private final int terminationTimeoutSeconds; private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor, int terminationTimeoutSeconds) { + this.cachingExecutorService = Executors.newCachedThreadPool(); this.executor = new InstrumentedExecutorService(executor); this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor); this.terminationTimeoutSeconds = terminationTimeoutSeconds; @@ -49,7 +51,7 @@ public static void init() { } } - public synchronized static void stop() { + public static synchronized void stop() { if (instance != null) { instance.doStop(); } @@ -66,13 +68,26 @@ public synchronized static ExecutorServiceManager instance() { return instance; } - public static void executeAndWaitForAllToComplete(Stream stream, + /** + * Uses cachingExecutorService from this manager. Use this only for tasks, that don't have dynamic + * nature, in sense that won't grow with the number of inputs (thus kubernetes resources) + * + * @param stream of elements + * @param task to call on stream elements + * @param threadNamer for naming thread + * @param type + */ + public static void boundedExecuteAndWaitForAllToComplete(Stream stream, Function task, Function threadNamer) { - final var instrumented = new InstrumentedExecutorService( - Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); + executeAndWaitForAllToComplete(stream, task, threadNamer, instance().cachingExecutorService()); + } + public static void executeAndWaitForAllToComplete(Stream stream, + Function task, Function threadNamer, + ExecutorService executorService) { + final var instrumented = new InstrumentedExecutorService(executorService); try { - instrumented.invokeAll(stream.parallel().map(item -> (Callable) () -> { + instrumented.invokeAll(stream.map(item -> (Callable) () -> { // change thread name for easier debugging final var thread = Thread.currentThread(); final var name = thread.getName(); @@ -91,11 +106,12 @@ public static void executeAndWaitForAllToComplete(Stream stream, } catch (ExecutionException e) { throw new OperatorException(e.getCause()); } catch (InterruptedException e) { + log.warn("Interrupted.", e); Thread.currentThread().interrupt(); } }); - shutdown(instrumented); } catch (InterruptedException e) { + log.warn("Interrupted.", e); Thread.currentThread().interrupt(); } } @@ -108,11 +124,16 @@ public ExecutorService workflowExecutorService() { return workflowExecutor; } + public ExecutorService cachingExecutorService() { + return cachingExecutorService; + } + private void doStop() { try { log.debug("Closing executor"); shutdown(executor); shutdown(workflowExecutor); + shutdown(cachingExecutorService); } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); Thread.currentThread().interrupt(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 6d161aca9e..91bbd99b66 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -65,13 +65,13 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() { public synchronized void start() { startEventSource(eventSources.namedControllerResourceEventSource()); - ExecutorServiceManager.executeAndWaitForAllToComplete( + ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete( eventSources.additionalNamedEventSources() .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)), this::startEventSource, getThreadNamer("start")); - ExecutorServiceManager.executeAndWaitForAllToComplete( + ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete( eventSources.additionalNamedEventSources() .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)), this::startEventSource, @@ -86,10 +86,14 @@ private static Function getThreadNamer(String stage) { }; } + private static Function getEventSourceThreadNamer(String stage) { + return es -> stage + " -> " + es; + } + @Override public synchronized void stop() { stopEventSource(eventSources.namedControllerResourceEventSource()); - ExecutorServiceManager.executeAndWaitForAllToComplete( + ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete( eventSources.additionalNamedEventSources(), this::stopEventSource, getThreadNamer("stop")); @@ -179,13 +183,15 @@ public void broadcastOnResourceEvent(ResourceAction action, P resource, P oldRes public void changeNamespaces(Set namespaces) { eventSources.controllerResourceEventSource() .changeNamespaces(namespaces); - eventSources + ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(eventSources .additionalEventSources() .filter(NamespaceChangeable.class::isInstance) .map(NamespaceChangeable.class::cast) - .filter(NamespaceChangeable::allowsNamespaceChanges) - .parallel() - .forEach(ies -> ies.changeNamespaces(namespaces)); + .filter(NamespaceChangeable::allowsNamespaceChanges), e -> { + e.changeNamespaces(namespaces); + return null; + }, + getEventSourceThreadNamer("changeNamespace")); } public Set getRegisteredEventSources() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 9d048e61cf..0a6b8327c5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -59,7 +59,7 @@ public InformerManager(MixedOperation, Resource> public void start() throws OperatorException { initSources(); // make sure informers are all started before proceeding further - ExecutorServiceManager.executeAndWaitForAllToComplete(sources.values().stream(), + ExecutorServiceManager.boundedExecuteAndWaitForAllToComplete(sources.values().stream(), iw -> { iw.start(); return null; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java index 2126f129c7..12230a7e2a 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import static com.google.common.truth.Truth.assertThat; +import static io.javaoperatorsdk.operator.api.config.ConfigurationService.log; import static org.awaitility.Awaitility.await; class MultiVersionCRDIT { @@ -72,9 +73,15 @@ public void onStop(SharedIndexInformer informer, Throwable ex) { acceptOnlyIfUnsetOrEqualToAlreadySet(errorMessage, watcherEx.getCause().getMessage()); } final var apiTypeClass = informer.getApiTypeClass(); + + log.debug("Current resourceClassName: " + resourceClassName); + resourceClassName = acceptOnlyIfUnsetOrEqualToAlreadySet(resourceClassName, apiTypeClass.getName()); - System.out.println("Informer for " + HasMetadata.getFullResourceName(apiTypeClass) + + log.debug("API Type Class: " + apiTypeClass.getName() + + " - resource class name: " + resourceClassName); + log.info("Informer for " + HasMetadata.getFullResourceName(apiTypeClass) + " stopped due to: " + ex.getMessage()); } @@ -132,7 +139,7 @@ void invalidEventsShouldStopInformerAndCallInformerStoppedHandler() { operator.create(v1res); await() - .atMost(Duration.ofSeconds(1)) + .atMost(Duration.ofSeconds(10)) .pollInterval(Duration.ofMillis(50)) .untilAsserted(() -> { // v1 is the stored version so trying to create a v2 version should fail because we cannot