Skip to content

feat: using dynamic ThreadPoolExecutor for reconcile and workflow executor services #1804

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@
import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -75,24 +74,49 @@ default boolean checkCRDAndValidateLocalModel() {
return false;
}

int DEFAULT_RECONCILIATION_THREADS_NUMBER = 10;
int DEFAULT_RECONCILIATION_THREADS_NUMBER = 200;
int MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER = 10;

/**
* Retrieves the maximum number of threads the operator can spin out to dispatch reconciliation
* requests to reconcilers
* The maximum number of threads the operator can spin out to dispatch reconciliation requests to
* reconcilers
*
* @return the maximum number of concurrent reconciliation threads
*/
default int concurrentReconciliationThreads() {
return DEFAULT_RECONCILIATION_THREADS_NUMBER;
}

/**
* The minimum number of threads the operator starts in the thread pool for reconciliations.
*
* @return the minimum number of concurrent reconciliation threads
*/
default int minConcurrentReconciliationThreads() {
return MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER;
}

int DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = DEFAULT_RECONCILIATION_THREADS_NUMBER;
int MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER = MIN_DEFAULT_RECONCILIATION_THREADS_NUMBER;

/**
* Retrieves the maximum number of threads the operator can spin out to be used in the workflows.
*
* @return the maximum number of concurrent workflow threads
*/
default int concurrentWorkflowExecutorThreads() {
return DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
}

/**
* The minimum number of threads the operator starts in the thread pool for workflows.
*
* @return the minimum number of concurrent workflow threads
*/
default int minConcurrentWorkflowExecutorThreads() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would be nice to add doc for this as well.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added docs

return MIN_DEFAULT_WORKFLOW_EXECUTOR_THREAD_NUMBER;
}

/**
* Used to clone custom resources. It is strongly suggested that implementors override this method
* since the default implementation creates a new {@link Cloner} instance each time this method is
Expand Down Expand Up @@ -136,11 +160,15 @@ default Metrics getMetrics() {
}

default ExecutorService getExecutorService() {
return Executors.newFixedThreadPool(concurrentReconciliationThreads());
return new ThreadPoolExecutor(minConcurrentReconciliationThreads(),
concurrentReconciliationThreads(),
1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
}

default ExecutorService getWorkflowExecutorService() {
return Executors.newFixedThreadPool(concurrentWorkflowExecutorThreads());
return new ThreadPoolExecutor(minConcurrentWorkflowExecutorThreads(),
concurrentWorkflowExecutorThreads(),
1, TimeUnit.MINUTES, new LinkedBlockingDeque<>());
}

default boolean closeClientOnStop() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ public class ConfigurationServiceOverrider {
private Config clientConfig;
private Boolean checkCR;
private Integer concurrentReconciliationThreads;
private Integer minConcurrentReconciliationThreads;
private Integer concurrentWorkflowExecutorThreads;
private Integer minConcurrentWorkflowExecutorThreads;
private Cloner cloner;
private Integer timeoutSeconds;
private Boolean closeClientOnStop;
Expand Down Expand Up @@ -56,6 +58,16 @@ public ConfigurationServiceOverrider withConcurrentWorkflowExecutorThreads(int t
return this;
}

public ConfigurationServiceOverrider withMinConcurrentReconciliationThreads(int threadNumber) {
this.minConcurrentReconciliationThreads = threadNumber;
return this;
}

public ConfigurationServiceOverrider withMinConcurrentWorkflowExecutorThreads(int threadNumber) {
this.minConcurrentWorkflowExecutorThreads = threadNumber;
return this;
}

public ConfigurationServiceOverrider withResourceCloner(Cloner cloner) {
this.cloner = cloner;
return this;
Expand Down Expand Up @@ -149,6 +161,18 @@ public int concurrentWorkflowExecutorThreads() {
: original.concurrentWorkflowExecutorThreads();
}

@Override
public int minConcurrentReconciliationThreads() {
return minConcurrentReconciliationThreads != null ? minConcurrentReconciliationThreads
: original.minConcurrentReconciliationThreads();
}

@Override
public int minConcurrentWorkflowExecutorThreads() {
return minConcurrentWorkflowExecutorThreads != null ? minConcurrentWorkflowExecutorThreads
: original.minConcurrentWorkflowExecutorThreads();
}

@Override
public int getTerminationTimeoutSeconds() {
return timeoutSeconds != null ? timeoutSeconds : original.getTerminationTimeoutSeconds();
Expand Down