Skip to content

Commit 4e605cd

Browse files
csvirimetacosm
authored andcommitted
simple impl
1 parent 32d91dc commit 4e605cd

File tree

5 files changed

+41
-3
lines changed

5 files changed

+41
-3
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ default int concurrentReconciliationThreads() {
8080
return DEFAULT_RECONCILIATION_THREADS_NUMBER;
8181
}
8282

83+
int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER;
84+
85+
default int concurrentWorkflowExecutorThreads() {
86+
return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
87+
}
88+
8389
/**
8490
* Used to clone custom resources. It is strongly suggested that implementors override this method
8591
* since the default implementation creates a new {@link Cloner} instance each time this method is
@@ -122,6 +128,10 @@ default ExecutorService getExecutorService() {
122128
return Executors.newFixedThreadPool(concurrentReconciliationThreads());
123129
}
124130

131+
default ExecutorService getWorkflowExecutorService() {
132+
return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads());
133+
}
134+
125135
default boolean closeClientOnStop() {
126136
return true;
127137
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ public class ConfigurationServiceOverrider {
2222
private Boolean closeClientOnStop;
2323
private ObjectMapper objectMapper;
2424
private ExecutorService executorService;
25+
private ExecutorService workflowExecutorService;
2526
private LeaderElectionConfiguration leaderElectionConfiguration;
2627

2728
ConfigurationServiceOverrider(ConfigurationService original) {
@@ -68,6 +69,12 @@ public ConfigurationServiceOverrider withExecutorService(ExecutorService executo
6869
return this;
6970
}
7071

72+
public ConfigurationServiceOverrider withWorkflowExecutorService(
73+
ExecutorService workflowExecutorService) {
74+
this.workflowExecutorService = workflowExecutorService;
75+
return this;
76+
}
77+
7178
public ConfigurationServiceOverrider withObjectMapper(ObjectMapper objectMapper) {
7279
this.objectMapper = objectMapper;
7380
return this;
@@ -121,6 +128,11 @@ public ExecutorService getExecutorService() {
121128
return executorService != null ? executorService : original.getExecutorService();
122129
}
123130

131+
@Override
132+
public ExecutorService getWorkflowExecutorService() {
133+
return workflowExecutorService != null ? workflowExecutorService : original.getWorkflowExecutorService();
134+
}
135+
124136
@Override
125137
public ObjectMapper getObjectMapper() {
126138
return objectMapper != null ? objectMapper : original.getObjectMapper();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ public class ExecutorServiceManager {
1717
private static ExecutorServiceManager instance;
1818

1919
private final ExecutorService executor;
20+
private ExecutorService workflowExecutor;
2021
private final int terminationTimeoutSeconds;
2122

2223
private ExecutorServiceManager(InstrumentedExecutorService executor,
@@ -60,13 +61,28 @@ public ExecutorService executorService() {
6061
return executor;
6162
}
6263

64+
// needs to be synchronized since only called when a workflow is initialized, but that happens in
65+
// controllers which are started in parallel
66+
public synchronized ExecutorService workflowExecutorService() {
67+
if (workflowExecutor == null) {
68+
workflowExecutor =
69+
new InstrumentedExecutorService(
70+
ConfigurationServiceProvider.instance().getWorkflowExecutorService());
71+
}
72+
return workflowExecutor;
73+
}
74+
6375
private void doStop() {
6476
try {
6577
log.debug("Closing executor");
6678
executor.shutdown();
79+
workflowExecutor.shutdown();
6780
if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
6881
executor.shutdownNow(); // if we timed out, waiting, cancel everything
6982
}
83+
if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
84+
executor.shutdownNow(); // if we timed out, waiting, cancel everything
85+
}
7086
} catch (InterruptedException e) {
7187
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
7288
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import java.util.stream.Collectors;
88

99
import io.fabric8.kubernetes.api.model.HasMetadata;
10-
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
10+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1111
import io.javaoperatorsdk.operator.api.reconciler.Context;
1212
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
1313

@@ -30,7 +30,7 @@ public class Workflow<P extends HasMetadata> {
3030
private ExecutorService executorService;
3131

3232
public Workflow(Set<DependentResourceNode> dependentResourceNodes) {
33-
this.executorService = ConfigurationServiceProvider.instance().getExecutorService();
33+
this.executorService = ExecutorServiceManager.instance().workflowExecutorService();
3434
this.dependentResourceNodes = dependentResourceNodes;
3535
this.throwExceptionAutomatically = THROW_EXCEPTION_AUTOMATICALLY_DEFAULT;
3636
preprocessForReconcile();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ public WorkflowBuilder<P> withThrowExceptionFurther(boolean throwExceptionFurthe
7676

7777
public Workflow<P> build() {
7878
return new Workflow(
79-
dependentResourceNodes, ExecutorServiceManager.instance().executorService(),
79+
dependentResourceNodes, ExecutorServiceManager.instance().workflowExecutorService(),
8080
throwExceptionAutomatically);
8181
}
8282

0 commit comments

Comments
 (0)