diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java index f5774ed42f..595341add7 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java @@ -5,6 +5,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.stream.Collectors; @@ -30,12 +31,14 @@ public abstract class AbstractWorkflowExecutor

{ private final Map> actualExecutions = new ConcurrentHashMap<>(); private final Map exceptionsDuringExecution = new ConcurrentHashMap<>(); + private final ExecutorService executorService; public AbstractWorkflowExecutor(Workflow

workflow, P primary, Context

context) { this.workflow = workflow; this.primary = primary; this.context = context; this.primaryID = ResourceID.fromResource(primary); + executorService = context.getWorkflowExecutorService(); } protected abstract Logger logger(); @@ -107,10 +110,16 @@ protected synchronized void handleNodeExecutionFinish( } } - @SuppressWarnings("unchecked") protected boolean isConditionMet(Optional> condition, DependentResource dependentResource) { - return condition.map(c -> c.isMet(dependentResource, primary, context)) - .orElse(true); + return condition.map(c -> c.isMet(dependentResource, primary, context)).orElse(true); + } + + protected void submit(DependentResourceNode dependentResourceNode, + NodeExecutor nodeExecutor, String operation) { + final Future future = executorService.submit(nodeExecutor); + markAsExecuting(dependentResourceNode, future); + logger().debug("Submitted to {}: {} primaryID: {}", operation, dependentResourceNode, + primaryID); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java index 56262ed6af..5426eff1aa 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java @@ -3,8 +3,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -19,15 +17,14 @@ public class WorkflowCleanupExecutor

extends AbstractWorkflowExecutor

{ private static final Logger log = LoggerFactory.getLogger(WorkflowCleanupExecutor.class); + private static final String CLEANUP = "cleanup"; private final Set postDeleteConditionNotMet = ConcurrentHashMap.newKeySet(); private final Set deleteCalled = ConcurrentHashMap.newKeySet(); - private final ExecutorService executorService; public WorkflowCleanupExecutor(Workflow

workflow, P primary, Context

context) { super(workflow, primary, context); - this.executorService = context.getWorkflowExecutorService(); } public synchronized WorkflowCleanupResult cleanup() { @@ -55,9 +52,7 @@ private synchronized void handleCleanup(DependentResourceNode dependentResourceN return; } - Future nodeFuture = executorService.submit(new CleanupExecutor<>(dependentResourceNode)); - markAsExecuting(dependentResourceNode, nodeFuture); - log.debug("Submitted for cleanup: {}", dependentResourceNode); + submit(dependentResourceNode, new CleanupExecutor<>(dependentResourceNode), CLEANUP); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java index 3e44cb3461..b1ef16c7b0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java @@ -4,8 +4,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Future; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -21,6 +19,8 @@ public class WorkflowReconcileExecutor

extends AbstractWorkflowExecutor

{ private static final Logger log = LoggerFactory.getLogger(WorkflowReconcileExecutor.class); + private static final String RECONCILE = "reconcile"; + private static final String DELETE = "delete"; private final Set notReady = ConcurrentHashMap.newKeySet(); @@ -32,11 +32,9 @@ public class WorkflowReconcileExecutor

extends AbstractWo private final Set reconciled = ConcurrentHashMap.newKeySet(); private final Map reconcileResults = new ConcurrentHashMap<>(); - private final ExecutorService executorService; public WorkflowReconcileExecutor(Workflow

workflow, P primary, Context

context) { super(workflow, primary, context); - this.executorService = context.getWorkflowExecutorService(); } public synchronized WorkflowReconcileResult reconcile() { @@ -69,9 +67,7 @@ private synchronized void handleReconcile(DependentResourceNode depend if (!reconcileConditionMet) { handleReconcileConditionNotMet(dependentResourceNode); } else { - var nodeFuture = executorService.submit(new NodeReconcileExecutor(dependentResourceNode)); - markAsExecuting(dependentResourceNode, nodeFuture); - log.debug("Submitted to reconcile: {} primaryID: {}", dependentResourceNode, primaryID); + submit(dependentResourceNode, new NodeReconcileExecutor<>(dependentResourceNode), RECONCILE); } } @@ -87,10 +83,7 @@ private synchronized void handleDelete(DependentResourceNode dependentResourceNo return; } - Future nodeFuture = executorService - .submit(new NodeDeleteExecutor(dependentResourceNode)); - markAsExecuting(dependentResourceNode, nodeFuture); - log.debug("Submitted to delete: {}", dependentResourceNode); + submit(dependentResourceNode, new NodeDeleteExecutor<>(dependentResourceNode), DELETE); } private boolean allDependentsDeletedAlready(DependentResourceNode dependentResourceNode) {