Skip to content

Commit 91d08f3

Browse files
committed
refactor: get ExecutorServiceManager from ConfigurationService
1 parent 208b8a9 commit 91d08f3

20 files changed

+112
-170
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ class ControllerManager {
2222
@SuppressWarnings("rawtypes")
2323
private final Map<String, Controller> controllers = new HashMap<>();
2424
private boolean started = false;
25-
private ExecutorServiceManager executorServiceManager;
25+
private final ExecutorServiceManager executorServiceManager;
2626

2727
public ControllerManager(ExecutorServiceManager executorServiceManager) {
2828
this.executorServiceManager = executorServiceManager;

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectorBuilder;
1414
import io.fabric8.kubernetes.client.extended.leaderelection.resourcelock.LeaseLock;
1515
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
16-
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1716
import io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration;
1817

1918
public class LeaderElectionManager {
@@ -24,17 +23,15 @@ public class LeaderElectionManager {
2423
private final ControllerManager controllerManager;
2524
private String identity;
2625
private CompletableFuture<?> leaderElectionFuture;
27-
private ConfigurationService configurationService;
28-
private ExecutorServiceManager executorServiceManager;
29-
private KubernetesClient kubernetesClient;
26+
private final ConfigurationService configurationService;
27+
private final KubernetesClient kubernetesClient;
3028

3129
public LeaderElectionManager(KubernetesClient kubernetesClient,
3230
ControllerManager controllerManager,
33-
ConfigurationService configurationService, ExecutorServiceManager executorServiceManager) {
31+
ConfigurationService configurationService) {
3432
this.kubernetesClient = kubernetesClient;
3533
this.controllerManager = controllerManager;
3634
this.configurationService = configurationService;
37-
this.executorServiceManager = executorServiceManager;
3835
}
3936

4037
public boolean isLeaderElectionEnabled() {
@@ -94,18 +91,17 @@ private void init(LeaderElectionConfiguration config) {
9491
}
9592
final var lock = new LeaseLock(leaseNamespace, config.getLeaseName(), identity);
9693
// releaseOnCancel is not used in the underlying implementation
97-
leaderElector =
98-
new LeaderElectorBuilder(
99-
kubernetesClient, executorServiceManager.cachingExecutorService())
100-
.withConfig(
101-
new LeaderElectionConfig(
102-
lock,
103-
config.getLeaseDuration(),
104-
config.getRenewDeadline(),
105-
config.getRetryPeriod(),
106-
leaderCallbacks(),
107-
true,
108-
config.getLeaseName()))
109-
.build();
94+
leaderElector = new LeaderElectorBuilder(
95+
kubernetesClient, configurationService.getExecutorServiceManager().cachingExecutorService())
96+
.withConfig(
97+
new LeaderElectionConfig(
98+
lock,
99+
config.getLeaseDuration(),
100+
config.getRenewDeadline(),
101+
config.getRetryPeriod(),
102+
leaderCallbacks(),
103+
true,
104+
config.getLeaseName()))
105+
.build();
110106
}
111107
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
2020
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
2121
import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
22-
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
2322
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
2423
import io.javaoperatorsdk.operator.processing.Controller;
2524
import io.javaoperatorsdk.operator.processing.LifecycleAware;
@@ -32,7 +31,6 @@ public class Operator implements LifecycleAware {
3231
private final ControllerManager controllerManager;
3332
private final LeaderElectionManager leaderElectionManager;
3433
private final ConfigurationService configurationService;
35-
private final ExecutorServiceManager executorServiceManager;
3634
private volatile boolean started = false;
3735

3836

@@ -71,7 +69,7 @@ public Operator(KubernetesClient client, Consumer<ConfigurationServiceOverrider>
7169
*/
7270
public Operator(KubernetesClient kubernetesClient, ConfigurationService configurationService) {
7371
this.configurationService = configurationService;
74-
this.executorServiceManager = new ExecutorServiceManager(configurationService);
72+
final var executorServiceManager = configurationService.getExecutorServiceManager();
7573
controllerManager = new ControllerManager(executorServiceManager);
7674
this.kubernetesClient =
7775
kubernetesClient != null ? kubernetesClient
@@ -82,8 +80,7 @@ public Operator(KubernetesClient kubernetesClient, ConfigurationService configur
8280

8381

8482
leaderElectionManager =
85-
new LeaderElectionManager(kubernetesClient, controllerManager, configurationService,
86-
executorServiceManager);
83+
new LeaderElectionManager(kubernetesClient, controllerManager, configurationService);
8784
}
8885

8986
/**
@@ -131,7 +128,6 @@ public synchronized void start() {
131128
if (started) {
132129
return;
133130
}
134-
executorServiceManager.init();
135131
controllerManager.shouldStart();
136132
final var version = configurationService.getVersion();
137133
log.info(
@@ -161,7 +157,7 @@ public void stop(Duration gracefulShutdownTimeout) throws OperatorException {
161157
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
162158
controllerManager.stop();
163159

164-
executorServiceManager.stop(gracefulShutdownTimeout);
160+
configurationService.getExecutorServiceManager().stop(gracefulShutdownTimeout);
165161
leaderElectionManager.stop();
166162
if (configurationService.closeClientOnStop()) {
167163
kubernetesClient.close();
@@ -220,8 +216,7 @@ public <P extends HasMetadata> RegisteredController<P> register(Reconciler<P> re
220216
+ configurationService.getKnownReconcilerNames());
221217
}
222218

223-
final var controller =
224-
new Controller<>(reconciler, configuration, kubernetesClient, executorServiceManager);
219+
final var controller = new Controller<>(reconciler, configuration, kubernetesClient);
225220

226221
controllerManager.add(controller);
227222

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -263,4 +263,8 @@ static ConfigurationService newOverriddenConfigurationService(
263263
}
264264
return baseConfiguration;
265265
}
266+
267+
default ExecutorServiceManager getExecutorServiceManager() {
268+
return new ExecutorServiceManager(this);
269+
}
266270
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -21,21 +21,15 @@
2121

2222
public class ExecutorServiceManager {
2323
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
24-
private ExecutorService executor;
25-
private ExecutorService workflowExecutor;
26-
private ExecutorService cachingExecutorService;
27-
private final ConfigurationService configurationService;
24+
private final ExecutorService executor;
25+
private final ExecutorService workflowExecutor;
26+
private final ExecutorService cachingExecutorService;
2827

29-
public ExecutorServiceManager(ConfigurationService configurationService) {
30-
this.configurationService = configurationService;
31-
}
32-
33-
public ExecutorServiceManager init() {
28+
ExecutorServiceManager(ConfigurationService configurationService) {
3429
this.cachingExecutorService = Executors.newCachedThreadPool();
3530
this.executor = new InstrumentedExecutorService(configurationService.getExecutorService());
3631
this.workflowExecutor =
3732
new InstrumentedExecutorService(configurationService.getWorkflowExecutorService());
38-
return this;
3933
}
4034

4135
/**

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package io.javaoperatorsdk.operator.processing;
22

3-
import java.util.*;
3+
import java.util.ArrayList;
4+
import java.util.HashMap;
5+
import java.util.List;
6+
import java.util.Map;
7+
import java.util.Optional;
8+
import java.util.Set;
49

510
import org.slf4j.Logger;
611
import org.slf4j.LoggerFactory;
@@ -16,12 +21,20 @@
1621
import io.javaoperatorsdk.operator.MissingCRDException;
1722
import io.javaoperatorsdk.operator.OperatorException;
1823
import io.javaoperatorsdk.operator.RegisteredController;
19-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
2024
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
2125
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
2226
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
2327
import io.javaoperatorsdk.operator.api.monitoring.Metrics.ControllerExecution;
24-
import io.javaoperatorsdk.operator.api.reconciler.*;
28+
import io.javaoperatorsdk.operator.api.reconciler.Cleaner;
29+
import io.javaoperatorsdk.operator.api.reconciler.Constants;
30+
import io.javaoperatorsdk.operator.api.reconciler.Context;
31+
import io.javaoperatorsdk.operator.api.reconciler.ContextInitializer;
32+
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
33+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
34+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
35+
import io.javaoperatorsdk.operator.api.reconciler.Ignore;
36+
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
37+
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
2538
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceNotFoundException;
2639
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
2740
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer;
@@ -63,30 +76,27 @@ public class Controller<P extends HasMetadata>
6376
private final GroupVersionKind associatedGVK;
6477
private final EventProcessor<P> eventProcessor;
6578
private final ControllerHealthInfo controllerHealthInfo;
66-
private final ConfigurationService configurationService;
67-
private final ExecutorServiceManager executorServiceManager;
6879

6980
public Controller(Reconciler<P> reconciler,
7081
ControllerConfiguration<P> configuration,
71-
KubernetesClient kubernetesClient, ExecutorServiceManager executorServiceManager) {
82+
KubernetesClient kubernetesClient) {
7283
// needs to be initialized early since it's used in other downstream classes
7384
associatedGVK = GroupVersionKind.gvkFor(configuration.getResourceClass());
7485

86+
final var configurationService = configuration.getConfigurationService();
87+
final var executorServiceManager = configurationService.getExecutorServiceManager();
7588
this.reconciler = reconciler;
7689
this.configuration = configuration;
77-
this.configurationService = configuration.getConfigurationService();
7890
this.kubernetesClient = kubernetesClient;
7991
this.metrics = Optional.ofNullable(configurationService.getMetrics()).orElse(Metrics.NOOP);
80-
this.executorServiceManager = executorServiceManager;
8192
contextInitializer = reconciler instanceof ContextInitializer;
8293
isCleaner = reconciler instanceof Cleaner;
8394

8495
final var managed = configurationService.getWorkflowFactory().workflowFor(configuration);
8596
managedWorkflow = managed.resolve(kubernetesClient, configuration);
8697

87-
eventSourceManager = new EventSourceManager<>(this, executorServiceManager);
88-
eventProcessor =
89-
new EventProcessor<>(eventSourceManager, executorServiceManager, configurationService);
98+
eventSourceManager = new EventSourceManager<>(this);
99+
eventProcessor = new EventProcessor<>(eventSourceManager, configurationService);
90100
eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer();
91101
controllerHealthInfo = new ControllerHealthInfo(eventSourceManager);
92102
final var context = new EventSourceContext<>(
@@ -345,7 +355,7 @@ public synchronized void start(boolean startEventProcessor) throws OperatorExcep
345355
private void validateCRDWithLocalModelIfRequired(Class<P> resClass, String controllerName,
346356
String crdName, String specVersion) {
347357
final CustomResourceDefinition crd;
348-
if (configurationService.checkCRDAndValidateLocalModel()
358+
if (getConfiguration().getConfigurationService().checkCRDAndValidateLocalModel()
349359
&& CustomResource.class.isAssignableFrom(resClass)) {
350360
crd = kubernetesClient.apiextensions().v1().customResourceDefinitions().withName(crdName)
351361
.get();
@@ -394,7 +404,7 @@ private void throwMissingCRDException(String crdName, String specVersion, String
394404
*/
395405
private void failOnMissingCurrentNS() {
396406
try {
397-
configuration.getEffectiveNamespaces(configurationService);
407+
configuration.getEffectiveNamespaces(getConfiguration().getConfigurationService());
398408
} catch (OperatorException e) {
399409
throw new OperatorException(
400410
"Controller '"
@@ -429,6 +439,6 @@ public EventProcessor<P> getEventProcessor() {
429439
}
430440

431441
public ExecutorServiceManager getExecutorServiceManager() {
432-
return executorServiceManager;
442+
return getConfiguration().getConfigurationService().getExecutorServiceManager();
433443
}
434444
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/workflow/ManagedWorkflowFactory.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
package io.javaoperatorsdk.operator.processing.dependent.workflow;
22

3-
import io.fabric8.kubernetes.client.KubernetesClient;
43
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
54

65
public interface ManagedWorkflowFactory<C extends ControllerConfiguration<?>> {
@@ -9,22 +8,7 @@ public interface ManagedWorkflowFactory<C extends ControllerConfiguration<?>> {
98
ManagedWorkflowFactory DEFAULT = (configuration) -> {
109
final var dependentResourceSpecs = configuration.getDependentResources();
1110
if (dependentResourceSpecs == null || dependentResourceSpecs.isEmpty()) {
12-
return new ManagedWorkflow() {
13-
@Override
14-
public boolean hasCleaner() {
15-
return false;
16-
}
17-
18-
@Override
19-
public boolean isEmpty() {
20-
return true;
21-
}
22-
23-
@Override
24-
public Workflow resolve(KubernetesClient client, ControllerConfiguration configuration) {
25-
return new DefaultWorkflow(null);
26-
}
27-
};
11+
return (ManagedWorkflow) (client, configuration1) -> new DefaultWorkflow(null);
2812
}
2913
ManagedWorkflowSupport support = new ManagedWorkflowSupport();
3014
return support.createWorkflow(dependentResourceSpecs);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
import io.javaoperatorsdk.operator.OperatorException;
1414
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
1515
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
16-
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1716
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
1817
import io.javaoperatorsdk.operator.api.reconciler.Constants;
1918
import io.javaoperatorsdk.operator.processing.LifecycleAware;
@@ -44,41 +43,33 @@ public class EventProcessor<P extends HasMetadata> implements EventHandler, Life
4443
private final RateLimiter<? extends RateLimitState> rateLimiter;
4544
private final ResourceStateManager resourceStateManager = new ResourceStateManager();
4645
private final Map<String, Object> metricsMetadata;
47-
private final ExecutorServiceManager executorServiceManager;
4846
private ExecutorService executor;
4947

50-
public EventProcessor(EventSourceManager<P> eventSourceManager, ExecutorServiceManager executor,
48+
public EventProcessor(EventSourceManager<P> eventSourceManager,
5149
ConfigurationService configurationService) {
5250
this(
5351
eventSourceManager.getController().getConfiguration(),
54-
eventSourceManager.getControllerResourceEventSource(),
55-
new ReconciliationDispatcher<>(eventSourceManager.getController()),
56-
configurationService.getMetrics(),
57-
eventSourceManager, executor);
52+
new ReconciliationDispatcher<>(eventSourceManager.getController()), eventSourceManager,
53+
configurationService.getMetrics(), eventSourceManager.getControllerResourceEventSource());
5854
}
5955

6056
@SuppressWarnings("rawtypes")
6157
EventProcessor(
6258
ControllerConfiguration controllerConfiguration,
6359
ReconciliationDispatcher<P> reconciliationDispatcher,
6460
EventSourceManager<P> eventSourceManager,
65-
Metrics metrics, ExecutorServiceManager executor) {
61+
Metrics metrics) {
6662
this(
6763
controllerConfiguration,
68-
eventSourceManager.getControllerResourceEventSource(),
69-
reconciliationDispatcher,
70-
metrics,
71-
eventSourceManager, executor);
64+
reconciliationDispatcher, eventSourceManager, metrics,
65+
eventSourceManager.getControllerResourceEventSource());
7266
}
7367

7468
@SuppressWarnings({"rawtypes", "unchecked"})
7569
private EventProcessor(
7670
ControllerConfiguration controllerConfiguration,
77-
Cache<P> cache,
7871
ReconciliationDispatcher<P> reconciliationDispatcher,
79-
Metrics metrics,
80-
EventSourceManager<P> eventSourceManager, ExecutorServiceManager executorServiceManager) {
81-
this.executorServiceManager = executorServiceManager;
72+
EventSourceManager<P> eventSourceManager, Metrics metrics, Cache<P> cache) {
8273
this.controllerConfiguration = controllerConfiguration;
8374
this.running = false;
8475
this.reconciliationDispatcher = reconciliationDispatcher;
@@ -370,7 +361,8 @@ public synchronized void stop() {
370361
@Override
371362
public void start() throws OperatorException {
372363
// on restart new executor service is created and needs to be set here
373-
executor = executorServiceManager.reconcileExecutorService();
364+
executor = controllerConfiguration.getConfigurationService().getExecutorServiceManager()
365+
.reconcileExecutorService();
374366
this.running = true;
375367
handleAlreadyMarkedEvents();
376368
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3-
import java.util.*;
3+
import java.util.LinkedHashSet;
4+
import java.util.List;
5+
import java.util.Map;
6+
import java.util.Objects;
7+
import java.util.Set;
48
import java.util.function.Function;
59
import java.util.stream.Collectors;
610
import java.util.stream.Stream;
@@ -34,16 +38,14 @@ public class EventSourceManager<P extends HasMetadata>
3438
private final Controller<P> controller;
3539
private final ExecutorServiceManager executorServiceManager;
3640

37-
public EventSourceManager(Controller<P> controller,
38-
ExecutorServiceManager executorServiceManager) {
39-
this(controller, new EventSources<>(), executorServiceManager);
41+
public EventSourceManager(Controller<P> controller) {
42+
this(controller, new EventSources<>());
4043
}
4144

42-
EventSourceManager(Controller<P> controller, EventSources<P> eventSources,
43-
ExecutorServiceManager executorServiceManager) {
45+
EventSourceManager(Controller<P> controller, EventSources<P> eventSources) {
4446
this.eventSources = eventSources;
4547
this.controller = controller;
46-
this.executorServiceManager = executorServiceManager;
48+
this.executorServiceManager = controller.getExecutorServiceManager();
4749
// controller event source needs to be available before we create the event processor
4850
eventSources.createControllerEventSource(controller);
4951
postProcessDefaultEventSourcesAfterProcessorInitializer();

0 commit comments

Comments
 (0)