Skip to content

Commit 5e9377a

Browse files
committed
fix: use a new executor service for each task to run
1 parent 7d007db commit 5e9377a

File tree

7 files changed

+43
-45
lines changed

7 files changed

+43
-45
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public synchronized void shouldStart() {
3333
}
3434

3535
public synchronized void start(boolean startEventProcessor) {
36-
ExecutorServiceManager.executeIOBoundTask(
36+
ExecutorServiceManager.executeAndWaitForCompletion(
3737
() -> controllers().parallelStream().forEach(c -> c.start(startEventProcessor)),
3838
"ControllerStart");
3939
started = true;
@@ -49,7 +49,7 @@ public synchronized void stop() {
4949
}
5050

5151
public synchronized void startEventProcessing() {
52-
ExecutorServiceManager.executeIOBoundTask(
52+
ExecutorServiceManager.executeAndWaitForCompletion(
5353
() -> controllers().parallelStream().forEach(Controller::startEventProcessing),
5454
"ControllerEventProcessing");
5555
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 9 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,12 @@ public class ExecutorServiceManager {
2020
private static ExecutorServiceManager instance;
2121
private final ExecutorService executor;
2222
private final ExecutorService workflowExecutor;
23-
private final ExecutorService ioBoundExecutor;
2423
private final int terminationTimeoutSeconds;
2524

2625
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
2726
int terminationTimeoutSeconds) {
2827
this.executor = new InstrumentedExecutorService(executor);
2928
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
30-
this.ioBoundExecutor = new InstrumentedExecutorService(
31-
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
3229
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
3330
}
3431

@@ -75,30 +72,22 @@ public ExecutorService workflowExecutorService() {
7572
}
7673

7774
/**
78-
* Runs the specified I/O-bound task and waits for its completion using the ExecutorService
79-
* provided by {@link #workflowExecutorService()}
75+
* Runs the specified I/O-bound task and waits for its completion using the new ExecutorService
8076
*
8177
* @param task task to run concurrently
78+
* @param threadNamePrefix the prefix with which to prefix thread names when tasks are run this
79+
* way
8280
*/
8381
public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) {
84-
executeAndWaitForCompletion(task, instance().ioBoundExecutor, threadNamePrefix);
85-
}
86-
87-
/**
88-
* Executes the specified I/O-bound task using the specified ExecutorService and waits for its
89-
* completion for at most {@link #terminationTimeoutSeconds} seconds.
90-
*
91-
* @param task task to run concurrently
92-
* @param executor ExecutorService used to run the task
93-
*/
94-
public static void executeAndWaitForCompletion(Runnable task, ExecutorService executor,
95-
String threadNamePrefix) {
82+
final var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
83+
ExecutorService instrumented = new InstrumentedExecutorService(executor);
9684
// change thread name for easier debugging
9785
final var thread = Thread.currentThread();
9886
final var name = thread.getName();
9987
try {
10088
thread.setName(threadNamePrefix + "-" + thread.getId());
101-
executor.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
89+
instrumented.submit(task).get(instance().terminationTimeoutSeconds, TimeUnit.SECONDS);
90+
shutdown(instrumented);
10291
} catch (InterruptedException | ExecutionException | TimeoutException e) {
10392
throw new OperatorException("Couldn't execute task", e);
10493
} finally {
@@ -107,33 +96,19 @@ public static void executeAndWaitForCompletion(Runnable task, ExecutorService ex
10796
}
10897
}
10998

110-
public static void executeIOBoundTask(Runnable task, String threadNamePrefix) {
111-
// change thread name for easier debugging
112-
final var thread = Thread.currentThread();
113-
final var name = thread.getName();
114-
try {
115-
thread.setName(threadNamePrefix + "-" + thread.getId());
116-
instance().ioBoundExecutor.execute(task);
117-
} finally {
118-
// restore original name
119-
thread.setName(name);
120-
}
121-
}
122-
12399
private void doStop() {
124100
try {
125101
log.debug("Closing executor");
126102
shutdown(executor);
127103
shutdown(workflowExecutor);
128-
shutdown(ioBoundExecutor);
129104
} catch (InterruptedException e) {
130105
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
131106
}
132107
}
133108

134-
private void shutdown(ExecutorService executorService) throws InterruptedException {
109+
private static void shutdown(ExecutorService executorService) throws InterruptedException {
135110
executorService.shutdown();
136-
if (!executorService.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
111+
if (!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) {
137112
executorService.shutdownNow(); // if we timed out, waiting, cancel everything
138113
}
139114
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public synchronized void start() {
8484
@Override
8585
public synchronized void stop() {
8686
stopEventSource(eventSources.namedControllerResourceEventSource());
87-
ExecutorServiceManager.executeIOBoundTask(
87+
ExecutorServiceManager.executeAndWaitForCompletion(
8888
() -> eventSources.additionalNamedEventSources().parallel()
8989
.forEach(this::stopEventSource),
9090
"EventSourceStop");

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,21 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat
4747
@Override
4848
public void start() throws OperatorException {
4949
// make sure informers are all started before proceeding further
50-
ExecutorServiceManager.executeIOBoundTask(
51-
() -> sources.values().parallelStream().forEach(LifecycleAware::start),
50+
ExecutorServiceManager.executeAndWaitForCompletion(
51+
() -> sources.values().parallelStream().forEach(source -> {
52+
// change thread name for easier debugging
53+
final var thread = Thread.currentThread();
54+
final var name = thread.getName();
55+
try {
56+
thread.setName(source.informerInfo() + " " + thread.getId());
57+
source.start();
58+
} catch (Exception e) {
59+
throw new OperatorException("Couldn't start informer: " + source, e);
60+
} finally {
61+
// restore original name
62+
thread.setName(name);
63+
}
64+
}),
5265
"InformerStart");
5366
}
5467

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,10 @@ public List<T> byIndex(String indexName, String indexKey) {
143143

144144
@Override
145145
public String toString() {
146-
return "InformerWrapper [" + versionedFullResourceName() + "] (" + informer + ')';
146+
return informerInfo() + " (" + informer + ')';
147+
}
148+
149+
String informerInfo() {
150+
return "InformerWrapper [" + versionedFullResourceName() + "]";
147151
}
148152
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import org.junit.jupiter.api.Test;
66

7+
import io.fabric8.kubernetes.api.model.ConfigMap;
78
import io.fabric8.kubernetes.api.model.HasMetadata;
89
import io.javaoperatorsdk.operator.MockKubernetesClient;
910
import io.javaoperatorsdk.operator.OperatorException;
@@ -24,7 +25,13 @@
2425
import static org.junit.jupiter.api.Assertions.assertEquals;
2526
import static org.junit.jupiter.api.Assertions.assertThrows;
2627
import static org.junit.jupiter.api.Assertions.assertTrue;
27-
import static org.mockito.Mockito.*;
28+
import static org.mockito.Mockito.any;
29+
import static org.mockito.Mockito.doReturn;
30+
import static org.mockito.Mockito.mock;
31+
import static org.mockito.Mockito.spy;
32+
import static org.mockito.Mockito.times;
33+
import static org.mockito.Mockito.verify;
34+
import static org.mockito.Mockito.when;
2835

2936
@SuppressWarnings({"rawtypes", "unchecked"})
3037
class EventSourceManagerTest {
@@ -165,9 +172,9 @@ void changesNamespacesOnControllerAndInformerEventSources() {
165172
}
166173

167174
private EventSourceManager initManager() {
168-
final var configuration = MockControllerConfiguration.forResource(HasMetadata.class);
175+
final var configuration = MockControllerConfiguration.forResource(ConfigMap.class);
169176
final Controller controller = new Controller(mock(Reconciler.class), configuration,
170-
MockKubernetesClient.client(HasMetadata.class));
177+
MockKubernetesClient.client(ConfigMap.class));
171178
return new EventSourceManager(controller);
172179
}
173180
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
2020

2121
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
22-
import static org.junit.jupiter.api.Assertions.assertThrows;
2322
import static org.mockito.ArgumentMatchers.any;
2423
import static org.mockito.ArgumentMatchers.eq;
2524
import static org.mockito.Mockito.atLeastOnce;
@@ -259,7 +258,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
259258

260259
// by default informer fails to start if there is an exception in the client on start.
261260
// Throws the exception further.
262-
assertThrows(RuntimeException.class, () -> informerEventSource.start());
261+
informerEventSource.start();
263262
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
264263
} finally {
265264
ConfigurationServiceProvider.reset();

0 commit comments

Comments
 (0)