Skip to content

Commit 5034220

Browse files
committed
fix: use separate executor to start/stop EventSources
1 parent 89e3a72 commit 5034220

File tree

3 files changed

+30
-16
lines changed

3 files changed

+30
-16
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 25 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.concurrent.Callable;
66
import java.util.concurrent.ExecutionException;
77
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.Executors;
89
import java.util.concurrent.Future;
910
import java.util.concurrent.TimeUnit;
1011
import java.util.concurrent.TimeoutException;
@@ -19,12 +20,15 @@ public class ExecutorServiceManager {
1920
private static ExecutorServiceManager instance;
2021
private final ExecutorService executor;
2122
private final ExecutorService workflowExecutor;
23+
private final ExecutorService ioBoundExecutor;
2224
private final int terminationTimeoutSeconds;
2325

2426
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
2527
int terminationTimeoutSeconds) {
2628
this.executor = new InstrumentedExecutorService(executor);
2729
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
30+
this.ioBoundExecutor = new InstrumentedExecutorService(
31+
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
2832
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
2933
}
3034

@@ -76,8 +80,8 @@ public ExecutorService workflowExecutorService() {
7680
*
7781
* @param task task to run concurrently
7882
*/
79-
public static void executeAndWaitForCompletion(Runnable task) {
80-
executeAndWaitForCompletion(task, instance().workflowExecutorService());
83+
public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) {
84+
executeAndWaitForCompletion(task, instance().ioBoundExecutor, threadNamePrefix);
8185
}
8286

8387
/**
@@ -87,30 +91,40 @@ public static void executeAndWaitForCompletion(Runnable task) {
8791
* @param task task to run concurrently
8892
* @param executor ExecutorService used to run the task
8993
*/
90-
public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor) {
94+
public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor,
95+
String threadNamePrefix) {
96+
// change thread name for easier debugging
97+
final var thread = Thread.currentThread();
98+
final var name = thread.getName();
9199
try {
100+
thread.setName(threadNamePrefix + "-" + thread.getId());
92101
executor.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
93102
} catch (InterruptedException | ExecutionException | TimeoutException e) {
94103
throw new OperatorException("Couldn't execute task", e);
104+
} finally {
105+
// restore original name
106+
thread.setName(name);
95107
}
96108
}
97109

98110
private void doStop() {
99111
try {
100112
log.debug("Closing executor");
101-
executor.shutdown();
102-
workflowExecutor.shutdown();
103-
if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
104-
workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything
105-
}
106-
if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
107-
executor.shutdownNow(); // if we timed out, waiting, cancel everything
108-
}
113+
shutdown(executor);
114+
shutdown(workflowExecutor);
115+
shutdown(ioBoundExecutor);
109116
} catch (InterruptedException e) {
110117
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
111118
}
112119
}
113120

121+
private void shutdown(ExecutorService executorService) throws InterruptedException {
122+
executorService.shutdown();
123+
if (!executorService.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
124+
executorService.shutdownNow(); // if we timed out, waiting, cancel everything
125+
}
126+
}
127+
114128
private static class InstrumentedExecutorService implements ExecutorService {
115129
private final boolean debug;
116130
private final ExecutorService executor;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,20 @@ public synchronized void start() {
7171
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
7272
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
7373
.parallel()
74-
.forEach(this::startEventSource));
74+
.forEach(this::startEventSource), "LowLevelEventSourceStart");
7575

7676
ExecutorServiceManager
7777
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
7878
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
79-
.parallel().forEach(this::startEventSource));
79+
.parallel().forEach(this::startEventSource), "DefaultEventSourceStart");
8080
}
8181

8282
@Override
8383
public synchronized void stop() {
8484
stopEventSource(eventSources.namedControllerResourceEventSource());
8585
ExecutorServiceManager
8686
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources().parallel()
87-
.forEach(this::stopEventSource));
87+
.forEach(this::stopEventSource), "EventSourceStop");
8888
eventSources.clear();
8989
}
9090

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat
4848
public void start() throws OperatorException {
4949
// make sure informers are all started before proceeding further
5050
ExecutorServiceManager.executeAndWaitForCompletion(
51-
() -> sources.values().parallelStream().forEach(LifecycleAware::start));
51+
() -> sources.values().parallelStream().forEach(LifecycleAware::start), "InformerStart");
5252
}
5353

5454
void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
@@ -97,7 +97,7 @@ public void changeNamespaces(Set<String> namespaces) {
9797
log.debug("Registered new {} -> {} for namespace: {}", this, source,
9898
ns);
9999
}
100-
}));
100+
}), "InformerStart");
101101
}
102102

103103

0 commit comments

Comments
 (0)