1
1
package io .javaoperatorsdk .operator .processing .dependent .workflow ;
2
2
3
- import java .util .ArrayList ;
4
- import java .util .List ;
5
- import java .util .Map ;
3
+ import java .util .*;
6
4
import java .util .concurrent .ConcurrentHashMap ;
7
5
import java .util .concurrent .ExecutorService ;
8
6
import java .util .concurrent .Executors ;
18
16
*/
19
17
public class Workflow <P extends HasMetadata > {
20
18
21
- private final List <DependentResourceNode <?, P >> dependentResourceNodes ;
22
- private final List <DependentResourceNode <?, P >> topLevelResources = new ArrayList <>();
19
+ private final Set <DependentResourceNode <?, P >> dependentResourceNodes ;
20
+ private final Set <DependentResourceNode <?, P >> topLevelResources = new HashSet <>();
21
+ private final Set <DependentResourceNode <?, P >> bottomLevelResource = new HashSet <>();
23
22
private Map <DependentResourceNode <?, P >, List <DependentResourceNode <?, P >>> dependents ;
24
23
25
24
// it's "global" executor service shared between multiple reconciliations running parallel
26
25
private ExecutorService executorService ;
27
26
28
- public Workflow (List <DependentResourceNode <?, P >> dependentResourceNodes ) {
27
+ public Workflow (Set <DependentResourceNode <?, P >> dependentResourceNodes ) {
29
28
this .executorService = ConfigurationServiceProvider .instance ().getExecutorService ();
30
29
this .dependentResourceNodes = dependentResourceNodes ;
31
30
preprocessForReconcile ();
32
31
}
33
32
34
- public Workflow (List <DependentResourceNode <?, P >> dependentResourceNodes ,
33
+ public Workflow (Set <DependentResourceNode <?, P >> dependentResourceNodes ,
35
34
ExecutorService executorService ) {
36
35
this .executorService = executorService ;
37
36
this .dependentResourceNodes = dependentResourceNodes ;
38
37
preprocessForReconcile ();
39
38
}
40
39
41
- public Workflow (List <DependentResourceNode <?, P >> dependentResourceNodes , int globalParallelism ) {
40
+ public Workflow (Set <DependentResourceNode <?, P >> dependentResourceNodes , int globalParallelism ) {
42
41
this (dependentResourceNodes , Executors .newFixedThreadPool (globalParallelism ));
43
42
}
44
43
45
44
public WorkflowExecutionResult reconcile (P primary , Context <P > context ) {
46
- WorkflowReconcileExecutor workflowReconcileExecutor =
45
+ WorkflowReconcileExecutor < P > workflowReconcileExecutor =
47
46
new WorkflowReconcileExecutor <>(this , primary , context );
48
47
return workflowReconcileExecutor .reconcile ();
49
48
}
50
49
51
- public void cleanup (P resource , Context <P > context ) {
52
-
50
+ public WorkflowCleanupResult cleanup (P primary , Context <P > context ) {
51
+ WorkflowCleanupExecutor <P > workflowCleanupExecutor =
52
+ new WorkflowCleanupExecutor <>(this , primary , context );
53
+ return workflowCleanupExecutor .cleanup ();
53
54
}
54
55
55
56
// add cycle detection?
56
57
private void preprocessForReconcile () {
58
+ bottomLevelResource .addAll (dependentResourceNodes );
57
59
dependents = new ConcurrentHashMap <>(dependentResourceNodes .size ());
58
60
for (DependentResourceNode <?, P > node : dependentResourceNodes ) {
59
61
if (node .getDependsOn ().isEmpty ()) {
@@ -62,6 +64,7 @@ private void preprocessForReconcile() {
62
64
for (DependentResourceNode <?, P > dependsOn : node .getDependsOn ()) {
63
65
dependents .computeIfAbsent (dependsOn , dr -> new ArrayList <>());
64
66
dependents .get (dependsOn ).add (node );
67
+ bottomLevelResource .remove (dependsOn );
65
68
}
66
69
}
67
70
}
@@ -71,10 +74,14 @@ public void setExecutorService(ExecutorService executorService) {
71
74
this .executorService = executorService ;
72
75
}
73
76
74
- List <DependentResourceNode <?, P >> getTopLevelDependentResources () {
77
+ Set <DependentResourceNode <?, P >> getTopLevelDependentResources () {
75
78
return topLevelResources ;
76
79
}
77
80
81
+ Set <DependentResourceNode <?, P >> getBottomLevelResource () {
82
+ return bottomLevelResource ;
83
+ }
84
+
78
85
Map <DependentResourceNode <?, P >, List <DependentResourceNode <?, P >>> getDependents () {
79
86
return dependents ;
80
87
}
0 commit comments