2
2
3
3
import java .util .*;
4
4
import java .util .concurrent .*;
5
+ import java .util .stream .Collectors ;
5
6
6
7
import org .slf4j .Logger ;
7
8
import org .slf4j .LoggerFactory ;
8
9
9
10
import io .fabric8 .kubernetes .api .model .HasMetadata ;
10
- import io .javaoperatorsdk .operator .AggregatedOperatorException ;
11
11
import io .javaoperatorsdk .operator .api .reconciler .Context ;
12
12
import io .javaoperatorsdk .operator .api .reconciler .dependent .Deleter ;
13
13
import io .javaoperatorsdk .operator .api .reconciler .dependent .DependentResource ;
@@ -20,17 +20,14 @@ public class WorkflowReconcileExecutor<P extends HasMetadata> {
20
20
21
21
private final Workflow <P > workflow ;
22
22
23
- private final Set <DependentResourceNode <?, ?>> alreadyReconciled = ConcurrentHashMap .newKeySet ();
24
- private final Set <DependentResourceNode <?, ?>> errored = ConcurrentHashMap .newKeySet ();
25
- private final Set <DependentResourceNode <?, ?>> notReady =
26
- ConcurrentHashMap .newKeySet ();
27
- private final Set <DependentResourceNode <?, ?>> reconcileConditionOrParentsConditionNotMet =
28
- ConcurrentHashMap .newKeySet ();
29
-
23
+ private final Set <DependentResourceNode <?, ?>> alreadyReconciled = new HashSet <>();
24
+ private final Set <DependentResourceNode <?, ?>> notReady = new HashSet <>();
25
+ private final Set <DependentResourceNode <?, ?>> ownOrAncestorReconcileConditionConditionNotMet =
26
+ new HashSet <>();
30
27
private final Map <DependentResourceNode <?, ?>, Future <?>> actualExecutions =
31
- new ConcurrentHashMap <>();
32
- private final List < Exception > exceptionsDuringExecution =
33
- Collections . synchronizedList ( new ArrayList <>() );
28
+ new HashMap <>();
29
+ private final Map < DependentResourceNode <?, ?>, Exception > exceptionsDuringExecution =
30
+ new HashMap <>();
34
31
35
32
private final P primary ;
36
33
private final Context <P > context ;
@@ -42,18 +39,14 @@ public WorkflowReconcileExecutor(Workflow<P> workflow, P primary, Context<P> con
42
39
}
43
40
44
41
// add reconcile results
45
- public synchronized void reconcile () {
42
+ public synchronized WorkflowExecutionResult reconcile () {
46
43
for (DependentResourceNode <?, P > dependentResourceNode : workflow
47
44
.getTopLevelDependentResources ()) {
48
45
handleReconcile (dependentResourceNode , false );
49
46
}
50
47
while (true ) {
51
48
try {
52
49
this .wait ();
53
- if (!exceptionsDuringExecution .isEmpty ()) {
54
- log .debug ("Exception during reconciliation for: {}" , primary );
55
- throw createFinalException ();
56
- }
57
50
if (noMoreExecutionsScheduled ()) {
58
51
break ;
59
52
} else {
@@ -64,6 +57,7 @@ public synchronized void reconcile() {
64
57
Thread .currentThread ().interrupt ();
65
58
}
66
59
}
60
+ return createReconcileResult ();
67
61
}
68
62
69
63
private synchronized void handleReconcile (
@@ -80,7 +74,7 @@ private synchronized void handleReconcile(
80
74
}
81
75
82
76
if (onlyReconcileForPossibleDelete ) {
83
- reconcileConditionOrParentsConditionNotMet .add (dependentResourceNode );
77
+ ownOrAncestorReconcileConditionConditionNotMet .add (dependentResourceNode );
84
78
} else {
85
79
dependentResourceNode .getReconcileCondition ()
86
80
.ifPresent (reconcileCondition -> handleReconcileCondition (dependentResourceNode ,
@@ -98,8 +92,7 @@ private synchronized void handleReconcile(
98
92
99
93
private synchronized void handleExceptionInExecutor (DependentResourceNode dependentResourceNode ,
100
94
RuntimeException e ) {
101
- exceptionsDuringExecution .add (e );
102
- errored .add (dependentResourceNode );
95
+ exceptionsDuringExecution .put (dependentResourceNode , e );
103
96
}
104
97
105
98
private synchronized void handleNodeExecutionFinish (DependentResourceNode dependentResourceNode ) {
@@ -125,9 +118,9 @@ private synchronized void setAlreadyReconciledButNotReady(
125
118
126
119
private boolean ownOrParentsReconcileConditionNotMet (
127
120
DependentResourceNode <?, ?> dependentResourceNode ) {
128
- return reconcileConditionOrParentsConditionNotMet .contains (dependentResourceNode ) ||
121
+ return ownOrAncestorReconcileConditionConditionNotMet .contains (dependentResourceNode ) ||
129
122
dependentResourceNode .getDependsOn ().stream ()
130
- .anyMatch (reconcileConditionOrParentsConditionNotMet ::contains );
123
+ .anyMatch (ownOrAncestorReconcileConditionConditionNotMet ::contains );
131
124
}
132
125
133
126
private class NodeExecutor implements Runnable {
@@ -196,10 +189,6 @@ private boolean noMoreExecutionsScheduled() {
196
189
return actualExecutions .isEmpty ();
197
190
}
198
191
199
- private AggregatedOperatorException createFinalException () {
200
- return new AggregatedOperatorException ("Exception during workflow." , exceptionsDuringExecution );
201
- }
202
-
203
192
private boolean alreadyReconciled (
204
193
DependentResourceNode <?, ?> dependentResourceNode ) {
205
194
return alreadyReconciled .contains (dependentResourceNode );
@@ -211,7 +200,7 @@ private void handleReconcileCondition(DependentResourceNode<?, ?> dependentResou
211
200
boolean conditionMet =
212
201
reconcileCondition .isMet (dependentResourceNode .getDependentResource (), primary , context );
213
202
if (!conditionMet ) {
214
- reconcileConditionOrParentsConditionNotMet .add (dependentResourceNode );
203
+ ownOrAncestorReconcileConditionConditionNotMet .add (dependentResourceNode );
215
204
}
216
205
}
217
206
@@ -226,6 +215,29 @@ private boolean hasErroredParent(
226
215
DependentResourceNode <?, ?> dependentResourceNode ) {
227
216
return !dependentResourceNode .getDependsOn ().isEmpty ()
228
217
&& dependentResourceNode .getDependsOn ().stream ()
229
- .anyMatch (errored :: contains );
218
+ .anyMatch (exceptionsDuringExecution :: containsKey );
230
219
}
220
+
221
+ private WorkflowExecutionResult createReconcileResult () {
222
+ WorkflowExecutionResult workflowExecutionResult = new WorkflowExecutionResult ();
223
+
224
+ workflowExecutionResult .setErroredDependents (exceptionsDuringExecution
225
+ .entrySet ().stream ()
226
+ .collect (Collectors .toMap (e -> e .getKey ().getDependentResource (), Map .Entry ::getValue )));
227
+ workflowExecutionResult .setNotReadyDependents (notReady .stream ()
228
+ .map (DependentResourceNode ::getDependentResource )
229
+ .collect (Collectors .toList ()));
230
+
231
+ workflowExecutionResult .setReconciledDependents (alreadyReconciled .stream ()
232
+ .map (DependentResourceNode ::getDependentResource ).collect (Collectors .toList ()));
233
+
234
+ var notReconciledDependentResources =
235
+ new HashSet <DependentResourceNode <?, ?>>(workflow .getDependents ().keySet ());
236
+ notReconciledDependentResources .removeAll (alreadyReconciled );
237
+ workflowExecutionResult .setNotReconciledDependents (notReconciledDependentResources .stream ()
238
+ .map (DependentResourceNode ::getDependentResource ).collect (Collectors .toList ()));
239
+
240
+ return workflowExecutionResult ;
241
+ }
242
+
231
243
}
0 commit comments