Skip to content

Commit b7e5817

Browse files
committed
fix: run event source start on specific thread pool
Fixes #1603
1 parent da20264 commit b7e5817

File tree

3 files changed

+59
-28
lines changed

3 files changed

+59
-28
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.concurrent.Callable;
66
import java.util.concurrent.ExecutionException;
77
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.ForkJoinPool;
89
import java.util.concurrent.Future;
910
import java.util.concurrent.TimeUnit;
1011
import java.util.concurrent.TimeoutException;
@@ -17,6 +18,9 @@ public class ExecutorServiceManager {
1718
private static ExecutorServiceManager instance;
1819
private final ExecutorService executor;
1920
private final ExecutorService workflowExecutor;
21+
22+
private static final ForkJoinPool threadPool = new ForkJoinPool(
23+
Runtime.getRuntime().availableProcessors());
2024
private final int terminationTimeoutSeconds;
2125

2226
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
@@ -68,6 +72,21 @@ public ExecutorService workflowExecutorService() {
6872
return workflowExecutor;
6973
}
7074

75+
public static void executeInParallel(Runnable callable) {
76+
executeInParallel(() -> {
77+
callable.run();
78+
return null;
79+
});
80+
}
81+
82+
public static <T> T executeInParallel(Callable<T> callable) {
83+
try {
84+
return threadPool.submit(callable).get();
85+
} catch (InterruptedException | ExecutionException e) {
86+
throw new RuntimeException(e);
87+
}
88+
}
89+
7190
private void doStop() {
7291
try {
7392
log.debug("Closing executor");
@@ -80,6 +99,7 @@ private void doStop() {
8099
executor.shutdownNow(); // if we timed out, waiting, cancel everything
81100
}
82101

102+
threadPool.shutdown();
83103
} catch (InterruptedException e) {
84104
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
85105
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.fabric8.kubernetes.api.model.HasMetadata;
1313
import io.javaoperatorsdk.operator.MissingCRDException;
1414
import io.javaoperatorsdk.operator.OperatorException;
15+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1516
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
1617
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
1718
import io.javaoperatorsdk.operator.processing.Controller;
@@ -64,20 +65,24 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
6465
@Override
6566
public synchronized void start() {
6667
startEventSource(eventSources.namedControllerResourceEventSource());
67-
eventSources.additionalNamedEventSources()
68+
69+
// starting event sources on the workflow executor which shouldn't be used at this point
70+
ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources()
6871
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
69-
.parallel().forEach(this::startEventSource);
70-
eventSources.additionalNamedEventSources()
72+
.parallel()
73+
.forEach(this::startEventSource));
74+
75+
ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources()
7176
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
72-
.parallel().forEach(this::startEventSource);
77+
.parallel().forEach(this::startEventSource));
7378
}
7479

7580
@Override
7681
public synchronized void stop() {
7782
stopEventSource(eventSources.namedControllerResourceEventSource());
78-
eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource);
83+
ExecutorServiceManager.executeInParallel(
84+
() -> eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource));
7985
eventSources.clear();
80-
8186
}
8287

8388
@SuppressWarnings("rawtypes")

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import io.javaoperatorsdk.operator.ReconcilerUtils;
2121
import io.javaoperatorsdk.operator.api.config.Cloner;
2222
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
23+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
2324
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
2425
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2526
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -41,7 +42,8 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat
4142

4243
@Override
4344
public void start() throws OperatorException {
44-
sources.values().parallelStream().forEach(LifecycleAware::start);
45+
ExecutorServiceManager
46+
.executeInParallel(() -> sources.values().parallelStream().forEach(LifecycleAware::start));
4547
}
4648

4749
void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
@@ -78,18 +80,19 @@ public void changeNamespaces(Set<String> namespaces) {
7880
log.debug("Stopped informer {} for namespaces: {}", this, sourcesToRemove);
7981
sourcesToRemove.forEach(k -> sources.remove(k).stop());
8082

81-
namespaces.forEach(ns -> {
82-
if (!sources.containsKey(ns)) {
83-
final var source =
84-
createEventSource(
85-
client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()),
86-
eventHandler, ns);
87-
source.addIndexers(this.indexers);
88-
source.start();
89-
log.debug("Registered new {} -> {} for namespace: {}", this, source,
90-
ns);
91-
}
92-
});
83+
ExecutorServiceManager.executeInParallel(
84+
() -> namespaces.forEach(ns -> {
85+
if (!sources.containsKey(ns)) {
86+
final var source =
87+
createEventSource(
88+
client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()),
89+
eventHandler, ns);
90+
source.addIndexers(this.indexers);
91+
source.start();
92+
log.debug("Registered new {} -> {} for namespace: {}", this, source,
93+
ns);
94+
}
95+
}));
9396
}
9497

9598

@@ -105,15 +108,18 @@ private InformerWrapper<T> createEventSource(
105108

106109
@Override
107110
public void stop() {
108-
log.info("Stopping {}", this);
109-
sources.forEach((ns, source) -> {
110-
try {
111-
log.debug("Stopping informer for namespace: {} -> {}", ns, source);
112-
source.stop();
113-
} catch (Exception e) {
114-
log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
115-
}
116-
});
111+
ExecutorServiceManager.executeInParallel(
112+
() -> {
113+
log.info("Stopping {}", this);
114+
sources.forEach((ns, source) -> {
115+
try {
116+
log.debug("Stopping informer for namespace: {} -> {}", ns, source);
117+
source.stop();
118+
} catch (Exception e) {
119+
log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
120+
}
121+
});
122+
});
117123
}
118124

119125
@Override

0 commit comments

Comments
 (0)