Skip to content

Commit 6e809ab

Browse files
committed
fix: also resolve ManagedWorkflow nodes relations, simplify
1 parent b235620 commit 6e809ab

File tree

4 files changed

+79
-113
lines changed

4 files changed

+79
-113
lines changed
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,20 @@
11
package io.javaoperatorsdk.operator.processing.dependent.workflow;
22

3+
import java.util.HashMap;
34
import java.util.HashSet;
45
import java.util.List;
56
import java.util.Set;
6-
import java.util.function.Function;
77
import java.util.stream.Collectors;
88

99
import io.fabric8.kubernetes.api.model.HasMetadata;
1010
import io.fabric8.kubernetes.client.KubernetesClient;
11+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
1112
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
1213
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
14+
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
15+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
16+
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer;
17+
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.KubernetesClientAware;
1318

1419
import static io.javaoperatorsdk.operator.processing.dependent.workflow.Workflow.THROW_EXCEPTION_AUTOMATICALLY_DEFAULT;
1520

@@ -18,24 +23,23 @@ public class DefaultManagedWorkflow<P extends HasMetadata> implements ManagedWor
1823

1924
private final Set<String> topLevelResources;
2025
private final Set<String> bottomLevelResources;
21-
private final Set<SpecDependentResourceNode> nodes;
26+
private final List<DependentResourceSpec<?, ?>> specs;
2227
private final boolean hasCleaner;
2328

