1
1
package io .javaoperatorsdk .operator .processing .dependent .workflow ;
2
2
3
+ import java .util .HashMap ;
4
+ import java .util .HashSet ;
5
+ import java .util .Map ;
6
+ import java .util .Set ;
7
+ import java .util .concurrent .Future ;
8
+
3
9
import org .slf4j .Logger ;
4
10
import org .slf4j .LoggerFactory ;
5
11
@@ -10,6 +16,15 @@ public class WorkflowCleanupExecutor<P extends HasMetadata> {
10
16
11
17
private static final Logger log = LoggerFactory .getLogger (WorkflowReconcileExecutor .class );
12
18
19
+ private final Map <DependentResourceNode <?, ?>, Future <?>> actualExecutions =
20
+ new HashMap <>();
21
+ private final Map <DependentResourceNode <?, ?>, Exception > exceptionsDuringExecution =
22
+ new HashMap <>();
23
+ private final Set <DependentResourceNode <?, ?>> alreadyReconciled = new HashSet <>();
24
+ private final Set <DependentResourceNode <?, ?>> notReady = new HashSet <>();
25
+ private final Set <DependentResourceNode <?, ?>> ownOrAncestorReconcileConditionConditionNotMet =
26
+ new HashSet <>();
27
+
13
28
private final Workflow <P > workflow ;
14
29
private final P primary ;
15
30
private final Context <P > context ;
@@ -23,13 +38,104 @@ public WorkflowCleanupExecutor(Workflow<P> workflow, P primary, Context<P> conte
23
38
24
39
public synchronized WorkflowCleanupResult cleanup () {
25
40
for (DependentResourceNode <?, P > dependentResourceNode : workflow
26
- .getTopLevelDependentResources ()) {
41
+ .getBottomLevelResource ()) {
27
42
handleCleanup (dependentResourceNode , false );
28
43
}
29
- return null ;
44
+ while (true ) {
45
+ try {
46
+ this .wait ();
47
+ if (noMoreExecutionsScheduled ()) {
48
+ break ;
49
+ } else {
50
+ log .warn ("Notified but still resources under execution. This should not happen." );
51
+ }
52
+ } catch (InterruptedException e ) {
53
+ log .warn ("Thread interrupted" , e );
54
+ Thread .currentThread ().interrupt ();
55
+ }
56
+ }
57
+ return createCleanupResult ();
58
+ }
59
+
60
+ private WorkflowCleanupResult createCleanupResult () {
61
+ return new WorkflowCleanupResult ();
62
+ }
63
+
64
+ private synchronized boolean noMoreExecutionsScheduled () {
65
+ return actualExecutions .isEmpty ();
30
66
}
31
67
32
68
private void handleCleanup (DependentResourceNode <?, P > dependentResourceNode , boolean b ) {
69
+ log .debug ("Submitting for cleanup: {}" , dependentResourceNode );
70
+
71
+ if (alreadyVisited (dependentResourceNode )
72
+ || isCleaningNow (dependentResourceNode )
73
+ || !allParentsCleaned (dependentResourceNode )
74
+ || hasErroredParent (dependentResourceNode )) {
75
+ log .debug ("Skipping submit of: {}, " , dependentResourceNode );
76
+ return ;
77
+ }
78
+
79
+ }
80
+
81
+ private class NodeExecutor implements Runnable {
82
+
83
+ private final DependentResourceNode <?, P > dependentResourceNode ;
84
+ private final boolean onlyReconcileForPossibleDelete ;
85
+
86
+ private NodeExecutor (DependentResourceNode <?, P > dependentResourceNode ,
87
+ boolean onlyReconcileForDelete ) {
88
+ this .dependentResourceNode = dependentResourceNode ;
89
+ this .onlyReconcileForPossibleDelete = onlyReconcileForDelete ;
90
+ }
91
+
92
+ @ Override
93
+ @ SuppressWarnings ("unchecked" )
94
+ public void run () {
95
+ try {
96
+
97
+ } catch (RuntimeException e ) {
98
+ handleExceptionInExecutor (dependentResourceNode , e );
99
+ } finally {
100
+ handleNodeExecutionFinish (dependentResourceNode );
101
+ }
102
+ }
103
+ }
104
+
105
+ private synchronized void handleExceptionInExecutor (DependentResourceNode dependentResourceNode ,
106
+ RuntimeException e ) {
107
+ exceptionsDuringExecution .put (dependentResourceNode , e );
108
+ }
109
+
110
+ private synchronized void handleNodeExecutionFinish (DependentResourceNode dependentResourceNode ) {
111
+ log .debug ("Finished execution for: {}" , dependentResourceNode );
112
+ actualExecutions .remove (dependentResourceNode );
113
+ if (actualExecutions .isEmpty ()) {
114
+ this .notifyAll ();
115
+ }
116
+ }
117
+
118
+ private boolean isCleaningNow (DependentResourceNode <?, ?> dependentResourceNode ) {
119
+ return actualExecutions .containsKey (dependentResourceNode );
120
+ }
121
+
122
+
123
+ private boolean alreadyVisited (
124
+ DependentResourceNode <?, ?> dependentResourceNode ) {
125
+ return alreadyReconciled .contains (dependentResourceNode );
126
+ }
127
+
128
+ private boolean allParentsCleaned (
129
+ DependentResourceNode <?, ?> dependentResourceNode ) {
130
+ return dependentResourceNode .getDependsOn ().isEmpty ()
131
+ || dependentResourceNode .getDependsOn ().stream ()
132
+ .allMatch (d -> alreadyVisited (d ) && !notReady .contains (d ));
133
+ }
33
134
135
+ private boolean hasErroredParent (
136
+ DependentResourceNode <?, ?> dependentResourceNode ) {
137
+ return !dependentResourceNode .getDependsOn ().isEmpty ()
138
+ && dependentResourceNode .getDependsOn ().stream ()
139
+ .anyMatch (exceptionsDuringExecution ::containsKey );
34
140
}
35
141
}
0 commit comments