Skip to content

Commit d542af7

Browse files
committed
fix workflow execution issue
1 parent 15e74f8 commit d542af7

File tree

13 files changed

+43
-41
lines changed

13 files changed

+43
-41
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ public void init(LeaderElectionConfiguration config, KubernetesClient client) {
4949
// releaseOnCancel is not used in the underlying implementation
5050
leaderElector =
5151
new LeaderElectorBuilder(
52-
client, executorServiceManager.executorService())
52+
client, executorServiceManager.reconcileExecutorService())
5353
.withConfig(
5454
new LeaderElectionConfig(
5555
lock,

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,10 @@ public ExecutorServiceManager(ConfigurationService configurationService) {
3131
}
3232

3333
public ExecutorServiceManager init() {
34-
final var executorService = configurationService.getExecutorService();
35-
final var workflowExecutorService = configurationService.getWorkflowExecutorService();
3634
this.cachingExecutorService = Executors.newCachedThreadPool();
37-
this.executor = new InstrumentedExecutorService(executorService);
38-
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutorService);
35+
this.executor = new InstrumentedExecutorService(configurationService.getExecutorService());
36+
this.workflowExecutor =
37+
new InstrumentedExecutorService(configurationService.getWorkflowExecutorService());
3938
return this;
4039
}
4140

@@ -87,7 +86,7 @@ public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
8786
}
8887
}
8988

