diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index 1fe9485588..036aa76bbf 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -16,6 +16,7 @@ import io.javaoperatorsdk.operator.api.monitoring.Metrics; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceFactory; +import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowFactory; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -193,4 +194,8 @@ default Optional getInformerStoppedHandler() { } }); } + + default ManagedWorkflowFactory getWorkflowFactory() { + return ManagedWorkflowFactory.DEFAULT; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java index ea7759b8c4..d522a30151 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java @@ -64,4 +64,8 @@ default Optional getSecondaryResource(P primary, Context

context) { static String defaultNameFor(Class dependentResourceClass) { return dependentResourceClass.getName(); } + + default boolean isDeletable() { + return this instanceof Deleter; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index ca27ad4f2c..2d9e6b4007 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -77,12 +77,12 @@ public Controller(Reconciler

reconciler, this.reconciler = reconciler; this.configuration = configuration; this.kubernetesClient = kubernetesClient; - this.metrics = Optional.ofNullable(ConfigurationServiceProvider.instance().getMetrics()) - .orElse(Metrics.NOOP); + final var configurationService = ConfigurationServiceProvider.instance(); + this.metrics = Optional.ofNullable(configurationService.getMetrics()).orElse(Metrics.NOOP); contextInitializer = reconciler instanceof ContextInitializer; isCleaner = reconciler instanceof Cleaner; - managedWorkflow = - ManagedWorkflow.workflowFor(kubernetesClient, configuration.getDependentResources()); + managedWorkflow = configurationService.getWorkflowFactory().workflowFor(configuration); + managedWorkflow.resolve(kubernetesClient, configuration.getDependentResources()); eventSourceManager = new EventSourceManager<>(this); eventProcessor = new EventProcessor<>(eventSourceManager); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java index b9d26a0c14..5631536da5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java @@ -294,4 +294,9 @@ public KubernetesDependentResourceConfig configFrom(KubernetesDependent kubeDepe public Optional> configuration() { return Optional.ofNullable(kubernetesDependentResourceConfig); } + + @Override + public boolean isDeletable() { + return super.isDeletable() && !garbageCollected; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractDependentResourceNode.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractDependentResourceNode.java new file mode 100644 index 0000000000..22e368249a --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractDependentResourceNode.java @@ -0,0 +1,103 @@ +package io.javaoperatorsdk.operator.processing.dependent.workflow; + +import java.util.LinkedList; +import java.util.List; +import java.util.Optional; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; + +@SuppressWarnings("rawtypes") +abstract class AbstractDependentResourceNode + implements DependentResourceNode { + + private final List dependsOn = new LinkedList<>(); + private final List parents = new LinkedList<>(); + private final String name; + private Condition reconcilePrecondition; + private Condition deletePostcondition; + private Condition readyPostcondition; + private DependentResource dependentResource; + + protected AbstractDependentResourceNode(String name) { + this.name = name; + } + + @Override + public List getDependsOn() { + return dependsOn; + } + + @Override + public void addParent(DependentResourceNode parent) { + parents.add(parent); + } + + @Override + public void addDependsOnRelation(DependentResourceNode node) { + node.addParent(this); + dependsOn.add(node); + } + + @Override + public List getParents() { + return parents; + } + + @Override + public String getName() { + return name; + } + + @Override + public Optional> getReconcilePrecondition() { + return Optional.ofNullable(reconcilePrecondition); + } + + @Override + public Optional> getDeletePostcondition() { + return Optional.ofNullable(deletePostcondition); + } + + public void setReconcilePrecondition(Condition reconcilePrecondition) { + this.reconcilePrecondition = reconcilePrecondition; + } + + public void setDeletePostcondition(Condition cleanupCondition) { + this.deletePostcondition = cleanupCondition; + } + + @Override + public Optional> getReadyPostcondition() { + return Optional.ofNullable(readyPostcondition); + } + + public void setReadyPostcondition(Condition readyPostcondition) { + this.readyPostcondition = readyPostcondition; + } + + public DependentResource getDependentResource() { + return dependentResource; + } + + public void setDependentResource(DependentResource dependentResource) { + this.dependentResource = dependentResource; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + AbstractDependentResourceNode that = (AbstractDependentResourceNode) o; + return name.equals(that.name); + } + + @Override + public int hashCode() { + return name.hashCode(); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java new file mode 100644 index 0000000000..354975ebad --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java @@ -0,0 +1,114 @@ +package io.javaoperatorsdk.operator.processing.dependent.workflow; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Future; +import java.util.stream.Collectors; + +import org.slf4j.Logger; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; + +@SuppressWarnings("rawtypes") +public abstract class AbstractWorkflowExecutor

{ + + protected final Workflow

workflow; + protected final P primary; + protected final Context

context; + /** + * Covers both deleted and reconciled + */ + private final Set alreadyVisited = ConcurrentHashMap.newKeySet(); + private final Map> actualExecutions = new HashMap<>(); + private final Map exceptionsDuringExecution = + new ConcurrentHashMap<>(); + + public AbstractWorkflowExecutor(Workflow

workflow, P primary, Context

context) { + this.workflow = workflow; + this.primary = primary; + this.context = context; + } + + protected abstract Logger logger(); + + protected synchronized void waitForScheduledExecutionsToRun() { + while (true) { + try { + this.wait(); + if (noMoreExecutionsScheduled()) { + break; + } else { + logger().warn("Notified but still resources under execution. This should not happen."); + } + } catch (InterruptedException e) { + logger().warn("Thread interrupted", e); + Thread.currentThread().interrupt(); + } + } + } + + protected boolean noMoreExecutionsScheduled() { + return actualExecutions.isEmpty(); + } + + protected boolean alreadyVisited(DependentResourceNode dependentResourceNode) { + return alreadyVisited.contains(dependentResourceNode); + } + + protected void markAsVisited(DependentResourceNode dependentResourceNode) { + alreadyVisited.add(dependentResourceNode); + } + + protected boolean isExecutingNow(DependentResourceNode dependentResourceNode) { + return actualExecutions.containsKey(dependentResourceNode); + } + + protected void markAsExecuting(DependentResourceNode dependentResourceNode, + Future future) { + actualExecutions.put(dependentResourceNode, future); + } + + protected synchronized void handleExceptionInExecutor( + DependentResourceNode dependentResourceNode, + RuntimeException e) { + exceptionsDuringExecution.put(dependentResourceNode, e); + } + + protected boolean isInError(DependentResourceNode dependentResourceNode) { + return exceptionsDuringExecution.containsKey(dependentResourceNode); + } + + protected Map getErroredDependents() { + return exceptionsDuringExecution.entrySet().stream() + .collect( + Collectors.toMap(e -> workflow.getDependentResourceFor(e.getKey()), Entry::getValue)); + } + + protected synchronized void handleNodeExecutionFinish( + DependentResourceNode dependentResourceNode) { + logger().debug("Finished execution for: {}", dependentResourceNode); + actualExecutions.remove(dependentResourceNode); + if (noMoreExecutionsScheduled()) { + this.notifyAll(); + } + } + + @SuppressWarnings("unchecked") + protected DependentResource getDependentResourceFor(DependentResourceNode drn) { + return (DependentResource) workflow.getDependentResourceFor(drn); + } + + protected boolean isConditionMet(Optional> condition, + DependentResource dependentResource) { + return condition.map(c -> c.isMet(primary, + dependentResource.getSecondaryResource(primary, context).orElse(null), + context)) + .orElse(true); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultDependentResourceNode.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultDependentResourceNode.java new file mode 100644 index 0000000000..d0d844ea9c --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultDependentResourceNode.java @@ -0,0 +1,31 @@ +package io.javaoperatorsdk.operator.processing.dependent.workflow; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; + +class DefaultDependentResourceNode + extends AbstractDependentResourceNode { + + public DefaultDependentResourceNode(DependentResource dependentResource) { + this(dependentResource, null, null); + } + + public DefaultDependentResourceNode(DependentResource dependentResource, + Condition reconcilePrecondition, Condition deletePostcondition) { + super(getNameFor(dependentResource)); + setDependentResource(dependentResource); + setReconcilePrecondition(reconcilePrecondition); + setDeletePostcondition(deletePostcondition); + } + + @SuppressWarnings("rawtypes") + static String getNameFor(DependentResource dependentResource) { + return DependentResource.defaultNameFor(dependentResource.getClass()) + "#" + + dependentResource.hashCode(); + } + + @Override + public String toString() { + return "DependentResourceNode{" + getDependentResource() + '}'; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultManagedWorkflow.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultManagedWorkflow.java index e1162ebcca..3096f1d4fb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultManagedWorkflow.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultManagedWorkflow.java @@ -1,58 +1,39 @@ package io.javaoperatorsdk.operator.processing.dependent.workflow; +import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; -import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; @SuppressWarnings("rawtypes") public class DefaultManagedWorkflow

implements ManagedWorkflow

{ private final Workflow

workflow; - private final boolean isCleaner; private final boolean isEmptyWorkflow; - private final Map dependentResourcesByName; - - DefaultManagedWorkflow(KubernetesClient client, - List dependentResourceSpecs, - ManagedWorkflowSupport managedWorkflowSupport) { - managedWorkflowSupport.checkForNameDuplication(dependentResourceSpecs); - dependentResourcesByName = dependentResourceSpecs - .stream().collect(Collectors.toMap(DependentResourceSpec::getName, - spec -> managedWorkflowSupport.createAndConfigureFrom(spec, client))); + private boolean resolved; + DefaultManagedWorkflow(List dependentResourceSpecs, Workflow

workflow) { isEmptyWorkflow = dependentResourceSpecs.isEmpty(); - workflow = - managedWorkflowSupport.createWorkflow(dependentResourceSpecs, dependentResourcesByName); - isCleaner = checkIfCleaner(); + this.workflow = workflow; } public WorkflowReconcileResult reconcile(P primary, Context

context) { + checkIfResolved(); return workflow.reconcile(primary, context); } public WorkflowCleanupResult cleanup(P primary, Context

context) { + checkIfResolved(); return workflow.cleanup(primary, context); } - private boolean checkIfCleaner() { - for (var dr : workflow.getDependentResources()) { - if (dr instanceof Deleter && !(dr instanceof GarbageCollected)) { - return true; - } - } - return false; - } - public boolean isCleaner() { - return isCleaner; + return workflow.hasCleaner(); } public boolean isEmptyWorkflow() { @@ -60,6 +41,25 @@ public boolean isEmptyWorkflow() { } public Map getDependentResourcesByName() { - return dependentResourcesByName; + checkIfResolved(); + final var nodes = workflow.nodes(); + final var result = new HashMap(nodes.size()); + nodes.forEach((key, drn) -> result.put(key, workflow.getDependentResourceFor(drn))); + return result; + } + + @Override + public ManagedWorkflow

resolve(KubernetesClient client, List specs) { + if (!resolved) { + workflow.resolve(client, specs); + resolved = true; + } + return this; + } + + private void checkIfResolved() { + if (!resolved) { + throw new IllegalStateException("resolve should be called before"); + } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DependentResourceNode.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DependentResourceNode.java index 32cef6c68e..2c069bf999 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DependentResourceNode.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DependentResourceNode.java @@ -1,93 +1,30 @@ package io.javaoperatorsdk.operator.processing.dependent.workflow; -import java.util.LinkedList; import java.util.List; import java.util.Optional; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; @SuppressWarnings("rawtypes") -public class DependentResourceNode { +public interface DependentResourceNode { - private final DependentResource dependentResource; - private Condition reconcilePrecondition; - private Condition deletePostcondition; - private Condition readyPostcondition; - private final List dependsOn = new LinkedList<>(); - private final List parents = new LinkedList<>(); + Optional> getReconcilePrecondition(); - public DependentResourceNode(DependentResource dependentResource) { - this(dependentResource, null, null); - } + Optional> getDeletePostcondition(); - public DependentResourceNode(DependentResource dependentResource, - Condition reconcilePrecondition) { - this(dependentResource, reconcilePrecondition, null); - } + List getDependsOn(); - public DependentResourceNode(DependentResource dependentResource, - Condition reconcilePrecondition, Condition deletePostcondition) { - this.dependentResource = dependentResource; - this.reconcilePrecondition = reconcilePrecondition; - this.deletePostcondition = deletePostcondition; - } + void addDependsOnRelation(DependentResourceNode node); - public DependentResource getDependentResource() { - return dependentResource; - } + Optional> getReadyPostcondition(); - public Optional> getReconcilePrecondition() { - return Optional.ofNullable(reconcilePrecondition); - } + List getParents(); - public Optional> getDeletePostcondition() { - return Optional.ofNullable(deletePostcondition); - } + void addParent(DependentResourceNode parent); - public List getDependsOn() { - return dependsOn; - } + String getName(); - @SuppressWarnings("unchecked") - public void addDependsOnRelation(DependentResourceNode node) { - node.parents.add(this); - dependsOn.add(node); - } - - @Override - public String toString() { - return "DependentResourceNode{" + - "dependentResource=" + dependentResource + - '}'; - } - - public DependentResourceNode setReconcilePrecondition( - Condition reconcilePrecondition) { - this.reconcilePrecondition = reconcilePrecondition; - return this; - } - - public DependentResourceNode setDeletePostcondition(Condition cleanupCondition) { - this.deletePostcondition = cleanupCondition; - return this; - } - - public Optional> getReadyPostcondition() { - return Optional.ofNullable(readyPostcondition); - } - - public DependentResourceNode setReadyPostcondition(Condition readyPostcondition) { - this.readyPostcondition = readyPostcondition; - return this; - } - - public List getParents() { - return parents; - } - - protected R getSecondaryResource(P primary, Context

context) { - return getDependentResource().getSecondaryResource(primary, context).orElse(null); - } + default void resolve(KubernetesClient client, List dependentResources) {} } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflow.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflow.java index 08e2c497b0..de2b634b5a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflow.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflow.java @@ -38,17 +38,12 @@ public boolean isEmptyWorkflow() { public Map getDependentResourcesByName() { return Collections.emptyMap(); } - }; - @SuppressWarnings("unchecked") - static ManagedWorkflow workflowFor(KubernetesClient client, - List dependentResourceSpecs) { - if (dependentResourceSpecs == null || dependentResourceSpecs.isEmpty()) { - return noOpWorkflow; + @Override + public ManagedWorkflow resolve(KubernetesClient client, List dependentResources) { + return this; } - return new DefaultManagedWorkflow(client, dependentResourceSpecs, - ManagedWorkflowSupport.instance()); - } + }; WorkflowReconcileResult reconcile(P primary, Context

context); @@ -59,4 +54,7 @@ static ManagedWorkflow workflowFor(KubernetesClient client, boolean isEmptyWorkflow(); Map getDependentResourcesByName(); + + ManagedWorkflow

resolve(KubernetesClient client, + List dependentResources); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java new file mode 100644 index 0000000000..5b5135e95a --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java @@ -0,0 +1,22 @@ +package io.javaoperatorsdk.operator.processing.dependent.workflow; + +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; + +import static io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow.noOpWorkflow; + +public interface ManagedWorkflowFactory { + + @SuppressWarnings({"rawtypes", "unchecked"}) + ManagedWorkflowFactory DEFAULT = (configuration) -> { + final var dependentResourceSpecs = configuration.getDependentResources(); + if (dependentResourceSpecs == null || dependentResourceSpecs.isEmpty()) { + return noOpWorkflow; + } + return new DefaultManagedWorkflow(dependentResourceSpecs, + ManagedWorkflowSupport.instance().createWorkflow(dependentResourceSpecs)); + }; + + @SuppressWarnings("rawtypes") + ManagedWorkflow workflowFor(ControllerConfiguration configuration); +} + diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupport.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupport.java index 7a5a7521ba..9b11784443 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupport.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupport.java @@ -10,13 +10,8 @@ import java.util.stream.Collectors; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; -import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; -import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer; -import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.KubernetesClientAware; -import io.javaoperatorsdk.operator.processing.dependent.workflow.builder.WorkflowBuilder; @SuppressWarnings({"rawtypes", "unchecked"}) class ManagedWorkflowSupport { @@ -51,56 +46,33 @@ public void checkForNameDuplication(List dependentResourc } } - @SuppressWarnings("unchecked") public

Workflow

createWorkflow( - List dependentResourceSpecs, - Map dependentResourceByName) { + List dependentResourceSpecs) { var orderedResourceSpecs = orderAndDetectCycles(dependentResourceSpecs); - var workflowBuilder = new WorkflowBuilder

().withThrowExceptionFurther(false); - orderedResourceSpecs.forEach(spec -> { - final var dependentResource = dependentResourceByName.get(spec.getName()); - final var dependsOn = (Set) spec.getDependsOn() - .stream().map(dependentResourceByName::get).collect(Collectors.toSet()); - workflowBuilder - .addDependentResource(dependentResource) - .dependsOn(dependsOn) - .withDeletePostcondition(spec.getDeletePostCondition()) - .withReconcilePrecondition(spec.getReconcileCondition()) - .withReadyPostcondition(spec.getReadyCondition()); - }); - return workflowBuilder.build(); + final var alreadyCreated = new ArrayList(orderedResourceSpecs.size()); + final var nodes = orderedResourceSpecs.stream() + .map(spec -> createFrom(spec, alreadyCreated)) + .collect(Collectors.toSet()); + return new Workflow<>(nodes); } - @SuppressWarnings({"rawtypes"}) - public DependentResource createAndConfigureFrom(DependentResourceSpec spec, - KubernetesClient client) { - final var dependentResource = spec.getDependentResource(); - - if (dependentResource instanceof KubernetesClientAware) { - ((KubernetesClientAware) dependentResource).setKubernetesClient(client); - } - - spec.getUseEventSourceWithName() - .ifPresent(esName -> { - final var name = (String) esName; - if (dependentResource instanceof EventSourceReferencer) { - ((EventSourceReferencer) dependentResource).useEventSourceWithName(name); - } else { - throw new IllegalStateException( - "DependentResource " + spec + " wants to use EventSource named " + name - + " but doesn't implement support for this feature by implementing " - + EventSourceReferencer.class.getSimpleName()); - } - }); - return dependentResource; + private DependentResourceNode createFrom(DependentResourceSpec spec, + List alreadyCreated) { + final var node = new SpecDependentResourceNode<>(spec); + alreadyCreated.add(node); + spec.getDependsOn().forEach(depend -> { + final DependentResourceNode dependsOn = alreadyCreated.stream() + .filter(drn -> depend.equals(drn.getName())).findFirst() + .orElseThrow(); + node.addDependsOnRelation(dependsOn); + }); + return node; } /** - * * @param dependentResourceSpecs list of specs * @return top-bottom ordered resources that can be added safely to workflow * @throws OperatorException if there is a cycle in the dependencies - * */ public List orderAndDetectCycles( List dependentResourceSpecs) { @@ -137,6 +109,7 @@ public List orderAndDetectCycles( } private static class DRInfo { + private final DependentResourceSpec spec; private final List waitingForCompletion; @@ -157,8 +130,9 @@ String name() { private boolean isReadyForVisit(DependentResourceSpec dr, Set alreadyVisited, String alreadyPresentName) { for (var name : dr.getDependsOn()) { - if (name.equals(alreadyPresentName)) + if (name.equals(alreadyPresentName)) { continue; + } if (!alreadyVisited.contains(name)) { return false; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/NodeExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/NodeExecutor.java new file mode 100644 index 0000000000..393e3d2e2e --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/NodeExecutor.java @@ -0,0 +1,33 @@ +package io.javaoperatorsdk.operator.processing.dependent.workflow; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; + +abstract class NodeExecutor implements Runnable { + + private final DependentResourceNode dependentResourceNode; + private final AbstractWorkflowExecutor

workflowExecutor; + + protected NodeExecutor(DependentResourceNode dependentResourceNode, + AbstractWorkflowExecutor

workflowExecutor) { + this.dependentResourceNode = dependentResourceNode; + this.workflowExecutor = workflowExecutor; + } + + @Override + public void run() { + try { + var dependentResource = workflowExecutor.getDependentResourceFor(dependentResourceNode); + + doRun(dependentResourceNode, dependentResource); + + } catch (RuntimeException e) { + workflowExecutor.handleExceptionInExecutor(dependentResourceNode, e); + } finally { + workflowExecutor.handleNodeExecutionFinish(dependentResourceNode); + } + } + + protected abstract void doRun(DependentResourceNode dependentResourceNode, + DependentResource dependentResource); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/SpecDependentResourceNode.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/SpecDependentResourceNode.java new file mode 100644 index 0000000000..ae8ed282c6 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/SpecDependentResourceNode.java @@ -0,0 +1,50 @@ +package io.javaoperatorsdk.operator.processing.dependent.workflow; + +import java.util.List; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; +import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer; +import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.KubernetesClientAware; + +class SpecDependentResourceNode + extends AbstractDependentResourceNode { + @SuppressWarnings("unchecked") + public SpecDependentResourceNode(DependentResourceSpec spec) { + super(spec.getName()); + setReadyPostcondition(spec.getReadyCondition()); + setDeletePostcondition(spec.getDeletePostCondition()); + setReconcilePrecondition(spec.getReconcileCondition()); + } + + @Override + @SuppressWarnings({"rawtypes", "unchecked"}) + public void resolve(KubernetesClient client, List dependentResources) { + final var spec = dependentResources.stream() + .filter(drs -> drs.getName().equals(getName())) + .findFirst().orElseThrow(); + + final DependentResource dependentResource = spec.getDependentResource(); + + if (dependentResource instanceof KubernetesClientAware) { + ((KubernetesClientAware) dependentResource).setKubernetesClient(client); + } + + spec.getUseEventSourceWithName() + .ifPresent(esName -> { + final var name = (String) esName; + if (dependentResource instanceof EventSourceReferencer) { + ((EventSourceReferencer) dependentResource).useEventSourceWithName(name); + } else { + throw new IllegalStateException( + "DependentResource " + spec + " wants to use EventSource named " + name + + " but doesn't implement support for this feature by implementing " + + EventSourceReferencer.class.getSimpleName()); + } + }); + + setDependentResource(dependentResource); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java index f955b83f89..07163ab8c5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/Workflow.java @@ -1,13 +1,18 @@ package io.javaoperatorsdk.operator.processing.dependent.workflow; +import java.util.ArrayList; import java.util.HashSet; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import java.util.function.Function; import java.util.stream.Collectors; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; +import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; @@ -21,34 +26,52 @@ public class Workflow

{ public static final boolean THROW_EXCEPTION_AUTOMATICALLY_DEFAULT = true; - private final Set dependentResourceNodes; + private final Map dependentResourceNodes; private final Set topLevelResources = new HashSet<>(); private final Set bottomLevelResource = new HashSet<>(); private final boolean throwExceptionAutomatically; // it's "global" executor service shared between multiple reconciliations running parallel - private ExecutorService executorService; + private final ExecutorService executorService; + private boolean resolved; + private boolean hasCleaner; - public Workflow(Set dependentResourceNodes) { - this.executorService = ExecutorServiceManager.instance().workflowExecutorService(); - this.dependentResourceNodes = dependentResourceNodes; - this.throwExceptionAutomatically = THROW_EXCEPTION_AUTOMATICALLY_DEFAULT; - preprocessForReconcile(); + Workflow(Set dependentResourceNodes) { + this(dependentResourceNodes, ExecutorServiceManager.instance().workflowExecutorService(), + THROW_EXCEPTION_AUTOMATICALLY_DEFAULT, false, false); } - public Workflow(Set dependentResourceNodes, - ExecutorService executorService, boolean throwExceptionAutomatically) { + Workflow(Set dependentResourceNodes, + ExecutorService executorService, boolean throwExceptionAutomatically, boolean resolved, + boolean hasCleaner) { this.executorService = executorService; - this.dependentResourceNodes = dependentResourceNodes; + this.dependentResourceNodes = dependentResourceNodes.stream() + .collect(Collectors.toMap(DependentResourceNode::getName, Function.identity())); this.throwExceptionAutomatically = throwExceptionAutomatically; + this.resolved = resolved; + this.hasCleaner = hasCleaner; preprocessForReconcile(); } - public Workflow(Set dependentResourceNodes, int globalParallelism) { - this(dependentResourceNodes, Executors.newFixedThreadPool(globalParallelism), true); + public DependentResource getDependentResourceFor(DependentResourceNode node) { + throwIfUnresolved(); + return dependentResource(node); + } + + private DependentResource dependentResource(DependentResourceNode node) { + return ((AbstractDependentResourceNode) dependentResourceNodes.get(node.getName())) + .getDependentResource(); + } + + private void throwIfUnresolved() { + if (!resolved) { + throw new IllegalStateException( + "Should call resolved before trying to access DependentResources"); + } } public WorkflowReconcileResult reconcile(P primary, Context

context) { + throwIfUnresolved(); WorkflowReconcileExecutor

workflowReconcileExecutor = new WorkflowReconcileExecutor<>(this, primary, context); var result = workflowReconcileExecutor.reconcile(); @@ -59,6 +82,7 @@ public WorkflowReconcileResult reconcile(P primary, Context

context) { } public WorkflowCleanupResult cleanup(P primary, Context

context) { + throwIfUnresolved(); WorkflowCleanupExecutor

workflowCleanupExecutor = new WorkflowCleanupExecutor<>(this, primary, context); var result = workflowCleanupExecutor.cleanup(); @@ -69,9 +93,11 @@ public WorkflowCleanupResult cleanup(P primary, Context

context) { } // add cycle detection? + @SuppressWarnings("unchecked") private void preprocessForReconcile() { - bottomLevelResource.addAll(dependentResourceNodes); - for (DependentResourceNode node : dependentResourceNodes) { + final var nodes = new ArrayList<>(dependentResourceNodes.values()); + bottomLevelResource.addAll(nodes); + for (DependentResourceNode node : nodes) { if (node.getDependsOn().isEmpty()) { topLevelResources.add(node); } else { @@ -82,14 +108,6 @@ private void preprocessForReconcile() { } } - public boolean isThrowExceptionAutomatically() { - return throwExceptionAutomatically; - } - - public void setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - } - Set getTopLevelDependentResources() { return topLevelResources; } @@ -102,8 +120,29 @@ ExecutorService getExecutorService() { return executorService; } - public Set getDependentResources() { - return dependentResourceNodes.stream().map(DependentResourceNode::getDependentResource) - .collect(Collectors.toSet()); + Map nodes() { + return dependentResourceNodes; + } + + @SuppressWarnings("unchecked") + void resolve(KubernetesClient client, List dependentResources) { + if (!resolved) { + final boolean[] cleanerHolder = {false}; + dependentResourceNodes.values() + .forEach(drn -> { + drn.resolve(client, dependentResources); + final var dr = dependentResource(drn); + if (dr.isDeletable()) { + cleanerHolder[0] = true; + } + }); + resolved = true; + hasCleaner = cleanerHolder[0]; + } + } + + boolean hasCleaner() { + throwIfUnresolved(); + return hasCleaner; } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowBuilder.java similarity index 65% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowBuilder.java index a9dd3f121d..9f24ff3bd2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/builder/WorkflowBuilder.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowBuilder.java @@ -1,30 +1,33 @@ -package io.javaoperatorsdk.operator.processing.dependent.workflow.builder; +package io.javaoperatorsdk.operator.processing.dependent.workflow; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; +import java.util.Map; import java.util.Set; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; -import io.javaoperatorsdk.operator.processing.dependent.workflow.Condition; -import io.javaoperatorsdk.operator.processing.dependent.workflow.DependentResourceNode; -import io.javaoperatorsdk.operator.processing.dependent.workflow.Workflow; import static io.javaoperatorsdk.operator.processing.dependent.workflow.Workflow.THROW_EXCEPTION_AUTOMATICALLY_DEFAULT; @SuppressWarnings({"rawtypes", "unchecked"}) public class WorkflowBuilder

{ - private final Set> dependentResourceNodes = new HashSet<>(); + private final Map> dependentResourceNodes = + new HashMap<>(); private boolean throwExceptionAutomatically = THROW_EXCEPTION_AUTOMATICALLY_DEFAULT; - - private DependentResourceNode currentNode; + private DefaultDependentResourceNode currentNode; + private boolean isCleaner = false; public WorkflowBuilder

addDependentResource(DependentResource dependentResource) { - currentNode = new DependentResourceNode<>(dependentResource); - dependentResourceNodes.add(currentNode); + currentNode = new DefaultDependentResourceNode<>(dependentResource); + isCleaner = dependentResource.isDeletable(); + final var name = currentNode.getName(); + dependentResourceNodes.put(name, currentNode); return this; } @@ -59,10 +62,17 @@ public WorkflowBuilder

withDeletePostcondition(Condition deletePostcondition) } DependentResourceNode getNodeByDependentResource(DependentResource dependentResource) { - return dependentResourceNodes.stream() - .filter(dr -> dr.getDependentResource() == dependentResource) - .findFirst() - .orElseThrow(); + // first check by name + final var node = + dependentResourceNodes.get(DefaultDependentResourceNode.getNameFor(dependentResource)); + if (node != null) { + return node; + } else { + return dependentResourceNodes.values().stream() + .filter(dr -> dr.getDependentResource() == dependentResource) + .findFirst() + .orElseThrow(); + } } public boolean isThrowExceptionAutomatically() { @@ -75,16 +85,16 @@ public WorkflowBuilder

withThrowExceptionFurther(boolean throwExceptionFurthe } public Workflow

build() { - return new Workflow( - dependentResourceNodes, ExecutorServiceManager.instance().workflowExecutorService(), - throwExceptionAutomatically); + return build(ExecutorServiceManager.instance().workflowExecutorService()); } public Workflow

build(int parallelism) { - return new Workflow(dependentResourceNodes, parallelism); + return build(Executors.newFixedThreadPool(parallelism)); } public Workflow

build(ExecutorService executorService) { - return new Workflow(dependentResourceNodes, executorService, throwExceptionAutomatically); + // workflow has been built from dependent resources so it is already resolved + return new Workflow(new HashSet<>(dependentResourceNodes.values()), executorService, + throwExceptionAutomatically, true, isCleaner); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java index b1dcfb938c..12656c2a8a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java @@ -1,9 +1,6 @@ package io.javaoperatorsdk.operator.processing.dependent.workflow; import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; @@ -15,55 +12,32 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter; -import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; @SuppressWarnings("rawtypes") -public class WorkflowCleanupExecutor

{ +public class WorkflowCleanupExecutor

extends AbstractWorkflowExecutor

{ private static final Logger log = LoggerFactory.getLogger(WorkflowCleanupExecutor.class); - private final Map> actualExecutions = - new ConcurrentHashMap<>(); - private final Map exceptionsDuringExecution = - new ConcurrentHashMap<>(); - private final Set alreadyVisited = ConcurrentHashMap.newKeySet(); private final Set postDeleteConditionNotMet = ConcurrentHashMap.newKeySet(); private final Set deleteCalled = ConcurrentHashMap.newKeySet(); - private final Workflow

workflow; - private final P primary; - private final Context

context; - public WorkflowCleanupExecutor(Workflow

workflow, P primary, Context

context) { - this.workflow = workflow; - this.primary = primary; - this.context = context; + super(workflow, primary, context); } public synchronized WorkflowCleanupResult cleanup() { - for (DependentResourceNode dependentResourceNode : workflow - .getBottomLevelResource()) { + for (DependentResourceNode dependentResourceNode : workflow.getBottomLevelResource()) { handleCleanup(dependentResourceNode); } - while (true) { - try { - this.wait(); - if (noMoreExecutionsScheduled()) { - break; - } else { - log.warn("Notified but still resources under execution. This should not happen."); - } - } catch (InterruptedException e) { - log.warn("Thread interrupted", e); - Thread.currentThread().interrupt(); - } - } + waitForScheduledExecutionsToRun(); return createCleanupResult(); } - private synchronized boolean noMoreExecutionsScheduled() { - return actualExecutions.isEmpty(); + @Override + protected Logger logger() { + return log; } @SuppressWarnings({"rawtypes", "unchecked"}) @@ -71,62 +45,49 @@ private synchronized void handleCleanup(DependentResourceNode dependentResourceN log.debug("Submitting for cleanup: {}", dependentResourceNode); if (alreadyVisited(dependentResourceNode) - || isCleaningNow(dependentResourceNode) + || isExecutingNow(dependentResourceNode) || !allDependentsCleaned(dependentResourceNode) || hasErroredDependent(dependentResourceNode)) { log.debug("Skipping submit of: {}, ", dependentResourceNode); return; } - Future nodeFuture = - workflow.getExecutorService().submit(new NodeExecutor(dependentResourceNode)); - actualExecutions.put(dependentResourceNode, nodeFuture); + Future nodeFuture = workflow.getExecutorService() + .submit(new CleanupExecutor<>(dependentResourceNode)); + markAsExecuting(dependentResourceNode, nodeFuture); log.debug("Submitted for cleanup: {}", dependentResourceNode); } - private class NodeExecutor implements Runnable { - private final DependentResourceNode dependentResourceNode; + private class CleanupExecutor extends NodeExecutor { - private NodeExecutor(DependentResourceNode dependentResourceNode) { - this.dependentResourceNode = dependentResourceNode; + private CleanupExecutor(DependentResourceNode drn) { + super(drn, WorkflowCleanupExecutor.this); } @Override @SuppressWarnings("unchecked") - public void run() { - try { - var dependentResource = dependentResourceNode.getDependentResource(); - Optional> deletePostCondition = - dependentResourceNode.getDeletePostcondition(); - - if (dependentResource instanceof Deleter - && !(dependentResource instanceof GarbageCollected)) { - ((Deleter

) dependentResourceNode.getDependentResource()).delete(primary, context); - deleteCalled.add(dependentResourceNode); - } - boolean deletePostConditionMet = deletePostCondition - .map(c -> c.isMet(primary, dependentResourceNode.getSecondaryResource(primary, context), - context)) - .orElse(true); - if (deletePostConditionMet) { - alreadyVisited.add(dependentResourceNode); - handleDependentCleaned(dependentResourceNode); - } else { - // updating alreadyVisited needs to be the last operation otherwise could lead to a race - // condition in handleCleanup condition checks - postDeleteConditionNotMet.add(dependentResourceNode); - alreadyVisited.add(dependentResourceNode); - } - } catch (RuntimeException e) { - handleExceptionInExecutor(dependentResourceNode, e); - } finally { - handleNodeExecutionFinish(dependentResourceNode); + protected void doRun(DependentResourceNode dependentResourceNode, + DependentResource dependentResource) { + var deletePostCondition = dependentResourceNode.getDeletePostcondition(); + + if (dependentResource.isDeletable()) { + ((Deleter

) dependentResource).delete(primary, context); + deleteCalled.add(dependentResourceNode); + } + boolean deletePostConditionMet = isConditionMet(deletePostCondition, dependentResource); + if (deletePostConditionMet) { + markAsVisited(dependentResourceNode); + handleDependentCleaned(dependentResourceNode); + } else { + // updating alreadyVisited needs to be the last operation otherwise could lead to a race + // condition in handleCleanup condition checks + postDeleteConditionNotMet.add(dependentResourceNode); + markAsVisited(dependentResourceNode); } } } - private synchronized void handleDependentCleaned( DependentResourceNode dependentResourceNode) { var dependOns = dependentResourceNode.getDependsOn(); @@ -138,29 +99,6 @@ private synchronized void handleDependentCleaned( } } - private synchronized void handleExceptionInExecutor( - DependentResourceNode dependentResourceNode, - RuntimeException e) { - exceptionsDuringExecution.put(dependentResourceNode, e); - } - - private synchronized void handleNodeExecutionFinish( - DependentResourceNode dependentResourceNode) { - log.debug("Finished execution for: {}", dependentResourceNode); - actualExecutions.remove(dependentResourceNode); - if (actualExecutions.isEmpty()) { - this.notifyAll(); - } - } - - private boolean isCleaningNow(DependentResourceNode dependentResourceNode) { - return actualExecutions.containsKey(dependentResourceNode); - } - - private boolean alreadyVisited(DependentResourceNode dependentResourceNode) { - return alreadyVisited.contains(dependentResourceNode); - } - @SuppressWarnings("unchecked") private boolean allDependentsCleaned(DependentResourceNode dependentResourceNode) { List parents = dependentResourceNode.getParents(); @@ -172,19 +110,17 @@ private boolean allDependentsCleaned(DependentResourceNode dependentResourceNode @SuppressWarnings("unchecked") private boolean hasErroredDependent(DependentResourceNode dependentResourceNode) { List parents = dependentResourceNode.getParents(); - return !parents.isEmpty() - && parents.stream().anyMatch(exceptionsDuringExecution::containsKey); + return !parents.isEmpty() && parents.stream().anyMatch(this::isInError); } private WorkflowCleanupResult createCleanupResult() { - final var erroredDependents = exceptionsDuringExecution.entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().getDependentResource(), Entry::getValue)); + final var erroredDependents = getErroredDependents(); final var postConditionNotMet = postDeleteConditionNotMet.stream() - .map(DependentResourceNode::getDependentResource) + .map(workflow::getDependentResourceFor) + .collect(Collectors.toList()); + final var deleteCalled = this.deleteCalled.stream() + .map(workflow::getDependentResourceFor) .collect(Collectors.toList()); - final var deleteCalled = - this.deleteCalled.stream().map(DependentResourceNode::getDependentResource) - .collect(Collectors.toList()); return new WorkflowCleanupResult(erroredDependents, postConditionNotMet, deleteCalled); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java index 1168d523a9..98abe41977 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java @@ -1,6 +1,5 @@ package io.javaoperatorsdk.operator.processing.dependent.workflow; -import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -15,24 +14,16 @@ import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; -import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult; import io.javaoperatorsdk.operator.processing.event.ResourceID; @SuppressWarnings({"rawtypes", "unchecked"}) -public class WorkflowReconcileExecutor

{ +public class WorkflowReconcileExecutor

extends AbstractWorkflowExecutor

{ private static final Logger log = LoggerFactory.getLogger(WorkflowReconcileExecutor.class); - private final Workflow

workflow; - /** Covers both deleted and reconciled */ - private final Set alreadyVisited = ConcurrentHashMap.newKeySet(); private final Set notReady = ConcurrentHashMap.newKeySet(); - private final Map> actualExecutions = - new HashMap<>(); - private final Map exceptionsDuringExecution = - new ConcurrentHashMap<>(); private final Set markedForDelete = ConcurrentHashMap.newKeySet(); private final Set deletePostConditionNotMet = @@ -42,41 +33,28 @@ public class WorkflowReconcileExecutor

{ private final Map reconcileResults = new ConcurrentHashMap<>(); - private final P primary; - private final Context

context; - public WorkflowReconcileExecutor(Workflow

workflow, P primary, Context

context) { - this.primary = primary; - this.context = context; - this.workflow = workflow; + super(workflow, primary, context); } public synchronized WorkflowReconcileResult reconcile() { - for (DependentResourceNode dependentResourceNode : workflow - .getTopLevelDependentResources()) { + for (DependentResourceNode dependentResourceNode : workflow.getTopLevelDependentResources()) { handleReconcile(dependentResourceNode); } - while (true) { - try { - this.wait(); - if (noMoreExecutionsScheduled()) { - break; - } else { - log.warn("Notified but still resources under execution. This should not happen."); - } - } catch (InterruptedException e) { - log.warn("Thread interrupted", e); - Thread.currentThread().interrupt(); - } - } + waitForScheduledExecutionsToRun(); return createReconcileResult(); } + @Override + protected Logger logger() { + return log; + } + private synchronized void handleReconcile(DependentResourceNode dependentResourceNode) { log.debug("Submitting for reconcile: {}", dependentResourceNode); if (alreadyVisited(dependentResourceNode) - || isReconcilingNow(dependentResourceNode) + || isExecutingNow(dependentResourceNode) || !allParentsReconciledAndReady(dependentResourceNode) || markedForDelete.contains(dependentResourceNode) || hasErroredParent(dependentResourceNode)) { @@ -84,17 +62,14 @@ private synchronized void handleReconcile(DependentResourceNode depend return; } - boolean reconcileConditionMet = dependentResourceNode.getReconcilePrecondition() - .map(rc -> rc.isMet(primary, dependentResourceNode.getSecondaryResource(primary, context), - context)) - .orElse(true); - + boolean reconcileConditionMet = isConditionMet(dependentResourceNode.getReconcilePrecondition(), + getDependentResourceFor(dependentResourceNode)); if (!reconcileConditionMet) { handleReconcileConditionNotMet(dependentResourceNode); } else { Future nodeFuture = workflow.getExecutorService() .submit(new NodeReconcileExecutor(dependentResourceNode)); - actualExecutions.put(dependentResourceNode, nodeFuture); + markAsExecuting(dependentResourceNode, nodeFuture); log.debug("Submitted to reconcile: {}", dependentResourceNode); } } @@ -103,7 +78,7 @@ private synchronized void handleDelete(DependentResourceNode dependentResourceNo log.debug("Submitting for delete: {}", dependentResourceNode); if (alreadyVisited(dependentResourceNode) - || isReconcilingNow(dependentResourceNode) + || isExecutingNow(dependentResourceNode) || !markedForDelete.contains(dependentResourceNode) || !allDependentsDeletedAlready(dependentResourceNode)) { log.debug("Skipping submit for delete of: {}, ", dependentResourceNode); @@ -112,120 +87,79 @@ private synchronized void handleDelete(DependentResourceNode dependentResourceNo Future nodeFuture = workflow.getExecutorService() .submit(new NodeDeleteExecutor(dependentResourceNode)); - actualExecutions.put(dependentResourceNode, nodeFuture); + markAsExecuting(dependentResourceNode, nodeFuture); log.debug("Submitted to delete: {}", dependentResourceNode); } private boolean allDependentsDeletedAlready(DependentResourceNode dependentResourceNode) { var dependents = dependentResourceNode.getParents(); - return dependents.stream().allMatch(d -> alreadyVisited.contains(d) && !notReady.contains(d) - && !exceptionsDuringExecution.containsKey(d) && !deletePostConditionNotMet.contains(d)); - } - - - private synchronized void handleExceptionInExecutor(DependentResourceNode dependentResourceNode, - RuntimeException e) { - exceptionsDuringExecution.put(dependentResourceNode, e); - } - - private synchronized void handleNodeExecutionFinish(DependentResourceNode dependentResourceNode) { - log.debug("Finished execution for: {}", dependentResourceNode); - actualExecutions.remove(dependentResourceNode); - if (actualExecutions.isEmpty()) { - this.notifyAll(); - } + return dependents.stream().allMatch(d -> alreadyVisited(d) && !notReady.contains(d) + && !isInError(d) && !deletePostConditionNotMet.contains(d)); } // needs to be in one step private synchronized void setAlreadyReconciledButNotReady( DependentResourceNode dependentResourceNode) { log.debug("Setting already reconciled but not ready for: {}", dependentResourceNode); - alreadyVisited.add(dependentResourceNode); + markAsVisited(dependentResourceNode); notReady.add(dependentResourceNode); } - private class NodeReconcileExecutor implements Runnable { - - private final DependentResourceNode dependentResourceNode; + private class NodeReconcileExecutor extends NodeExecutor { - private NodeReconcileExecutor(DependentResourceNode dependentResourceNode) { - this.dependentResourceNode = dependentResourceNode; + private NodeReconcileExecutor(DependentResourceNode dependentResourceNode) { + super(dependentResourceNode, WorkflowReconcileExecutor.this); } @Override - @SuppressWarnings("unchecked") - public void run() { - try { - DependentResource dependentResource = dependentResourceNode.getDependentResource(); - if (log.isDebugEnabled()) { - log.debug( - "Reconciling {} for primary: {}", - dependentResourceNode, - ResourceID.fromResource(primary)); - } - ReconcileResult reconcileResult = dependentResource.reconcile(primary, context); - reconcileResults.put(dependentResource, reconcileResult); - reconciled.add(dependentResourceNode); - - boolean ready = dependentResourceNode.getReadyPostcondition() - .map(rc -> rc.isMet(primary, - dependentResourceNode.getDependentResource().getSecondaryResource(primary, context) - .orElse(null), - context)) - .orElse(true); - - if (ready) { - log.debug("Setting already reconciled for: {}", dependentResourceNode); - alreadyVisited.add(dependentResourceNode); - handleDependentsReconcile(dependentResourceNode); - } else { - setAlreadyReconciledButNotReady(dependentResourceNode); - } - } catch (RuntimeException e) { - handleExceptionInExecutor(dependentResourceNode, e); - } finally { - handleNodeExecutionFinish(dependentResourceNode); + protected void doRun(DependentResourceNode dependentResourceNode, + DependentResource dependentResource) { + if (log.isDebugEnabled()) { + log.debug( + "Reconciling {} for primary: {}", + dependentResourceNode, + ResourceID.fromResource(primary)); + } + ReconcileResult reconcileResult = dependentResource.reconcile(primary, context); + reconcileResults.put(dependentResource, reconcileResult); + reconciled.add(dependentResourceNode); + + boolean ready = isConditionMet(dependentResourceNode.getReadyPostcondition(), + dependentResource); + if (ready) { + log.debug("Setting already reconciled for: {}", dependentResourceNode); + markAsVisited(dependentResourceNode); + handleDependentsReconcile(dependentResourceNode); + } else { + setAlreadyReconciledButNotReady(dependentResourceNode); } } } - private class NodeDeleteExecutor implements Runnable { - - private final DependentResourceNode dependentResourceNode; + private class NodeDeleteExecutor extends NodeExecutor { private NodeDeleteExecutor(DependentResourceNode dependentResourceNode) { - this.dependentResourceNode = dependentResourceNode; + super(dependentResourceNode, WorkflowReconcileExecutor.this); } @Override @SuppressWarnings("unchecked") - public void run() { - try { - DependentResource dependentResource = dependentResourceNode.getDependentResource(); - var deletePostCondition = dependentResourceNode.getDeletePostcondition(); - - if (dependentResource instanceof Deleter - && !(dependentResource instanceof GarbageCollected)) { - ((Deleter

) dependentResourceNode.getDependentResource()).delete(primary, context); - } - boolean deletePostConditionMet = - deletePostCondition.map(c -> c.isMet(primary, - dependentResourceNode.getDependentResource().getSecondaryResource(primary, context) - .orElse(null), - context)).orElse(true); - if (deletePostConditionMet) { - alreadyVisited.add(dependentResourceNode); - handleDependentDeleted(dependentResourceNode); - } else { - // updating alreadyVisited needs to be the last operation otherwise could lead to a race - // condition in handleDelete condition checks - deletePostConditionNotMet.add(dependentResourceNode); - alreadyVisited.add(dependentResourceNode); - } - } catch (RuntimeException e) { - handleExceptionInExecutor(dependentResourceNode, e); - } finally { - handleNodeExecutionFinish(dependentResourceNode); + protected void doRun(DependentResourceNode dependentResourceNode, + DependentResource dependentResource) { + var deletePostCondition = dependentResourceNode.getDeletePostcondition(); + + if (dependentResource.isDeletable()) { + ((Deleter

) dependentResource).delete(primary, context); + } + boolean deletePostConditionMet = isConditionMet(deletePostCondition, dependentResource); + if (deletePostConditionMet) { + markAsVisited(dependentResourceNode); + handleDependentDeleted(dependentResourceNode); + } else { + // updating alreadyVisited needs to be the last operation otherwise could lead to a race + // condition in handleDelete condition checks + deletePostConditionNotMet.add(dependentResourceNode); + markAsVisited(dependentResourceNode); } } } @@ -238,10 +172,6 @@ private synchronized void handleDependentDeleted( }); } - private boolean isReconcilingNow(DependentResourceNode dependentResourceNode) { - return actualExecutions.containsKey(dependentResourceNode); - } - private synchronized void handleDependentsReconcile( DependentResourceNode dependentResourceNode) { var dependents = dependentResourceNode.getParents(); @@ -251,15 +181,6 @@ private synchronized void handleDependentsReconcile( }); } - private boolean noMoreExecutionsScheduled() { - return actualExecutions.isEmpty(); - } - - private boolean alreadyVisited( - DependentResourceNode dependentResourceNode) { - return alreadyVisited.contains(dependentResourceNode); - } - private void handleReconcileConditionNotMet(DependentResourceNode dependentResourceNode) { Set bottomNodes = new HashSet<>(); @@ -278,30 +199,27 @@ private void markDependentsForDelete(DependentResourceNode dependentResour } } - private boolean allParentsReconciledAndReady( - DependentResourceNode dependentResourceNode) { + private boolean allParentsReconciledAndReady(DependentResourceNode dependentResourceNode) { return dependentResourceNode.getDependsOn().isEmpty() || dependentResourceNode.getDependsOn().stream() .allMatch(d -> alreadyVisited(d) && !notReady.contains(d)); } - private boolean hasErroredParent( - DependentResourceNode dependentResourceNode) { + private boolean hasErroredParent(DependentResourceNode dependentResourceNode) { return !dependentResourceNode.getDependsOn().isEmpty() && dependentResourceNode.getDependsOn().stream() - .anyMatch(exceptionsDuringExecution::containsKey); + .anyMatch(this::isInError); } private WorkflowReconcileResult createReconcileResult() { return new WorkflowReconcileResult( reconciled.stream() - .map(DependentResourceNode::getDependentResource).collect(Collectors.toList()), + .map(workflow::getDependentResourceFor) + .collect(Collectors.toList()), notReady.stream() - .map(DependentResourceNode::getDependentResource) + .map(workflow::getDependentResourceFor) .collect(Collectors.toList()), - exceptionsDuringExecution - .entrySet().stream() - .collect(Collectors.toMap(e -> e.getKey().getDependentResource(), Map.Entry::getValue)), + getErroredDependents(), reconcileResults); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutorTest.java index 9c10c06cc0..467820940d 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutorTest.java @@ -4,11 +4,14 @@ import java.util.Collections; import java.util.List; +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; import io.javaoperatorsdk.operator.api.reconciler.Context; import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; public class AbstractWorkflowExecutorTest { @@ -28,24 +31,21 @@ public class AbstractWorkflowExecutorTest { protected List executionHistory = Collections.synchronizedList(new ArrayList<>()); - public class TestDependent implements DependentResource { + public class TestDependent extends KubernetesDependentResource { private final String name; public TestDependent(String name) { + super(ConfigMap.class); this.name = name; } @Override - public ReconcileResult reconcile(TestCustomResource primary, + public ReconcileResult reconcile(TestCustomResource primary, Context context) { executionHistory.add(new ReconcileRecord(this)); - return ReconcileResult.resourceCreated(VALUE); - } - - @Override - public Class resourceType() { - return String.class; + return ReconcileResult + .resourceCreated(new ConfigMapBuilder().addToBinaryData("key", VALUE).build()); } @Override diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupportTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupportTest.java index 85f04f220f..f4ef13d122 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupportTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupportTest.java @@ -11,7 +11,6 @@ import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; -import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; import static io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowTestUtils.createDRS; import static org.assertj.core.api.Assertions.assertThat; @@ -141,19 +140,18 @@ void createsWorkflow() { createDRS(NAME_3, NAME_1), createDRS(NAME_4, NAME_3, NAME_2)); - var drByName = specs - .stream().collect(Collectors.toMap(DependentResourceSpec::getName, - spec -> managedWorkflowSupport.createAndConfigureFrom(spec, - mock(KubernetesClient.class)))); + final var client = mock(KubernetesClient.class); + var workflow = managedWorkflowSupport.createWorkflow(specs); + workflow.resolve(client, specs); - var workflow = managedWorkflowSupport.createWorkflow(specs, drByName); - - assertThat(workflow.getDependentResources()).containsExactlyInAnyOrder(drByName.values() - .toArray(new DependentResource[0])); + assertThat(workflow.nodes().values()).map(DependentResourceNode::getName) + .containsExactlyInAnyOrder(NAME_1, NAME_2, NAME_3, NAME_4); assertThat(workflow.getTopLevelDependentResources()) - .map(DependentResourceNode::getDependentResource).containsExactly(drByName.get(NAME_1)); - assertThat(workflow.getBottomLevelResource()).map(DependentResourceNode::getDependentResource) - .containsExactly(drByName.get(NAME_4)); + .map(DependentResourceNode::getName) + .containsExactly(NAME_1); + assertThat(workflow.getBottomLevelResource()) + .map(DependentResourceNode::getName) + .containsExactly(NAME_4); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowTest.java index df556885b1..96a55aa85f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowTest.java @@ -1,65 +1,60 @@ package io.javaoperatorsdk.operator.processing.dependent.workflow; import java.util.List; -import java.util.Set; import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.client.KubernetesClient; +import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter; -import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import static io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowTestUtils.createDRS; +import static io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowTestUtils.createDRSWithTraits; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.withSettings; @SuppressWarnings({"rawtypes", "unchecked"}) class ManagedWorkflowTest { public static final String NAME = "name"; - ManagedWorkflowSupport managedWorkflowSupportMock = mock(ManagedWorkflowSupport.class); - KubernetesClient kubernetesClientMock = mock(KubernetesClient.class); - @Test void checksIfWorkflowEmpty() { - var mockWorkflow = mock(Workflow.class); - when(managedWorkflowSupportMock.createWorkflow(any(), any())).thenReturn(mockWorkflow); - when(managedWorkflowSupportMock.createAndConfigureFrom(any(), any())) - .thenReturn(mock(DependentResource.class)); assertThat(managedWorkflow().isEmptyWorkflow()).isTrue(); - - when(mockWorkflow.getDependentResources()).thenReturn(Set.of(mock(DependentResource.class))); assertThat(managedWorkflow(createDRS(NAME)).isEmptyWorkflow()).isFalse(); } @Test - void isCleanerIfAtLeastOneDRIsDeleterAndNoGC() { - var mockWorkflow = mock(Workflow.class); - when(managedWorkflowSupportMock.createWorkflow(any(), any())).thenReturn(mockWorkflow); - when(managedWorkflowSupportMock.createAndConfigureFrom(any(), any())) - .thenReturn(mock(DependentResource.class)); - when(mockWorkflow.getDependentResources()).thenReturn(Set.of(mock(DependentResource.class))); - + void isNotCleanerIfNoDeleter() { assertThat(managedWorkflow(createDRS(NAME)).isCleaner()).isFalse(); + } - when(mockWorkflow.getDependentResources()).thenReturn( - Set.of(mock(DependentResource.class, withSettings().extraInterfaces(Deleter.class)))); - assertThat(managedWorkflow(createDRS(NAME)).isCleaner()).isTrue(); + @Test + void isNotCleanerIfGarbageCollected() { + assertThat(managedWorkflow(createDRSWithTraits(NAME, GarbageCollected.class)) + .isCleaner()).isFalse(); + } - when(mockWorkflow.getDependentResources()).thenReturn(Set.of(mock(DependentResource.class, - withSettings().extraInterfaces(Deleter.class, GarbageCollected.class)))); - assertThat(managedWorkflow(createDRS(NAME)).isCleaner()).isFalse(); + @Test + void isCleanerIfHasDeleter() { + var spec = createDRSWithTraits(NAME, Deleter.class); + assertThat(managedWorkflow(spec).isCleaner()).isTrue(); } ManagedWorkflow managedWorkflow(DependentResourceSpec... specs) { - return new DefaultManagedWorkflow(kubernetesClientMock, List.of(specs), - managedWorkflowSupportMock); + final var configuration = mock(ControllerConfiguration.class); + final var specList = List.of(specs); + + KubernetesClient kubernetesClientMock = mock(KubernetesClient.class); + + when(configuration.getDependentResources()).thenReturn(specList); + return ConfigurationServiceProvider.instance().getWorkflowFactory() + .workflowFor(configuration) + .resolve(kubernetesClientMock, specList); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowTestUtils.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowTestUtils.java index 85e4af05eb..52c6840b1c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowTestUtils.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowTestUtils.java @@ -1,10 +1,19 @@ package io.javaoperatorsdk.operator.processing.dependent.workflow; +import java.util.Arrays; import java.util.Set; +import org.mockito.Mockito; + import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; +import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; +import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; import io.javaoperatorsdk.operator.processing.dependent.EmptyTestDependentResource; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.withSettings; + @SuppressWarnings("rawtypes") public class ManagedWorkflowTestUtils { @@ -14,4 +23,22 @@ public static DependentResourceSpec createDRS(String name, String... dependOns) null, null, null, null); } + public static DependentResourceSpec createDRSWithTraits(String name, + Class... dependentResourceTraits) { + final var spy = Mockito.mock(DependentResourceSpec.class); + when(spy.getName()).thenReturn(name); + + Class toMock = DependentResource.class; + final var garbageCollected = dependentResourceTraits != null && + Arrays.asList(dependentResourceTraits).contains(GarbageCollected.class); + + final var dr = mock(toMock, withSettings().extraInterfaces(dependentResourceTraits)); + // it would be better to call the real method here but it doesn't work because + // KubernetesDependentResource checks for GarbageCollected trait when instantiated which doesn't + // happen when using mocks + when(dr.isDeletable()).thenReturn(!garbageCollected); + when(spy.getDependentResource()).thenReturn(dr); + return spy; + } + } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutorTest.java index 56bd876687..9aa2fb07e9 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutorTest.java @@ -5,7 +5,6 @@ import io.javaoperatorsdk.operator.AggregatedOperatorException; import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.processing.dependent.workflow.builder.WorkflowBuilder; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.processing.dependent.workflow.ExecutionAssert.assertThat; @@ -17,6 +16,7 @@ class WorkflowCleanupExecutorTest extends AbstractWorkflowExecutorTest { protected TestDeleterDependent dd1 = new TestDeleterDependent("DR_DELETER_1"); protected TestDeleterDependent dd2 = new TestDeleterDependent("DR_DELETER_2"); protected TestDeleterDependent dd3 = new TestDeleterDependent("DR_DELETER_3"); + @SuppressWarnings("unchecked") Context mockContext = mock(Context.class); @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutorTest.java index 873cf66cb6..a445dc783f 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutorTest.java @@ -5,7 +5,6 @@ import io.javaoperatorsdk.operator.AggregatedOperatorException; import io.javaoperatorsdk.operator.api.reconciler.Context; -import io.javaoperatorsdk.operator.processing.dependent.workflow.builder.WorkflowBuilder; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static io.javaoperatorsdk.operator.processing.dependent.workflow.ExecutionAssert.assertThat; @@ -23,6 +22,7 @@ class WorkflowReconcileExecutorTest extends AbstractWorkflowExecutorTest { private final Condition notMetReadyCondition = (primary, secondary, context) -> false; + @SuppressWarnings("unchecked") Context mockContext = mock(Context.class); @Test diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowTest.java index df12c8af54..7da98f66b2 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowTest.java @@ -6,7 +6,6 @@ import org.junit.jupiter.api.Test; import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; -import io.javaoperatorsdk.operator.processing.dependent.workflow.builder.WorkflowBuilder; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; import static org.assertj.core.api.Assertions.assertThat; @@ -29,7 +28,7 @@ void calculatesTopLevelResources() { Set topResources = workflow.getTopLevelDependentResources().stream() - .map(DependentResourceNode::getDependentResource) + .map(workflow::getDependentResourceFor) .collect(Collectors.toSet()); assertThat(topResources).containsExactlyInAnyOrder(dr1, independentDR); @@ -49,7 +48,7 @@ void calculatesBottomLevelResources() { Set bottomResources = workflow.getBottomLevelResource().stream() - .map(DependentResourceNode::getDependentResource) + .map(workflow::getDependentResourceFor) .collect(Collectors.toSet()); assertThat(bottomResources).containsExactlyInAnyOrder(dr2, independentDR); diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageDependentsWorkflowReconciler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageDependentsWorkflowReconciler.java index 0eeba42083..287d27a5c3 100644 --- a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageDependentsWorkflowReconciler.java +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageDependentsWorkflowReconciler.java @@ -19,7 +19,7 @@ import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource; import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResourceConfig; import io.javaoperatorsdk.operator.processing.dependent.workflow.Workflow; -import io.javaoperatorsdk.operator.processing.dependent.workflow.builder.WorkflowBuilder; +import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowBuilder; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import static io.javaoperatorsdk.operator.sample.Utils.createStatus; @@ -31,6 +31,7 @@ */ @ControllerConfiguration( labelSelector = WebPageDependentsWorkflowReconciler.DEPENDENT_RESOURCE_LABEL_SELECTOR) +@SuppressWarnings("unused") public class WebPageDependentsWorkflowReconciler implements Reconciler, ErrorStatusHandler, EventSourceInitializer { @@ -79,7 +80,7 @@ public ErrorStatusUpdateControl updateErrorStatus( return handleError(resource, e); } - @SuppressWarnings("rawtypes") + @SuppressWarnings({"rawtypes", "unchecked"}) private void initDependentResources(KubernetesClient client) { this.configMapDR = new ConfigMapDependentResource(); this.deploymentDR = new DeploymentDependentResource();