Skip to content

Commit 4f3231e

Browse files
committed
unit tests passing
1 parent 9ec77eb commit 4f3231e

32 files changed

+176
-187
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -236,7 +236,7 @@ public ResourceClassResolver getResourceClassResolver() {
236236
}
237237

238238
/**
239-
* @deprecated Use {@link ConfigurationServiceProvider#overrideCurrent(Consumer)} instead
239+
* @deprecated Use {@link ConfigurationService#overrideCurrent(Consumer)} instead
240240
* @param original that will be overriding
241241
* @return current overrider
242242
*/

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceProvider.java

Lines changed: 0 additions & 74 deletions
This file was deleted.

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,12 +30,13 @@ public ExecutorServiceManager(ConfigurationService configurationService) {
3030
this.configurationService = configurationService;
3131
}
3232

33-
public void init() {
33+
public ExecutorServiceManager init() {
3434
final var executorService = configurationService.getExecutorService();
3535
final var workflowExecutorService = configurationService.getWorkflowExecutorService();
3636
this.cachingExecutorService = Executors.newCachedThreadPool();
3737
this.executor = new InstrumentedExecutorService(executorService);
3838
this.workflowExecutor = new InstrumentedExecutorService(workflowExecutorService);
39+
return this;
3940
}
4041

4142
/**

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/Context.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.Optional;
44
import java.util.Set;
5+
import java.util.concurrent.ExecutorService;
56

67
import io.fabric8.kubernetes.api.model.HasMetadata;
78
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -32,4 +33,9 @@ <R> Optional<R> getSecondaryResource(Class<R> expectedType,
3233
EventSourceRetriever<P> eventSourceRetriever();
3334

3435
KubernetesClient getClient();
36+
37+
/**
38+
* ExecutorService initialized by framework for workflows. Used for workflow standalone mode.
39+
*/
40+
ExecutorService getWorkflowExecutorService();
3541
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.Optional;
44
import java.util.Set;
5+
import java.util.concurrent.ExecutorService;
56
import java.util.stream.Collectors;
67

78
import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -75,6 +76,13 @@ public KubernetesClient getClient() {
7576
return controller.getClient();
7677
}
7778

79+
@Override
80+
public ExecutorService getWorkflowExecutorService() {
81+
// not that this should be always received from executor service manager, so we are able to do
82+
// restarts.
83+
return controller.getExecutorServiceManager().executorService();
84+
}
85+
7886
public DefaultContext<P> setRetryInfo(RetryInfo retryInfo) {
7987
this.retryInfo = retryInfo;
8088
return this;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ public class Controller<P extends HasMetadata>
6464
private final EventProcessor<P> eventProcessor;
6565
private final ControllerHealthInfo controllerHealthInfo;
6666
private final ConfigurationService configurationService;
67+
private final ExecutorServiceManager executorServiceManager;
6768

6869
public Controller(Reconciler<P> reconciler,
6970
ControllerConfiguration<P> configuration,
@@ -76,11 +77,13 @@ public Controller(Reconciler<P> reconciler,
7677
this.configurationService = configuration.getConfigurationService();
7778
this.kubernetesClient = kubernetesClient;
7879
this.metrics = Optional.ofNullable(configurationService.getMetrics()).orElse(Metrics.NOOP);
80+
this.executorServiceManager = executorServiceManager;
7981
contextInitializer = reconciler instanceof ContextInitializer;
8082
isCleaner = reconciler instanceof Cleaner;
8183

8284
final var managed = configurationService.getWorkflowFactory().workflowFor(configuration);
83-
managedWorkflow = managed.resolve(kubernetesClient, configuration);
85+
managedWorkflow = managed.resolve(kubernetesClient, configuration,
86+
executorServiceManager.workflowExecutorService());
8487

8588
eventSourceManager = new EventSourceManager<>(this, executorServiceManager);
8689
eventProcessor =
@@ -425,4 +428,8 @@ public GroupVersionKind getAssociatedGroupVersionKind() {
425428
public EventProcessor<P> getEventProcessor() {
426429
return eventProcessor;
427430
}
431+
432+
public ExecutorServiceManager getExecutorServiceManager() {
433+
return executorServiceManager;
434+
}
428435
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultManagedWorkflow.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.HashSet;
55
import java.util.List;
66
import java.util.Set;
7+
import java.util.concurrent.ExecutorService;
78
import java.util.stream.Collectors;
89

910
import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -78,7 +79,7 @@ public boolean isEmpty() {
7879
@Override
7980
@SuppressWarnings("unchecked")
8081
public Workflow<P> resolve(KubernetesClient client,
81-
ControllerConfiguration<P> configuration) {
82+
ControllerConfiguration<P> configuration, ExecutorService executorService) {
8283
final var alreadyResolved = new HashMap<String, DependentResourceNode>(orderedSpecs.size());
8384
for (DependentResourceSpec spec : orderedSpecs) {
8485
final var node = new DependentResourceNode(spec.getName(),

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/DefaultWorkflow.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
import java.util.Set;
99

1010
import io.fabric8.kubernetes.api.model.HasMetadata;
11-
import io.javaoperatorsdk.operator.api.config.BaseConfigurationService;
12-
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1311
import io.javaoperatorsdk.operator.api.reconciler.Context;
1412
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
1513
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
@@ -58,7 +56,6 @@ protected DefaultWorkflow(Map<String, DependentResourceNode> dependentResourceNo
5856
boolean hasCleaner) {
5957
this.throwExceptionAutomatically = throwExceptionAutomatically;
6058
this.hasCleaner = hasCleaner;
61-
6259
this.topLevelResources = topLevelResources;
6360
this.bottomLevelResource = bottomLevelResource;
6461
this.dependentResourceNodes = dependentResourceNodes;
@@ -88,8 +85,7 @@ private Map<String, DependentResourceNode> toMap(Set<DependentResourceNode> node
8885
@Override
8986
public WorkflowReconcileResult reconcile(P primary, Context<P> context) {
9087
WorkflowReconcileExecutor<P> workflowReconcileExecutor =
91-
new WorkflowReconcileExecutor<>(this, primary, context,
92-
new ExecutorServiceManager(new BaseConfigurationService()));
88+
new WorkflowReconcileExecutor<>(this, primary, context);
9389
var result = workflowReconcileExecutor.reconcile();
9490
if (throwExceptionAutomatically) {
9591
result.throwAggregateExceptionIfErrorsPresent();
@@ -100,8 +96,7 @@ public WorkflowReconcileResult reconcile(P primary, Context<P> context) {
10096
@Override
10197
public WorkflowCleanupResult cleanup(P primary, Context<P> context) {
10298
WorkflowCleanupExecutor<P> workflowCleanupExecutor =
103-
new WorkflowCleanupExecutor<>(this, primary, context,
104-
new ExecutorServiceManager(new BaseConfigurationService()));
99+
new WorkflowCleanupExecutor<>(this, primary, context);
105100
var result = workflowCleanupExecutor.cleanup();
106101
if (throwExceptionAutomatically) {
107102
result.throwAggregateExceptionIfErrorsPresent();

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflow.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.util.Collections;
44
import java.util.List;
5+
import java.util.concurrent.ExecutorService;
56

67
import io.fabric8.kubernetes.api.model.HasMetadata;
78
import io.fabric8.kubernetes.client.KubernetesClient;
@@ -23,5 +24,6 @@ default boolean isEmpty() {
2324
return true;
2425
}
2526

26-
Workflow<P> resolve(KubernetesClient client, ControllerConfiguration<P> configuration);
27+
Workflow<P> resolve(KubernetesClient client, ControllerConfiguration<P> configuration,
28+
ExecutorService executorService);
2729
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package io.javaoperatorsdk.operator.processing.dependent.workflow;
22

3+
import java.util.concurrent.ExecutorService;
4+
35
import io.fabric8.kubernetes.client.KubernetesClient;
46
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
57

@@ -21,7 +23,8 @@ public boolean isEmpty() {
2123
}
2224

2325
@Override
24-
public Workflow resolve(KubernetesClient client, ControllerConfiguration configuration) {
26+
public Workflow resolve(KubernetesClient client, ControllerConfiguration configuration,
27+
ExecutorService executorService) {
2528
return new DefaultWorkflow(null);
2629
}
2730
};

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowCleanupExecutor.java

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,14 @@
33
import java.util.List;
44
import java.util.Set;
55
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.ExecutorService;
67
import java.util.concurrent.Future;
78
import java.util.stream.Collectors;
89

910
import org.slf4j.Logger;
1011
import org.slf4j.LoggerFactory;
1112

1213
import io.fabric8.kubernetes.api.model.HasMetadata;
13-
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1414
import io.javaoperatorsdk.operator.api.reconciler.Context;
1515
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
1616
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
@@ -23,12 +23,11 @@ public class WorkflowCleanupExecutor<P extends HasMetadata> extends AbstractWork
2323
private final Set<DependentResourceNode> postDeleteConditionNotMet =
2424
ConcurrentHashMap.newKeySet();
2525
private final Set<DependentResourceNode> deleteCalled = ConcurrentHashMap.newKeySet();
26-
private final ExecutorServiceManager executorServiceManager;
26+
private final ExecutorService executorService;
2727

28-
public WorkflowCleanupExecutor(Workflow<P> workflow, P primary, Context<P> context,
29-
ExecutorServiceManager executorServiceManager) {
28+
public WorkflowCleanupExecutor(Workflow<P> workflow, P primary, Context<P> context) {
3029
super(workflow, primary, context);
31-
this.executorServiceManager = executorServiceManager;
30+
this.executorService = context.getWorkflowExecutorService();
3231
}
3332

3433
public synchronized WorkflowCleanupResult cleanup() {
@@ -56,7 +55,7 @@ private synchronized void handleCleanup(DependentResourceNode dependentResourceN
5655
return;
5756
}
5857

59-
Future<?> nodeFuture = executorServiceManager.workflowExecutorService()
58+
Future<?> nodeFuture = executorService
6059
.submit(new CleanupExecutor<>(dependentResourceNode));
6160
markAsExecuting(dependentResourceNode, nodeFuture);
6261
log.debug("Submitted for cleanup: {}", dependentResourceNode);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/WorkflowReconcileExecutor.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@
44
import java.util.Map;
55
import java.util.Set;
66
import java.util.concurrent.ConcurrentHashMap;
7+
import java.util.concurrent.ExecutorService;
78
import java.util.concurrent.Future;
89
import java.util.stream.Collectors;
910

1011
import org.slf4j.Logger;
1112
import org.slf4j.LoggerFactory;
1213

1314
import io.fabric8.kubernetes.api.model.HasMetadata;
14-
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1515
import io.javaoperatorsdk.operator.api.reconciler.Context;
1616
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
1717
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
@@ -33,12 +33,11 @@ public class WorkflowReconcileExecutor<P extends HasMetadata> extends AbstractWo
3333
private final Set<DependentResourceNode> reconciled = ConcurrentHashMap.newKeySet();
3434
private final Map<DependentResource, ReconcileResult> reconcileResults =
3535
new ConcurrentHashMap<>();
36-
private final ExecutorServiceManager executorServiceManager;
36+
private final ExecutorService executorService;
3737

38-
public WorkflowReconcileExecutor(Workflow<P> workflow, P primary, Context<P> context,
39-
ExecutorServiceManager executorServiceManager) {
38+
public WorkflowReconcileExecutor(Workflow<P> workflow, P primary, Context<P> context) {
4039
super(workflow, primary, context);
41-
this.executorServiceManager = executorServiceManager;
40+
this.executorService = context.getWorkflowExecutorService();
4241
}
4342

4443
public synchronized WorkflowReconcileResult reconcile() {
@@ -71,7 +70,7 @@ private synchronized <R> void handleReconcile(DependentResourceNode<R, P> depend
7170
if (!reconcileConditionMet) {
7271
handleReconcileConditionNotMet(dependentResourceNode);
7372
} else {
74-
Future<?> nodeFuture = executorServiceManager.workflowExecutorService()
73+
Future<?> nodeFuture = executorService
7574
.submit(new NodeReconcileExecutor(dependentResourceNode));
7675
markAsExecuting(dependentResourceNode, nodeFuture);
7776
log.debug("Submitted to reconcile: {}", dependentResourceNode);
@@ -89,7 +88,7 @@ private synchronized void handleDelete(DependentResourceNode dependentResourceNo
8988
return;
9089
}
9190

92-
Future<?> nodeFuture = executorServiceManager.workflowExecutorService()
91+
Future<?> nodeFuture = executorService
9392
.submit(new NodeDeleteExecutor(dependentResourceNode));
9493
markAsExecuting(dependentResourceNode, nodeFuture);
9594
log.debug("Submitted to delete: {}", dependentResourceNode);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ public final void registerEventSource(EventSource eventSource) throws OperatorEx
147147
registerEventSource(null, eventSource);
148148
}
149149

150+
@SuppressWarnings("rawtypes")
150151
public final synchronized void registerEventSource(String name, EventSource eventSource)
151152
throws OperatorException {
152153
Objects.requireNonNull(eventSource, "EventSource must not be null");
@@ -155,8 +156,10 @@ public final synchronized void registerEventSource(String name, EventSource even
155156
name = EventSourceInitializer.generateNameFor(eventSource);
156157
}
157158
if (eventSource instanceof ManagedInformerEventSource) {
158-
((ManagedInformerEventSource) eventSource).setConfigurationService(controller
159+
var managedInformerEventSource = ((ManagedInformerEventSource) eventSource);
160+
managedInformerEventSource.setConfigurationService(controller
159161
.getConfiguration().getConfigurationService());
162+
managedInformerEventSource.setExecutorServiceManager(executorServiceManager);
160163
}
161164
final var named = new NamedEventSource(eventSource, name);
162165
eventSources.add(named);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ void initControllerEventSource(Controller<R> controller) {
3434
controllerResourceEventSource = new ControllerResourceEventSource<>(controller);
3535
controllerResourceEventSource
3636
.setConfigurationService(controller.getConfiguration().getConfigurationService());
37+
controllerResourceEventSource.setExecutorServiceManager(controller.getExecutorServiceManager());
3738
}
3839

3940
ControllerResourceEventSource<R> controllerResourceEventSource() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.fabric8.kubernetes.client.dsl.Resource;
1818
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
1919
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
20+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
2021
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
2122
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
2223
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
@@ -179,4 +180,8 @@ public void setConfigurationService(ConfigurationService configurationService) {
179180
this.configurationService = configurationService;
180181
}
181182

183+
public void setExecutorServiceManager(
184+
ExecutorServiceManager executorServiceManager) {
185+
cache.setExecutorServiceManager(executorServiceManager);
186+
}
182187
}

0 commit comments

Comments
 (0)