24-
@SuppressWarnings("unchecked")
25-
DefaultManagedWorkflow(Set<SpecDependentResourceNode> dependentResourceSpecs,
29+
DefaultManagedWorkflow(List<DependentResourceSpec<?, ?>> dependentResourceSpecs,
2630
boolean hasCleaner) {
2731
this.hasCleaner = hasCleaner;
2832
topLevelResources = new HashSet<>(dependentResourceSpecs.size());
29-
bottomLevelResources =
30-
dependentResourceSpecs.stream().map(SpecDependentResourceNode::getName).collect(
31-
Collectors.toSet());
32-
nodes = dependentResourceSpecs;
33-
dependentResourceSpecs.forEach(drn -> {
33+
bottomLevelResources = dependentResourceSpecs.stream()
34+
.map(DependentResourceSpec::getName)
35+
.collect(Collectors.toSet());
36+
specs = dependentResourceSpecs;
37+
dependentResourceSpecs.forEach(spec -> {
3438
// add cycle detection?
35-
if (drn.getDependsOn().isEmpty()) {
36-
topLevelResources.add(drn.getName());
39+
if (spec.getDependsOn().isEmpty()) {
40+
topLevelResources.add(spec.getName());
3741
} else {
38-
for (String dependsOn : (List<String>) drn.getDependsOn()) {
42+
for (String dependsOn : spec.getDependsOn()) {
3943
bottomLevelResources.remove(dependsOn);
4044
}
4145
}
@@ -50,8 +54,8 @@ Set<String> getBottomLevelResources() {
5054
return bottomLevelResources;
5155
}
5256

53-
Set<SpecDependentResourceNode> nodes() {
54-
return nodes;
57+
List<String> nodeNames() {
58+
return specs.stream().map(DependentResourceSpec::getName).collect(Collectors.toList());
5559
}
5660

5761
@Override
@@ -61,26 +65,54 @@ public boolean hasCleaner() {
6165

6266
@Override
6367
public boolean isEmpty() {
64-
return nodes.isEmpty();
68+
return specs.isEmpty();
6569
}
6670

6771
@Override
72+
@SuppressWarnings("unchecked")
6873
public Workflow<P> resolve(KubernetesClient client,
6974
ControllerConfiguration<P> configuration) {
70-
final var map = nodes.stream()
71-
.map(spec -> createDRN(client, configuration, spec))
72-
.collect(Collectors.toMap(DependentResourceNode::getName, Function.identity()));
73-
final var bottom = bottomLevelResources.stream().map(map::get).collect(Collectors.toSet());
74-
final var top = topLevelResources.stream().map(map::get).collect(Collectors.toSet());
75-
return new Workflow<>(map, bottom, top,
75+
final var alreadyResolved = new HashMap<String, DependentResourceNode>(specs.size());
76+
for (DependentResourceSpec spec : specs) {
77+
final var node = new DependentResourceNode(resolve(spec, client, configuration));
78+
alreadyResolved.put(node.getName(), node);
79+
spec.getDependsOn()
80+
.forEach(depend -> node.addDependsOnRelation(alreadyResolved.get((String) depend)));
81+
}
82+
83+
final var bottom =
84+
bottomLevelResources.stream().map(alreadyResolved::get).collect(Collectors.toSet());
85+
final var top =
86+
topLevelResources.stream().map(alreadyResolved::get).collect(Collectors.toSet());
87+
return new Workflow<>(alreadyResolved, bottom, top,
7688
ExecutorServiceManager.instance().workflowExecutorService(),
7789
THROW_EXCEPTION_AUTOMATICALLY_DEFAULT, hasCleaner);
7890
}
7991

80-
@SuppressWarnings("unchecked")
81-
private DependentResourceNode createDRN(
82-
KubernetesClient client, ControllerConfiguration<P> configuration,
83-
SpecDependentResourceNode spec) {
84-
return new DependentResourceNode(spec.resolve(client, configuration));
92+
@SuppressWarnings({"rawtypes", "unchecked"})
93+
private <R> DependentResource<R, P> resolve(DependentResourceSpec<R, P> spec,
94+
KubernetesClient client,
95+
ControllerConfiguration<P> configuration) {
96+
final DependentResource<R, P> dependentResource =
97+
ConfigurationServiceProvider.instance().dependentResourceFactory()
98+
.createFrom(spec, configuration);
99+
100+
if (dependentResource instanceof KubernetesClientAware) {
101+
((KubernetesClientAware) dependentResource).setKubernetesClient(client);
102+
}
103+
104+
spec.getUseEventSourceWithName()
105+
.ifPresent(esName -> {
106+
if (dependentResource instanceof EventSourceReferencer) {
107+
((EventSourceReferencer) dependentResource).useEventSourceWithName(esName);
108+
} else {
109+
throw new IllegalStateException(
110+
"DependentResource " + spec + " wants to use EventSource named " + esName
111+
+ " but doesn't implement support for this feature by implementing "
112+
+ EventSourceReferencer.class.getSimpleName());
113+
}
114+
});
115+
116+
return dependentResource;
85117
}
86118
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupport.java

Lines changed: 20 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -54,47 +54,32 @@ public <P extends HasMetadata> ManagedWorkflow<P> createWorkflow(
5454

5555
<P extends HasMetadata> DefaultManagedWorkflow<P> createAsDefault(
5656
List<DependentResourceSpec> dependentResourceSpecs) {
57-
var orderedResourceSpecs = orderAndDetectCycles(dependentResourceSpecs);
58-
final var alreadyCreated =
59-
new ArrayList<SpecDependentResourceNode>(orderedResourceSpecs.size());
6057
final boolean[] cleanerHolder = {false};
61-
final var nodes = orderedResourceSpecs.stream()
62-
.map(spec -> createFrom(spec, alreadyCreated, cleanerHolder))
63-
.collect(Collectors.toSet());
64-
return new DefaultManagedWorkflow<>(nodes, cleanerHolder[0]);
65-
}
66-
67-
private SpecDependentResourceNode createFrom(DependentResourceSpec spec,
68-
List<SpecDependentResourceNode> alreadyCreated, boolean[] cleanerHolder) {
69-
final var node = new SpecDependentResourceNode<>(spec);
70-
alreadyCreated.add(node);
71-
// if any previously checked dependent was a cleaner, no need to check further
72-
cleanerHolder[0] = cleanerHolder[0] || Workflow.isDeletable(spec.getDependentResourceClass());
73-
spec.getDependsOn().forEach(depend -> {
74-
final SpecDependentResourceNode dependsOn = alreadyCreated.stream()
75-
.filter(drn -> depend.equals(drn.getName())).findFirst()
76-
.orElseThrow();
77-
node.addDependsOnRelation(dependsOn);
78-
});
79-
return node;
58+
var orderedResourceSpecs = orderAndDetectCycles(dependentResourceSpecs, cleanerHolder);
59+
return new DefaultManagedWorkflow<>(orderedResourceSpecs, cleanerHolder[0]);
8060
}
8161

8262
/**
8363
* @param dependentResourceSpecs list of specs
8464
* @return top-bottom ordered resources that can be added safely to workflow
8565
* @throws OperatorException if there is a cycle in the dependencies
8666
*/
87-
public List<DependentResourceSpec> orderAndDetectCycles(
88-
List<DependentResourceSpec> dependentResourceSpecs) {
67+
private List<DependentResourceSpec<?, ?>> orderAndDetectCycles(
68+
List<DependentResourceSpec> dependentResourceSpecs, boolean[] cleanerHolder) {
8969

9070
final var drInfosByName = createDRInfos(dependentResourceSpecs);
91-
final var orderedSpecs = new ArrayList<DependentResourceSpec>(dependentResourceSpecs.size());
71+
final var orderedSpecs =
72+
new ArrayList<DependentResourceSpec<?, ?>>(dependentResourceSpecs.size());
9273
final var alreadyVisited = new HashSet<String>();
9374
var toVisit = getTopDependentResources(dependentResourceSpecs);
9475

9576
while (!toVisit.isEmpty()) {
9677
final var toVisitNext = new HashSet<DependentResourceSpec>();
9778
toVisit.forEach(dr -> {
79+
if (cleanerHolder != null) {
80+
cleanerHolder[0] =
81+
cleanerHolder[0] || Workflow.isDeletable(dr.getDependentResourceClass());
82+
}
9883
final var name = dr.getName();
9984
var drInfo = drInfosByName.get(name);
10085
if (drInfo != null) {
@@ -118,6 +103,16 @@ public List<DependentResourceSpec> orderAndDetectCycles(
118103
return orderedSpecs;
119104
}
120105

106+
/**
107+
* @param dependentResourceSpecs list of specs
108+
* @return top-bottom ordered resources that can be added safely to workflow
109+
* @throws OperatorException if there is a cycle in the dependencies
110+
*/
111+
public List<DependentResourceSpec<?, ?>> orderAndDetectCycles(
112+
List<DependentResourceSpec> dependentResourceSpecs) {
113+
return orderAndDetectCycles(dependentResourceSpecs, null);
114+
}
115+
121116
private static class DRInfo {
122117

123118
private final DependentResourceSpec spec;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/SpecDependentResourceNode.java

Lines changed: 0 additions & 61 deletions
This file was deleted.

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowSupportTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ void createsWorkflow() {
140140

141141
var workflow = managedWorkflowSupport.createAsDefault(specs);
142142

143-
assertThat(workflow.nodes()).map(SpecDependentResourceNode::getName)
143+
assertThat(workflow.nodeNames())
144144
.containsExactlyInAnyOrder(NAME_1, NAME_2, NAME_3, NAME_4);
145145
assertThat(workflow.getTopLevelResources()).containsExactly(NAME_1);
146146
assertThat(workflow.getBottomLevelResources()).containsExactly(NAME_4);

0 commit comments

Comments
 (0)