Skip to content

Commit 1066d37

Browse files
committed
refactor: extract Workflow interface
1 parent 4aee39b commit 1066d37

File tree

12 files changed

+191
-161
lines changed

12 files changed

+191
-161
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,8 +81,8 @@ public Controller(Reconciler<P> reconciler,
8181
this.metrics = Optional.ofNullable(configurationService.getMetrics()).orElse(Metrics.NOOP);
8282
contextInitializer = reconciler instanceof ContextInitializer;
8383
isCleaner = reconciler instanceof Cleaner;
84-
final var managed = configurationService.getWorkflowFactory()
85-
.workflowFor(configuration);
84+
85+
final var managed = configurationService.getWorkflowFactory().workflowFor(configuration);
8686
managedWorkflow = managed.resolve(kubernetesClient, configuration);
8787

8888
eventSourceManager = new EventSourceManager<>(this);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ protected boolean isInError(DependentResourceNode<?, P> dependentResourceNode) {
8787
protected Map<DependentResource, Exception> getErroredDependents() {
8888
return exceptionsDuringExecution.entrySet().stream()
8989
.collect(
90-
Collectors.toMap(e -> workflow.getDependentResourceFor(e.getKey()), Entry::getValue));
90+
Collectors.toMap(e -> e.getKey().getDependentResource(), Entry::getValue));
9191
}
9292

9393
protected synchronized void handleNodeExecutionFinish(
@@ -99,11 +99,6 @@ protected synchronized void handleNodeExecutionFinish(
9999
}
100100
}
101101

102-
@SuppressWarnings("unchecked")
103-
protected <R> DependentResource<R, P> getDependentResourceFor(DependentResourceNode<R, P> drn) {
104-
return (DependentResource<R, P>) workflow.getDependentResourceFor(drn);
105-
}
106-
107102
protected <R> boolean isConditionMet(Optional<Condition<R, P>> condition,
108103
DependentResource<R, P> dependentResource) {
109104
return condition.map(c -> c.isMet(primary,

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultManagedWorkflow.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ public Workflow<P> resolve(KubernetesClient client,
8888
bottomLevelResources.stream().map(alreadyResolved::get).collect(Collectors.toSet());
8989
final var top =
9090
topLevelResources.stream().map(alreadyResolved::get).collect(Collectors.toSet());
91-
return new Workflow<>(alreadyResolved, bottom, top,
91+
return new DefaultWorkflow<>(alreadyResolved, bottom, top,
9292
ExecutorServiceManager.instance().workflowExecutorService(),
9393
THROW_EXCEPTION_AUTOMATICALLY_DEFAULT, hasCleaner);
9494
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
package io.javaoperatorsdk.operator.processing.dependent.workflow;
2+
3+
import java.util.Collections;
4+
import java.util.HashMap;
5+
import java.util.HashSet;
6+
import java.util.List;
7+
import java.util.Map;
8+
import java.util.Set;
9+
import java.util.concurrent.ExecutorService;
10+
import java.util.function.Function;
11+
import java.util.stream.Collectors;
12+
13+
import io.fabric8.kubernetes.api.model.HasMetadata;
14+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
15+
import io.javaoperatorsdk.operator.api.reconciler.Context;
16+
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
17+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
18+
import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected;
19+
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
20+
21+
/**
22+
* Dependents definition: so if B depends on A, the B is dependent of A.
23+
*
24+
* @param <P> primary resource
25+
*/
26+
@SuppressWarnings("rawtypes")
27+
public class DefaultWorkflow<P extends HasMetadata> implements Workflow<P> {
28+
29+
private final Map<String, DependentResourceNode> dependentResourceNodes;
30+
private final Set<DependentResourceNode> topLevelResources;
31+
private final Set<DependentResourceNode> bottomLevelResource;
32+
33+
private final boolean throwExceptionAutomatically;
34+
// it's "global" executor service shared between multiple reconciliations running parallel
35+
private final ExecutorService executorService;
36+
private final boolean hasCleaner;
37+
38+
DefaultWorkflow(Set<DependentResourceNode> dependentResourceNodes) {
39+
this(dependentResourceNodes, ExecutorServiceManager.instance().workflowExecutorService(),
40+
THROW_EXCEPTION_AUTOMATICALLY_DEFAULT, false);
41+
}
42+
43+
DefaultWorkflow(Set<DependentResourceNode> dependentResourceNodes,
44+
ExecutorService executorService, boolean throwExceptionAutomatically,
45+
boolean hasCleaner) {
46+
this.executorService = executorService;
47+
this.throwExceptionAutomatically = throwExceptionAutomatically;
48+
this.hasCleaner = hasCleaner;
49+
50+
if (dependentResourceNodes == null) {
51+
this.topLevelResources = Collections.emptySet();
52+
this.bottomLevelResource = Collections.emptySet();
53+
this.dependentResourceNodes = Collections.emptyMap();
54+
} else {
55+
this.topLevelResources = new HashSet<>(dependentResourceNodes.size());
56+
this.bottomLevelResource = new HashSet<>(dependentResourceNodes);
57+
this.dependentResourceNodes = toMap(dependentResourceNodes);
58+
}
59+
}
60+
61+
DefaultWorkflow(Map<String, DependentResourceNode> dependentResourceNodes,
62+
Set<DependentResourceNode> bottomLevelResource, Set<DependentResourceNode> topLevelResources,
63+
ExecutorService executorService, boolean throwExceptionAutomatically,
64+
boolean hasCleaner) {
65+
this.executorService = executorService;
66+
this.throwExceptionAutomatically = throwExceptionAutomatically;
67+
this.hasCleaner = hasCleaner;
68+
69+
this.topLevelResources = topLevelResources;
70+
this.bottomLevelResource = bottomLevelResource;
71+
this.dependentResourceNodes = dependentResourceNodes;
72+
}
73+
74+
@SuppressWarnings("unchecked")
75+
private Map<String, DependentResourceNode> toMap(
76+
Set<DependentResourceNode> dependentResourceNodes) {
77+
return dependentResourceNodes.stream()
78+
.peek(drn -> {
79+
// add cycle detection?
80+
if (drn.getDependsOn().isEmpty()) {
81+
topLevelResources.add(drn);
82+
} else {
83+
for (DependentResourceNode dependsOn : (List<DependentResourceNode>) drn
84+
.getDependsOn()) {
85+
bottomLevelResource.remove(dependsOn);
86+
}
87+
}
88+
})
89+
.collect(Collectors.toMap(DependentResourceNode::getName, Function.identity()));
90+
}
91+
92+
@Override
93+
public WorkflowReconcileResult reconcile(P primary, Context<P> context) {
94+
WorkflowReconcileExecutor<P> workflowReconcileExecutor =
95+
new WorkflowReconcileExecutor<>(this, primary, context);
96+
var result = workflowReconcileExecutor.reconcile();
97+
if (throwExceptionAutomatically) {
98+
result.throwAggregateExceptionIfErrorsPresent();
99+
}
100+
return result;
101+
}
102+
103+
@Override
104+
public WorkflowCleanupResult cleanup(P primary, Context<P> context) {
105+
WorkflowCleanupExecutor<P> workflowCleanupExecutor =
106+
new WorkflowCleanupExecutor<>(this, primary, context);
107+
var result = workflowCleanupExecutor.cleanup();
108+
if (throwExceptionAutomatically) {
109+
result.throwAggregateExceptionIfErrorsPresent();
110+
}
111+
return result;
112+
}
113+
114+
@Override
115+
public Set<DependentResourceNode> getTopLevelDependentResources() {
116+
return topLevelResources;
117+
}
118+
119+
@Override
120+
public Set<DependentResourceNode> getBottomLevelResource() {
121+
return bottomLevelResource;
122+
}
123+
124+
@Override
125+
public ExecutorService getExecutorService() {
126+
return executorService;
127+
}
128+
129+
@Override
130+
public boolean hasCleaner() {
131+
return hasCleaner;
132+
}
133+
134+
static boolean isDeletable(Class<? extends DependentResource> drClass) {
135+
final var isDeleter = Deleter.class.isAssignableFrom(drClass);
136+
if (!isDeleter) {
137+
return false;
138+
}
139+
140+
if (KubernetesDependentResource.class.isAssignableFrom(drClass)) {
141+
return !GarbageCollected.class.isAssignableFrom(drClass);
142+
}
143+
return true;
144+
}
145+
146+
@Override
147+
public boolean isEmpty() {
148+
return dependentResourceNodes.isEmpty();
149+
}
150+
151+
@Override
152+
public Map<String, DependentResource> getDependentResourcesByName() {
153+
final var resources = new HashMap<String, DependentResource>(dependentResourceNodes.size());
154+
dependentResourceNodes
155+
.forEach((name, node) -> resources.put(name, node.getDependentResource()));
156+
return resources;
157+
}
158+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public boolean isEmpty() {
2222

2323
@Override
2424
public Workflow resolve(KubernetesClient client, ControllerConfiguration configuration) {
25-
return new Workflow(null);
25+
return new DefaultWorkflow(null);
2626
}
2727
};
2828
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ <P extends HasMetadata> DefaultManagedWorkflow<P> createAsDefault(
7878
toVisit.forEach(dr -> {
7979
if (cleanerHolder != null) {
8080
cleanerHolder[0] =
81-
cleanerHolder[0] || Workflow.isDeletable(dr.getDependentResourceClass());
81+
cleanerHolder[0] || DefaultWorkflow.isDeletable(dr.getDependentResourceClass());
8282
}
8383
final var name = dr.getName();
8484
var drInfo = drInfosByName.get(name);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/NodeExecutor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ protected NodeExecutor(DependentResourceNode<R, P> dependentResourceNode,
1717
@Override
1818
public void run() {
1919
try {
20-
var dependentResource = workflowExecutor.getDependentResourceFor(dependentResourceNode);
20+
var dependentResource = dependentResourceNode.getDependentResource();
2121

2222
doRun(dependentResourceNode, dependentResource);
2323

Lines changed: 13 additions & 136 deletions
Original file line numberDiff line numberDiff line change
@@ -1,156 +1,33 @@
11
package io.javaoperatorsdk.operator.processing.dependent.workflow;
22

3-
import java.util.Collections;
4-
import java.util.HashMap;
5-
import java.util.HashSet;
6-
import java.util.List;
73
import java.util.Map;
84
import java.util.Set;
95
import java.util.concurrent.ExecutorService;
10-
import java.util.function.Function;
11-
import java.util.stream.Collectors;
126

137
import io.fabric8.kubernetes.api.model.HasMetadata;
14-
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
158
import io.javaoperatorsdk.operator.api.reconciler.Context;
16-
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
179
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
18-
import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected;
19-
import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependentResource;
2010

21-
/**
22-
* Dependents definition: so if B depends on A, the B is dependent of A.
23-
*
24-
* @param <P> primary resource
25-
*/
26-
@SuppressWarnings("rawtypes")
27-
public class Workflow<P extends HasMetadata> {
11+
public interface Workflow<P extends HasMetadata> {
2812

29-
public static final boolean THROW_EXCEPTION_AUTOMATICALLY_DEFAULT = true;
13+
boolean THROW_EXCEPTION_AUTOMATICALLY_DEFAULT = true;
3014

31-
private final Map<String, DependentResourceNode> dependentResourceNodes;
32-
private final Set<DependentResourceNode> topLevelResources;
33-
private final Set<DependentResourceNode> bottomLevelResource;
15+
WorkflowReconcileResult reconcile(P primary, Context<P> context);
3416

35-
private final boolean throwExceptionAutomatically;
36-
// it's "global" executor service shared between multiple reconciliations running parallel
37-
private final ExecutorService executorService;
38-
private final boolean hasCleaner;
17+
WorkflowCleanupResult cleanup(P primary, Context<P> context);
3918

40-
Workflow(Set<DependentResourceNode> dependentResourceNodes) {
41-
this(dependentResourceNodes, ExecutorServiceManager.instance().workflowExecutorService(),
42-
THROW_EXCEPTION_AUTOMATICALLY_DEFAULT, false);
43-
}
19+
@SuppressWarnings("rawtypes")
20+
Set<DependentResourceNode> getTopLevelDependentResources();
4421

45-
Workflow(Set<DependentResourceNode> dependentResourceNodes,
46-
ExecutorService executorService, boolean throwExceptionAutomatically,
47-
boolean hasCleaner) {
48-
this.executorService = executorService;
49-
this.throwExceptionAutomatically = throwExceptionAutomatically;
50-
this.hasCleaner = hasCleaner;
22+
@SuppressWarnings("rawtypes")
23+
Set<DependentResourceNode> getBottomLevelResource();
5124

52-
if (dependentResourceNodes == null) {
53-
this.topLevelResources = Collections.emptySet();
54-
this.bottomLevelResource = Collections.emptySet();
55-
this.dependentResourceNodes = Collections.emptyMap();
56-
} else {
57-
this.topLevelResources = new HashSet<>(dependentResourceNodes.size());
58-
this.bottomLevelResource = new HashSet<>(dependentResourceNodes);
59-
this.dependentResourceNodes = toMap(dependentResourceNodes);
60-
}
61-
}
25+
ExecutorService getExecutorService();
6226

63-
Workflow(Map<String, DependentResourceNode> dependentResourceNodes,
64-
Set<DependentResourceNode> bottomLevelResource, Set<DependentResourceNode> topLevelResources,
65-
ExecutorService executorService, boolean throwExceptionAutomatically,
66-
boolean hasCleaner) {
67-
this.executorService = executorService;
68-
this.throwExceptionAutomatically = throwExceptionAutomatically;
69-
this.hasCleaner = hasCleaner;
27+
boolean hasCleaner();
7028

71-
this.topLevelResources = topLevelResources;
72-
this.bottomLevelResource = bottomLevelResource;
73-
this.dependentResourceNodes = dependentResourceNodes;
74-
}
29+
boolean isEmpty();
7530

76-
@SuppressWarnings("unchecked")
77-
private Map<String, DependentResourceNode> toMap(
78-
Set<DependentResourceNode> dependentResourceNodes) {
79-
return dependentResourceNodes.stream()
80-
.peek(drn -> {
81-
// add cycle detection?
82-
if (drn.getDependsOn().isEmpty()) {
83-
topLevelResources.add(drn);
84-
} else {
85-
for (DependentResourceNode dependsOn : (List<DependentResourceNode>) drn
86-
.getDependsOn()) {
87-
bottomLevelResource.remove(dependsOn);
88-
}
89-
}
90-
})
91-
.collect(Collectors.toMap(DependentResourceNode::getName, Function.identity()));
92-
}
93-
94-
public DependentResource getDependentResourceFor(DependentResourceNode node) {
95-
return node.getDependentResource();
96-
}
97-
98-
public WorkflowReconcileResult reconcile(P primary, Context<P> context) {
99-
WorkflowReconcileExecutor<P> workflowReconcileExecutor =
100-
new WorkflowReconcileExecutor<>(this, primary, context);
101-
var result = workflowReconcileExecutor.reconcile();
102-
if (throwExceptionAutomatically) {
103-
result.throwAggregateExceptionIfErrorsPresent();
104-
}
105-
return result;
106-
}
107-
108-
public WorkflowCleanupResult cleanup(P primary, Context<P> context) {
109-
WorkflowCleanupExecutor<P> workflowCleanupExecutor =
110-
new WorkflowCleanupExecutor<>(this, primary, context);
111-
var result = workflowCleanupExecutor.cleanup();
112-
if (throwExceptionAutomatically) {
113-
result.throwAggregateExceptionIfErrorsPresent();
114-
}
115-
return result;
116-
}
117-
118-
Set<DependentResourceNode> getTopLevelDependentResources() {
119-
return topLevelResources;
120-
}
121-
122-
Set<DependentResourceNode> getBottomLevelResource() {
123-
return bottomLevelResource;
124-
}
125-
126-
ExecutorService getExecutorService() {
127-
return executorService;
128-
}
129-
130-
public boolean hasCleaner() {
131-
return hasCleaner;
132-
}
133-
134-
static boolean isDeletable(Class<? extends DependentResource> drClass) {
135-
final var isDeleter = Deleter.class.isAssignableFrom(drClass);
136-
if (!isDeleter) {
137-
return false;
138-
}
139-
140-
if (KubernetesDependentResource.class.isAssignableFrom(drClass)) {
141-
return !GarbageCollected.class.isAssignableFrom(drClass);
142-
}
143-
return true;
144-
}
145-
146-
public boolean isEmpty() {
147-
return dependentResourceNodes.isEmpty();
148-
}
149-
150-
public Map<String, DependentResource> getDependentResourcesByName() {
151-
final var resources = new HashMap<String, DependentResource>(dependentResourceNodes.size());
152-
dependentResourceNodes
153-
.forEach((name, node) -> resources.put(name, node.getDependentResource()));
154-
return resources;
155-
}
31+
@SuppressWarnings("rawtypes")
32+
Map<String, DependentResource> getDependentResourcesByName();
15633
}

0 commit comments

Comments
 (0)