Skip to content

Commit 67076c8

Browse files
committed
fix: use workflow service to start/stop event sources
IO-bound operations shouldn't use ForkJoinPools and workflow service should be in use during these operations so it should be available to run these tasks.
1 parent eedd830 commit 67076c8

File tree

3 files changed

+29
-29
lines changed

3 files changed

+29
-29
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,20 @@
55
import java.util.concurrent.Callable;
66
import java.util.concurrent.ExecutionException;
77
import java.util.concurrent.ExecutorService;
8-
import java.util.concurrent.ForkJoinPool;
98
import java.util.concurrent.Future;
109
import java.util.concurrent.TimeUnit;
1110
import java.util.concurrent.TimeoutException;
1211

1312
import org.slf4j.Logger;
1413
import org.slf4j.LoggerFactory;
1514

15+
import io.javaoperatorsdk.operator.OperatorException;
16+
1617
public class ExecutorServiceManager {
1718
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
1819
private static ExecutorServiceManager instance;
1920
private final ExecutorService executor;
2021
private final ExecutorService workflowExecutor;
21-
private final ForkJoinPool threadPool =
22-
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
2322
private final int terminationTimeoutSeconds;
2423

2524
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
@@ -71,25 +70,18 @@ public ExecutorService workflowExecutorService() {
7170
return workflowExecutor;
7271
}
7372

74-
public static void executeInParallel(Runnable callable) {
75-
instance().executeInParallel(() -> {
76-
callable.run();
77-
return null;
78-
});
79-
}
80-
81-
public <T> T executeInParallel(Callable<T> callable) {
73+
public static void executeAndWaitForCompletion(Runnable task) {
8274
try {
83-
return threadPool.submit(callable).get();
84-
} catch (InterruptedException | ExecutionException e) {
85-
throw new RuntimeException(e);
75+
instance().workflowExecutorService().submit(task)
76+
.get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
77+
} catch (InterruptedException | ExecutionException | TimeoutException e) {
78+
throw new OperatorException("Couldn't execute task", e);
8679
}
8780
}
8881

8982
private void doStop() {
9083
try {
9184
log.debug("Closing executor");
92-
threadPool.shutdown();
9385
executor.shutdown();
9486
workflowExecutor.shutdown();
9587
if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,21 +67,24 @@ public synchronized void start() {
6767
startEventSource(eventSources.namedControllerResourceEventSource());
6868

6969
// starting event sources on the workflow executor which shouldn't be used at this point
70-
ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources()
71-
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
72-
.parallel()
73-
.forEach(this::startEventSource));
70+
ExecutorServiceManager
71+
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
72+
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
73+
.parallel()
74+
.forEach(this::startEventSource));
7475

75-
ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources()
76-
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
77-
.parallel().forEach(this::startEventSource));
76+
ExecutorServiceManager
77+
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
78+
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
79+
.parallel().forEach(this::startEventSource));
7880
}
7981

8082
@Override
8183
public synchronized void stop() {
8284
stopEventSource(eventSources.namedControllerResourceEventSource());
83-
ExecutorServiceManager.executeInParallel(
84-
() -> eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource));
85+
ExecutorServiceManager
86+
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources().parallel()
87+
.forEach(this::stopEventSource));
8588
eventSources.clear();
8689
}
8790

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event.source.informer;
22

3-
import java.util.*;
3+
import java.util.HashMap;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.Optional;
7+
import java.util.Set;
48
import java.util.concurrent.ConcurrentHashMap;
59
import java.util.function.Function;
610
import java.util.function.Predicate;
@@ -42,8 +46,9 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat
4246

4347
@Override
4448
public void start() throws OperatorException {
45-
ExecutorServiceManager
46-
.executeInParallel(() -> sources.values().parallelStream().forEach(LifecycleAware::start));
49+
// make sure informers are all started before proceeding further
50+
ExecutorServiceManager.executeAndWaitForCompletion(
51+
() -> sources.values().parallelStream().forEach(LifecycleAware::start));
4752
}
4853

4954
void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
@@ -80,7 +85,7 @@ public void changeNamespaces(Set<String> namespaces) {
8085
log.debug("Stopped informer {} for namespaces: {}", this, sourcesToRemove);
8186
sourcesToRemove.forEach(k -> sources.remove(k).stop());
8287

83-
ExecutorServiceManager.executeInParallel(
88+
ExecutorServiceManager.executeAndWaitForCompletion(
8489
() -> namespaces.forEach(ns -> {
8590
if (!sources.containsKey(ns)) {
8691
final var source =
@@ -108,7 +113,7 @@ private InformerWrapper<T> createEventSource(
108113

109114
@Override
110115
public void stop() {
111-
ExecutorServiceManager.executeInParallel(
116+
ExecutorServiceManager.instance().workflowExecutorService().execute(
112117
() -> {
113118
log.info("Stopping {}", this);
114119
sources.forEach((ns, source) -> {

0 commit comments

Comments
 (0)