Skip to content

Commit 436fbf0

Browse files
committed
fix: some operations just need to spawn, not be waited on to finish
1 parent 5034220 commit 436fbf0

File tree

4 files changed

+43
-19
lines changed

4 files changed

+43
-19
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import org.slf4j.Logger;
99
import org.slf4j.LoggerFactory;
1010

11+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1112
import io.javaoperatorsdk.operator.processing.Controller;
1213

1314
/**
@@ -32,20 +33,25 @@ public synchronized void shouldStart() {
3233
}
3334

3435
public synchronized void start(boolean startEventProcessor) {
35-
controllers().parallelStream().forEach(c -> c.start(startEventProcessor));
36+
ExecutorServiceManager.executeIOBoundTask(
37+
() -> controllers().parallelStream().forEach(c -> c.start(startEventProcessor)),
38+
"ControllerStart");
3639
started = true;
3740
}
3841

3942
public synchronized void stop() {
40-
controllers().parallelStream().forEach(closeable -> {
41-
log.debug("closing {}", closeable);
42-
closeable.stop();
43-
});
43+
ExecutorServiceManager.executeAndWaitForCompletion(
44+
() -> controllers().parallelStream().forEach(closeable -> {
45+
log.debug("closing {}", closeable);
46+
closeable.stop();
47+
}), "ControllerStop");
4448
started = false;
4549
}
4650

4751
public synchronized void startEventProcessing() {
48-
controllers().parallelStream().forEach(Controller::startEventProcessing);
52+
ExecutorServiceManager.executeIOBoundTask(
53+
() -> controllers().parallelStream().forEach(Controller::startEventProcessing),
54+
"ControllerEventProcessing");
4955
}
5056

5157
@SuppressWarnings({"unchecked", "rawtypes"})

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,6 +107,19 @@ public static void executeAndWaitForCompletion(Runnable task, ExecutorService ex
107107
}
108108
}
109109

110+
public static void executeIOBoundTask(Runnable task, String threadNamePrefix) {
111+
// change thread name for easier debugging
112+
final var thread = Thread.currentThread();
113+
final var name = thread.getName();
114+
try {
115+
thread.setName(threadNamePrefix + "-" + thread.getId());
116+
instance().ioBoundExecutor.execute(task);
117+
} finally {
118+
// restore original name
119+
thread.setName(name);
120+
}
121+
}
122+
110123
private void doStop() {
111124
try {
112125
log.debug("Closing executor");

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,27 @@ public synchronized void start() {
6767
startEventSource(eventSources.namedControllerResourceEventSource());
6868

6969
// starting event sources on the workflow executor which shouldn't be used at this point
70-
ExecutorServiceManager
71-
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
70+
ExecutorServiceManager.executeAndWaitForCompletion(
71+
() -> eventSources.additionalNamedEventSources()
7272
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
7373
.parallel()
74-
.forEach(this::startEventSource), "LowLevelEventSourceStart");
74+
.forEach(this::startEventSource),
75+
"LowLevelEventSourceStart");
7576

76-
ExecutorServiceManager
77-
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources()
77+
ExecutorServiceManager.executeAndWaitForCompletion(
78+
() -> eventSources.additionalNamedEventSources()
7879
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
79-
.parallel().forEach(this::startEventSource), "DefaultEventSourceStart");
80+
.parallel().forEach(this::startEventSource),
81+
"DefaultEventSourceStart");
8082
}
8183

8284
@Override
8385
public synchronized void stop() {
8486
stopEventSource(eventSources.namedControllerResourceEventSource());
85-
ExecutorServiceManager
86-
.executeAndWaitForCompletion(() -> eventSources.additionalNamedEventSources().parallel()
87-
.forEach(this::stopEventSource), "EventSourceStop");
87+
ExecutorServiceManager.executeIOBoundTask(
88+
() -> eventSources.additionalNamedEventSources().parallel()
89+
.forEach(this::stopEventSource),
90+
"EventSourceStop");
8891
eventSources.clear();
8992
}
9093

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,9 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat
4747
@Override
4848
public void start() throws OperatorException {
4949
// make sure informers are all started before proceeding further
50-
ExecutorServiceManager.executeAndWaitForCompletion(
51-
() -> sources.values().parallelStream().forEach(LifecycleAware::start), "InformerStart");
50+
ExecutorServiceManager.executeIOBoundTask(
51+
() -> sources.values().parallelStream().forEach(LifecycleAware::start),
52+
"InformerStart");
5253
}
5354

5455
void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
@@ -113,7 +114,7 @@ private InformerWrapper<T> createEventSource(
113114

114115
@Override
115116
public void stop() {
116-
ExecutorServiceManager.instance().workflowExecutorService().execute(
117+
ExecutorServiceManager.executeAndWaitForCompletion(
117118
() -> {
118119
log.info("Stopping {}", this);
119120
sources.forEach((ns, source) -> {
@@ -124,7 +125,8 @@ public void stop() {
124125
log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
125126
}
126127
});
127-
});
128+
},
129+
"StopInformer");
128130
}
129131

130132
@Override

0 commit comments

Comments
 (0)