90-
public ExecutorService executorService() {
89+
public ExecutorService reconcileExecutorService() {
9190
return executor;
9291
}
9392

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ public KubernetesClient getClient() {
8080
public ExecutorService getWorkflowExecutorService() {
8181
// note that this should always be received from the executor service manager, so we are able to do
8282
// restarts.
83-
return controller.getExecutorServiceManager().executorService();
83+
return controller.getExecutorServiceManager().workflowExecutorService();
8484
}
8585

8686
public DefaultContext<P> setRetryInfo(RetryInfo retryInfo) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,8 +82,7 @@ public Controller(Reconciler<P> reconciler,
8282
isCleaner = reconciler instanceof Cleaner;
8383

8484
final var managed = configurationService.getWorkflowFactory().workflowFor(configuration);
85-
managedWorkflow = managed.resolve(kubernetesClient, configuration,
86-
executorServiceManager.workflowExecutorService());
85+
managedWorkflow = managed.resolve(kubernetesClient, configuration);
8786

8887
eventSourceManager = new EventSourceManager<>(this, executorServiceManager);
8988
eventProcessor =

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/AbstractWorkflowExecutor.java

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,17 @@
1111
import org.slf4j.Logger;
1212

1313
import io.fabric8.kubernetes.api.model.HasMetadata;
14+
import io.javaoperatorsdk.operator.OperatorException;
1415
import io.javaoperatorsdk.operator.api.reconciler.Context;
1516
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
17+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1618

1719
@SuppressWarnings("rawtypes")
1820
public abstract class AbstractWorkflowExecutor<P extends HasMetadata> {
1921

2022
protected final Workflow<P> workflow;
2123
protected final P primary;
24+
protected final ResourceID primaryID;
2225
protected final Context<P> context;
2326
/**
2427
* Covers both deleted and reconciled
@@ -32,6 +35,7 @@ public AbstractWorkflowExecutor(Workflow<P> workflow, P primary, Context<P> cont
3235
this.workflow = workflow;
3336
this.primary = primary;
3437
this.context = context;
38+
this.primaryID = ResourceID.fromResource(primary);
3539
}
3640

3741
protected abstract Logger logger();
@@ -46,8 +50,13 @@ protected synchronized void waitForScheduledExecutionsToRun() {
4650
logger().warn("Notified but still resources under execution. This should not happen.");
4751
}
4852
} catch (InterruptedException e) {
49-
logger().warn("Thread interrupted", e);
50-
Thread.currentThread().interrupt();
53+
if (noMoreExecutionsScheduled()) {
54+
logger().debug("interrupted, no more executions for: {}", primaryID);
55+
return;
56+
} else {
57+
logger().error("Thread interrupted for primary: {}", primaryID, e);
58+
throw new OperatorException(e);
59+
}
5160
}
5261
}
5362
}
@@ -91,7 +100,7 @@ protected Map<DependentResource, Exception> getErroredDependents() {
91100

92101
protected synchronized void handleNodeExecutionFinish(
93102
DependentResourceNode<?, P> dependentResourceNode) {
94-
logger().debug("Finished execution for: {}", dependentResourceNode);
103+
logger().trace("Finished execution for: {} primary: {}", dependentResourceNode, primaryID);
95104
actualExecutions.remove(dependentResourceNode);
96105
if (noMoreExecutionsScheduled()) {
97106
this.notifyAll();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultManagedWorkflow.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import java.util.HashSet;
55
import java.util.List;
66
import java.util.Set;
7-
import java.util.concurrent.ExecutorService;
87
import java.util.stream.Collectors;
98

109
import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -79,7 +78,7 @@ public boolean isEmpty() {
7978
@Override
8079
@SuppressWarnings("unchecked")
8180
public Workflow<P> resolve(KubernetesClient client,
82-
ControllerConfiguration<P> configuration, ExecutorService executorService) {
81+
ControllerConfiguration<P> configuration) {
8382
final var alreadyResolved = new HashMap<String, DependentResourceNode>(orderedSpecs.size());
8483
for (DependentResourceSpec spec : orderedSpecs) {
8584
final var node = new DependentResourceNode(spec.getName(),

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflow.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import java.util.Collections;
44
import java.util.List;
5-
import java.util.concurrent.ExecutorService;
65

76
import io.fabric8.kubernetes.api.model.HasMetadata;
87
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -24,6 +23,5 @@ default boolean isEmpty() {
2423
return true;
2524
}
2625

27-
Workflow<P> resolve(KubernetesClient client, ControllerConfiguration<P> configuration,
28-
ExecutorService executorService);
26+
Workflow<P> resolve(KubernetesClient client, ControllerConfiguration<P> configuration);
2927
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package io.javaoperatorsdk.operator.processing.dependent.workflow;
22

3-
import java.util.concurrent.ExecutorService;
4-
53
import io.fabric8.kubernetes.client.KubernetesClient;
64
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
75

@@ -23,8 +21,7 @@ public boolean isEmpty() {
2321
}
2422

2523
@Override
26-
public Workflow resolve(KubernetesClient client, ControllerConfiguration configuration,
27-
ExecutorService executorService) {
24+
public Workflow resolve(KubernetesClient client, ControllerConfiguration configuration) {
2825
return new DefaultWorkflow(null);
2926
}
3027
};

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -45,13 +45,13 @@ protected Logger logger() {
4545

4646
@SuppressWarnings({"rawtypes", "unchecked"})
4747
private synchronized void handleCleanup(DependentResourceNode dependentResourceNode) {
48-
log.debug("Submitting for cleanup: {}", dependentResourceNode);
48+
log.debug("Submitting for cleanup: {} primaryID: {}", dependentResourceNode, primaryID);
4949

5050
if (alreadyVisited(dependentResourceNode)
5151
|| isExecutingNow(dependentResourceNode)
5252
|| !allDependentsCleaned(dependentResourceNode)
5353
|| hasErroredDependent(dependentResourceNode)) {
54-
log.debug("Skipping submit of: {}, ", dependentResourceNode);
54+
log.debug("Skipping submit of: {} primaryID: {}", dependentResourceNode, primaryID);
5555
return;
5656
}
5757

@@ -96,7 +96,8 @@ private synchronized void handleDependentCleaned(
9696
var dependOns = dependentResourceNode.getDependsOn();
9797
if (dependOns != null) {
9898
dependOns.forEach(d -> {
99-
log.debug("Handle cleanup for dependent: {} of parent:{}", d, dependentResourceNode);
99+
log.debug("Handle cleanup for dependent: {} of parent: {} primaryID: {}", d,
100+
dependentResourceNode, primaryID);
100101
handleCleanup(d);
101102
});
102103
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
1717
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
1818
import io.javaoperatorsdk.operator.api.reconciler.dependent.ReconcileResult;
19-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
2019

2120
@SuppressWarnings({"rawtypes", "unchecked"})
2221
public class WorkflowReconcileExecutor<P extends HasMetadata> extends AbstractWorkflowExecutor<P> {
@@ -54,14 +53,14 @@ protected Logger logger() {
5453
}
5554

5655
private synchronized <R> void handleReconcile(DependentResourceNode<R, P> dependentResourceNode) {
57-
log.debug("Submitting for reconcile: {}", dependentResourceNode);
56+
log.debug("Submitting for reconcile: {} primaryID: {}", dependentResourceNode, primaryID);
5857

5958
if (alreadyVisited(dependentResourceNode)
6059
|| isExecutingNow(dependentResourceNode)
6160
|| !allParentsReconciledAndReady(dependentResourceNode)
6261
|| markedForDelete.contains(dependentResourceNode)
6362
|| hasErroredParent(dependentResourceNode)) {
64-
log.debug("Skipping submit of: {}, ", dependentResourceNode);
63+
log.debug("Skipping submit of: {}, primaryID: {}", dependentResourceNode, primaryID);
6564
return;
6665
}
6766

@@ -73,7 +72,7 @@ private synchronized <R> void handleReconcile(DependentResourceNode<R, P> depend
7372
Future<?> nodeFuture = executorService
7473
.submit(new NodeReconcileExecutor(dependentResourceNode));
7574
markAsExecuting(dependentResourceNode, nodeFuture);
76-
log.debug("Submitted to reconcile: {}", dependentResourceNode);
75+
log.debug("Submitted to reconcile: {} primaryID: {}", dependentResourceNode, primaryID);
7776
}
7877
}
7978

@@ -84,7 +83,8 @@ private synchronized void handleDelete(DependentResourceNode dependentResourceNo
8483
|| isExecutingNow(dependentResourceNode)
8584
|| !markedForDelete.contains(dependentResourceNode)
8685
|| !allDependentsDeletedAlready(dependentResourceNode)) {
87-
log.debug("Skipping submit for delete of: {}, ", dependentResourceNode);
86+
log.debug("Skipping submit for delete of: {} primaryID: {} ", dependentResourceNode,
87+
primaryID);
8888
return;
8989
}
9090

@@ -117,20 +117,18 @@ private NodeReconcileExecutor(DependentResourceNode<R, P> dependentResourceNode)
117117
@Override
118118
protected void doRun(DependentResourceNode<R, P> dependentResourceNode,
119119
DependentResource<R, P> dependentResource) {
120-
if (log.isDebugEnabled()) {
121-
log.debug(
122-
"Reconciling {} for primary: {}",
123-
dependentResourceNode,
124-
ResourceID.fromResource(primary));
125-
}
120+
121+
log.debug(
122+
"Reconciling for primary: {} node: {} ", primaryID, dependentResourceNode);
126123
ReconcileResult reconcileResult = dependentResource.reconcile(primary, context);
127124
reconcileResults.put(dependentResource, reconcileResult);
128125
reconciled.add(dependentResourceNode);
129126

130127
boolean ready = isConditionMet(dependentResourceNode.getReadyPostcondition(),
131128
dependentResource);
132129
if (ready) {
133-
log.debug("Setting already reconciled for: {}", dependentResourceNode);
130+
log.debug("Setting already reconciled for: {} primaryID: {}",
131+
dependentResourceNode, primaryID);
134132
markAsVisited(dependentResourceNode);
135133
handleDependentsReconcile(dependentResourceNode);
136134
} else {
@@ -173,7 +171,8 @@ protected void doRun(DependentResourceNode<R, P> dependentResourceNode,
173171
private synchronized void handleDependentDeleted(
174172
DependentResourceNode<?, P> dependentResourceNode) {
175173
dependentResourceNode.getDependsOn().forEach(dr -> {
176-
log.debug("Handle deleted for: {} with dependent: {}", dr, dependentResourceNode);
174+
log.debug("Handle deleted for: {} with dependent: {} primaryID: {}", dr,
175+
dependentResourceNode, primaryID);
177176
handleDelete(dr);
178177
});
179178
}
@@ -182,7 +181,8 @@ private synchronized void handleDependentsReconcile(
182181
DependentResourceNode<?, P> dependentResourceNode) {
183182
var dependents = dependentResourceNode.getParents();
184183
dependents.forEach(d -> {
185-
log.debug("Handle reconcile for dependent: {} of parent:{}", d, dependentResourceNode);
184+
log.debug("Handle reconcile for dependent: {} of parent:{} primaryID: {}", d,
185+
dependentResourceNode, primaryID);
186186
handleReconcile(d);
187187
});
188188
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,7 +370,7 @@ public synchronized void stop() {
370370
@Override
371371
public void start() throws OperatorException {
372372
// on restart new executor service is created and needs to be set here
373-
executor = executorServiceManager.executorService();
373+
executor = executorServiceManager.reconcileExecutorService();
374374
this.running = true;
375375
handleAlreadyMarkedEvents();
376376
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ public EventSourceManager(Controller<P> controller,
4646
this.executorServiceManager = executorServiceManager;
4747
// controller event source needs to be available before we create the event processor
4848
eventSources.createControllerEventSource(controller);
49-
50-
5149
postProcessDefaultEventSourcesAfterProcessorInitializer();
5250
}
5351

operator-framework/src/test/java/io/javaoperatorsdk/operator/DependentResourceCrossRefIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class DependentResourceCrossRefIT {
2828

2929
@Test
3030
void dependentResourceCanReferenceEachOther() {
31+
3132
for (int i = 0; i < EXECUTION_NUMBER; i++) {
3233
operator.create(testResource(i));
3334
}
@@ -43,6 +44,7 @@ void dependentResourceCanReferenceEachOther() {
4344
assertThat(operator.get(Secret.class, TEST_RESOURCE_NAME + i)).isNotNull();
4445
}
4546
});
47+
4648
}
4749

4850
DependentResourceCrossRefResource testResource(int n) {

0 commit comments

Comments
 (0)