Skip to content

Commit 80716a8

Browse files
csvirimetacosm
andauthored
feat: separate executor service for workflow (#1371)
Co-authored-by: Chris Laprun <metacosm@gmail.com>
1 parent d07b063 commit 80716a8

File tree

6 files changed

+52
-15
lines changed

6 files changed

+52
-15
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ default int concurrentReconciliationThreads() {
8080
return DEFAULT_RECONCILIATION_THREADS_NUMBER;
8181
}
8282

83+
int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER;
84+
85+
default int concurrentWorkflowExecutorThreads() {
86+
return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
87+
}
88+
8389
/**
8490
* Used to clone custom resources. It is strongly suggested that implementors override this method
8591
* since the default implementation creates a new {@link Cloner} instance each time this method is
@@ -122,6 +128,10 @@ default ExecutorService getExecutorService() {
122128
return Executors.newFixedThreadPool(concurrentReconciliationThreads());
123129
}
124130

131+
default ExecutorService getWorkflowExecutorService() {
132+
return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads());
133+
}
134+
125135
default boolean closeClientOnStop() {
126136
return true;
127137
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class ConfigurationServiceOverrider {
2222
private Boolean closeClientOnStop;
2323
private ObjectMapper objectMapper;
2424
private ExecutorService executorService;
25+
private ExecutorService workflowExecutorService;
2526
private LeaderElectionConfiguration leaderElectionConfiguration;
2627

2728
ConfigurationServiceOverrider(ConfigurationService original) {
@@ -68,6 +69,12 @@ public ConfigurationServiceOverrider withExecutorService(ExecutorService executo
6869
return this;
6970
}
7071

72+
public ConfigurationServiceOverrider withWorkflowExecutorService(
73+
ExecutorService workflowExecutorService) {
74+
this.workflowExecutorService = workflowExecutorService;
75+
return this;
76+
}
77+
7178
public ConfigurationServiceOverrider withObjectMapper(ObjectMapper objectMapper) {
7279
this.objectMapper = objectMapper;
7380
return this;
@@ -121,6 +128,12 @@ public ExecutorService getExecutorService() {
121128
return executorService != null ? executorService : original.getExecutorService();
122129
}
123130

131+
@Override
132+
public ExecutorService getWorkflowExecutorService() {
133+
return workflowExecutorService != null ? workflowExecutorService
134+
: original.getWorkflowExecutorService();
135+
}
136+
124137
@Override
125138
public ObjectMapper getObjectMapper() {
126139
return objectMapper != null ? objectMapper : original.getObjectMapper();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,31 +15,35 @@
1515
public class ExecutorServiceManager {
1616
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
1717
private static ExecutorServiceManager instance;
18-
1918
private final ExecutorService executor;
19+
private final ExecutorService workflowExecutor;
2020
private final int terminationTimeoutSeconds;
2121

22-
private ExecutorServiceManager(InstrumentedExecutorService executor,
22+
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
2323
int terminationTimeoutSeconds) {
24-
this.executor = executor;
24+
this.executor = new InstrumentedExecutorService(executor);
25+
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
2526
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
2627
}
2728

2829
public static void init() {
2930
if (instance == null) {
3031
final var configuration = ConfigurationServiceProvider.instance();
31-
instance = new ExecutorServiceManager(
32-
new InstrumentedExecutorService(configuration.getExecutorService()),
32+
final var executorService = configuration.getExecutorService();
33+
final var workflowExecutorService = configuration.getWorkflowExecutorService();
34+
instance = new ExecutorServiceManager(executorService, workflowExecutorService,
3335
configuration.getTerminationTimeoutSeconds());
34-
log.debug("Initialized ExecutorServiceManager executor: {}, timeout: {}",
35-
configuration.getExecutorService().getClass(),
36+
log.debug(
37+
"Initialized ExecutorServiceManager executor: {}, workflow executor: {}, timeout: {}",
38+
executorService.getClass(),
39+
workflowExecutorService.getClass(),
3640
configuration.getTerminationTimeoutSeconds());
3741
} else {
3842
log.debug("Already started, reusing already setup instance!");
3943
}
4044
}
4145

42-
public static void stop() {
46+
public synchronized static void stop() {
4347
if (instance != null) {
4448
instance.doStop();
4549
}
@@ -48,7 +52,7 @@ public static void stop() {
4852
instance = null;
4953
}
5054

51-
public static ExecutorServiceManager instance() {
55+
public synchronized static ExecutorServiceManager instance() {
5256
if (instance == null) {
5357
// provide a default configuration if none has been provided by init
5458
init();
@@ -60,13 +64,22 @@ public ExecutorService executorService() {
6064
return executor;
6165
}
6266

67+
public ExecutorService workflowExecutorService() {
68+
return workflowExecutor;
69+
}
70+
6371
private void doStop() {
6472
try {
6573
log.debug("Closing executor");
6674
executor.shutdown();
75+
workflowExecutor.shutdown();
76+
if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
77+
workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything
78+
}
6779
if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
6880
executor.shutdownNow(); // if we timed out, waiting, cancel everything
6981
}
82+
7083
} catch (InterruptedException e) {
7184
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
7285
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import java.util.stream.Collectors;
88

99
import io.fabric8.kubernetes.api.model.HasMetadata;
10-
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
10+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1111
import io.javaoperatorsdk.operator.api.reconciler.Context;
1212
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
1313

@@ -30,7 +30,7 @@ public class Workflow<P extends HasMetadata> {
3030
private ExecutorService executorService;
3131

3232
public Workflow(Set<DependentResourceNode> dependentResourceNodes) {
33-
this.executorService = ConfigurationServiceProvider.instance().getExecutorService();
33+
this.executorService = ExecutorServiceManager.instance().workflowExecutorService();
3434
this.dependentResourceNodes = dependentResourceNodes;
3535
this.throwExceptionAutomatically = THROW_EXCEPTION_AUTOMATICALLY_DEFAULT;
3636
preprocessForReconcile();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
import java.util.concurrent.ExecutorService;
77

88
import io.fabric8.kubernetes.api.model.HasMetadata;
9-
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
9+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1010
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
1111
import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition;
1212
import io.javaoperatorsdk.operator.processing.dependent.workflow.DependentResourceNode;
@@ -75,8 +75,9 @@ public WorkflowBuilder<P> withThrowExceptionFurther(boolean throwExceptionFurthe
7575
}
7676

7777
public Workflow<P> build() {
78-
return new Workflow(dependentResourceNodes,
79-
ConfigurationServiceProvider.instance().getExecutorService(), throwExceptionAutomatically);
78+
return new Workflow(
79+
dependentResourceNodes, ExecutorServiceManager.instance().workflowExecutorService(),
80+
throwExceptionAutomatically);
8081
}
8182

8283
public Workflow<P> build(int parallelism) {

sample-operators/webpage/src/test/java/io/javaoperatorsdk/operator/sample/WebPageOperatorAbstractTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ void testAddingWebPage() {
5656
() -> {
5757
var actual = operator().get(WebPage.class, TEST_PAGE);
5858
var deployment = operator().get(Deployment.class, deploymentName(webPage));
59-
59+
assertThat(actual.getStatus()).isNotNull();
6060
assertThat(actual.getStatus().getAreWeGood()).isTrue();
6161
assertThat(deployment.getSpec().getReplicas())
6262
.isEqualTo(deployment.getStatus().getReadyReplicas());

0 commit comments

Comments
 (0)