Skip to content

feat: separate executor service for workflow #1371

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,12 @@ default int concurrentReconciliationThreads() {
return DEFAULT_RECONCILIATION_THREADS_NUMBER;
}

int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER;

default int concurrentWorkflowExecutorThreads() {
return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
}

/**
* Used to clone custom resources. It is strongly suggested that implementors override this method
* since the default implementation creates a new {@link Cloner} instance each time this method is
Expand Down Expand Up @@ -122,6 +128,10 @@ default ExecutorService getExecutorService() {
return Executors.newFixedThreadPool(concurrentReconciliationThreads());
}

default ExecutorService getWorkflowExecutorService() {
return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads());
}

default boolean closeClientOnStop() {
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ public class ConfigurationServiceOverrider {
private Boolean closeClientOnStop;
private ObjectMapper objectMapper;
private ExecutorService executorService;
private ExecutorService workflowExecutorService;
private LeaderElectionConfiguration leaderElectionConfiguration;

ConfigurationServiceOverrider(ConfigurationService original) {
Expand Down Expand Up @@ -68,6 +69,12 @@ public ConfigurationServiceOverrider withExecutorService(ExecutorService executo
return this;
}

public ConfigurationServiceOverrider withWorkflowExecutorService(
ExecutorService workflowExecutorService) {
this.workflowExecutorService = workflowExecutorService;
return this;
}

public ConfigurationServiceOverrider withObjectMapper(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
return this;
Expand Down Expand Up @@ -121,6 +128,12 @@ public ExecutorService getExecutorService() {
return executorService != null ? executorService : original.getExecutorService();
}

@Override
public ExecutorService getWorkflowExecutorService() {
return workflowExecutorService != null ? workflowExecutorService
: original.getWorkflowExecutorService();
}

@Override
public ObjectMapper getObjectMapper() {
return objectMapper != null ? objectMapper : original.getObjectMapper();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,31 +15,35 @@
public class ExecutorServiceManager {
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
private static ExecutorServiceManager instance;

private final ExecutorService executor;
private final ExecutorService workflowExecutor;
private final int terminationTimeoutSeconds;

private ExecutorServiceManager(InstrumentedExecutorService executor,
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
int terminationTimeoutSeconds) {
this.executor = executor;
this.executor = new InstrumentedExecutorService(executor);
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutor);
this.terminationTimeoutSeconds = terminationTimeoutSeconds;
}

public static void init() {
if (instance == null) {
final var configuration = ConfigurationServiceProvider.instance();
instance = new ExecutorServiceManager(
new InstrumentedExecutorService(configuration.getExecutorService()),
final var executorService = configuration.getExecutorService();
final var workflowExecutorService = configuration.getWorkflowExecutorService();
instance = new ExecutorServiceManager(executorService, workflowExecutorService,
configuration.getTerminationTimeoutSeconds());
log.debug("Initialized ExecutorServiceManager executor: {}, timeout: {}",
configuration.getExecutorService().getClass(),
log.debug(
"Initialized ExecutorServiceManager executor: {}, workflow executor: {}, timeout: {}",
executorService.getClass(),
workflowExecutorService.getClass(),
configuration.getTerminationTimeoutSeconds());
} else {
log.debug("Already started, reusing already setup instance!");
}
}

public static void stop() {
public synchronized static void stop() {
if (instance != null) {
instance.doStop();
}
Expand All @@ -48,7 +52,7 @@ public static void stop() {
instance = null;
}

public static ExecutorServiceManager instance() {
public synchronized static ExecutorServiceManager instance() {
if (instance == null) {
// provide a default configuration if none has been provided by init
init();
Expand All @@ -60,13 +64,22 @@ public ExecutorService executorService() {
return executor;
}

public ExecutorService workflowExecutorService() {
return workflowExecutor;
}

private void doStop() {
try {
log.debug("Closing executor");
executor.shutdown();
workflowExecutor.shutdown();
if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything
}
if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
executor.shutdownNow(); // if we timed out, waiting, cancel everything
}

} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import java.util.stream.Collectors;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;

Expand All @@ -30,7 +30,7 @@ public class Workflow<P extends HasMetadata> {
private ExecutorService executorService;

public Workflow(Set<DependentResourceNode> dependentResourceNodes) {
this.executorService = ConfigurationServiceProvider.instance().getExecutorService();
this.executorService = ExecutorServiceManager.instance().workflowExecutorService();
this.dependentResourceNodes = dependentResourceNodes;
this.throwExceptionAutomatically = THROW_EXCEPTION_AUTOMATICALLY_DEFAULT;
preprocessForReconcile();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.util.concurrent.ExecutorService;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition;
import io.javaoperatorsdk.operator.processing.dependent.workflow.DependentResourceNode;
Expand Down Expand Up @@ -75,8 +75,9 @@ public WorkflowBuilder<P> withThrowExceptionFurther(boolean throwExceptionFurthe
}

public Workflow<P> build() {
return new Workflow(dependentResourceNodes,
ConfigurationServiceProvider.instance().getExecutorService(), throwExceptionAutomatically);
return new Workflow(
dependentResourceNodes, ExecutorServiceManager.instance().workflowExecutorService(),
throwExceptionAutomatically);
}

public Workflow<P> build(int parallelism) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void testAddingWebPage() {
() -> {
var actual = operator().get(WebPage.class, TEST_PAGE);
var deployment = operator().get(Deployment.class, deploymentName(webPage));

assertThat(actual.getStatus()).isNotNull();
assertThat(actual.getStatus().getAreWeGood()).isTrue();
assertThat(deployment.getSpec().getReplicas())
.isEqualTo(deployment.getStatus().getReadyReplicas());
Expand Down