Skip to content

Commit 576a572

Browse files
committed
leader election impl
1 parent 9557e9c commit 576a572

File tree

6 files changed

+63
-38
lines changed

6 files changed

+63
-38
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/ControllerManager.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,16 @@
99
import org.slf4j.LoggerFactory;
1010

1111
import io.javaoperatorsdk.operator.processing.Controller;
12-
import io.javaoperatorsdk.operator.processing.LifecycleAware;
1312

1413
/**
1514
* Not confuse with controller manager form go operators. The highest level aggregate is
1615
* {@link Operator} in JOSDK.
1716
*/
18-
class ControllerManager implements LifecycleAware {
17+
class ControllerManager {
1918

2019
private static final Logger log = LoggerFactory.getLogger(ControllerManager.class);
2120

21+
@SuppressWarnings("rawtypes")
2222
private final Map<String, Controller> controllers = new HashMap<>();
2323
private boolean started = false;
2424

@@ -31,8 +31,8 @@ public synchronized void shouldStart() {
3131
}
3232
}
3333

34-
public synchronized void start() {
35-
controllers().parallelStream().forEach(Controller::start);
34+
public synchronized void start(boolean startEventProcessor) {
35+
controllers().parallelStream().forEach(c -> c.start(startEventProcessor));
3636
started = true;
3737
}
3838

@@ -41,10 +41,13 @@ public synchronized void stop() {
4141
log.debug("closing {}", closeable);
4242
closeable.stop();
4343
});
44-
4544
started = false;
4645
}
4746

47+
public synchronized void startEventProcessing() {
48+
controllers().parallelStream().forEach(Controller::startEventProcessing);
49+
}
50+
4851
@SuppressWarnings("unchecked")
4952
synchronized void add(Controller controller) {
5053
final var configuration = controller.getConfiguration();
@@ -57,17 +60,16 @@ synchronized void add(Controller controller) {
5760
+ "' is already registered for resource '" + resourceTypeName + "'");
5861
}
5962
controllers.put(resourceTypeName, controller);
60-
if (started) {
61-
controller.start();
62-
}
6363
}
6464

65+
@SuppressWarnings("rawtypes")
6566
synchronized Optional<Controller> get(String name) {
6667
return controllers().stream()
6768
.filter(c -> name.equals(c.getConfiguration().getName()))
6869
.findFirst();
6970
}
7071

72+
@SuppressWarnings("rawtypes")
7173
synchronized Collection<Controller> controllers() {
7274
return controllers.values();
7375
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package io.javaoperatorsdk.operator;
22

33
import java.util.UUID;
4+
import java.util.concurrent.CompletableFuture;
45

56
import org.slf4j.Logger;
67
import org.slf4j.LoggerFactory;
@@ -20,16 +21,18 @@ public class LeaderElectionManager {
2021
private static final Logger log = LoggerFactory.getLogger(LeaderElectionManager.class);
2122

2223
private LeaderElector leaderElector = null;
23-
private ControllerManager controllerManager;
24+
private final ControllerManager controllerManager;
25+
private String identity;
26+
private CompletableFuture<?> leaderElectionFuture;
2427

2528
public LeaderElectionManager(ControllerManager controllerManager) {
2629
this.controllerManager = controllerManager;
2730
}
2831

2932
public void init(LeaderElectionConfiguration config, KubernetesClient client) {
30-
Lock lock = new LeaseLock(config.getLeaseNamespace(), config.getLeaseName(), identity(config));
31-
// todo releaseOnCancel
32-
// todo use this executor service?
33+
this.identity = identity(config);
34+
Lock lock = new LeaseLock(config.getLeaseNamespace(), config.getLeaseName(), identity);
35+
// releaseOnCancel is not used in the underlying implementation
3336
leaderElector = new LeaderElectorBuilder(client,
3437
ConfigurationServiceProvider.instance().getExecutorService())
3538
.withConfig(
@@ -38,7 +41,6 @@ public void init(LeaderElectionConfiguration config, KubernetesClient client) {
3841
.build();
3942
}
4043

41-
4244
public boolean isLeaderElectionOn() {
4345
return leaderElector != null;
4446
}
@@ -50,11 +52,15 @@ private LeaderCallbacks leaderCallbacks() {
5052
}
5153

5254
private void startLeading() {
53-
55+
controllerManager.startEventProcessing();
5456
}
5557

5658
private void stopLeading() {
57-
59+
log.info("Stopped leading for identity: {}. Exiting.", identity);
60+
// When leader stops leading the process ends immediately to prevent multiple reconciliations
61+
// running parallel.
62+
// Note that some reconciliations might run a very long time.
63+
System.exit(1);
5864
}
5965

6066
private String identity(LeaderElectionConfiguration config) {
@@ -65,4 +71,15 @@ private String identity(LeaderElectionConfiguration config) {
6571
return identity;
6672
}
6773

74+
public void start() {
75+
if (isLeaderElectionOn()) {
76+
leaderElectionFuture = leaderElector.start();
77+
}
78+
}
79+
80+
public void stop() {
81+
if (leaderElectionFuture != null) {
82+
leaderElectionFuture.cancel(false);
83+
}
84+
}
6885
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import io.fabric8.kubernetes.api.model.HasMetadata;
1212
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
1313
import io.fabric8.kubernetes.client.KubernetesClient;
14+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
1415
import io.fabric8.kubernetes.client.Version;
1516
import io.javaoperatorsdk.operator.api.config.*;
1617
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
@@ -24,9 +25,10 @@ public class Operator implements LifecycleAware {
2425
private final ControllerManager controllerManager = new ControllerManager();
2526
private final LeaderElectionManager leaderElectionManager =
2627
new LeaderElectionManager(controllerManager);
28+
private volatile boolean started = false;
2729

2830
public Operator() {
29-
this(new DefaultKubernetesClient(), ConfigurationServiceProvider.instance());
31+
this(new KubernetesClientBuilder().build(), ConfigurationServiceProvider.instance());
3032
}
3133

3234
public Operator(KubernetesClient kubernetesClient) {
@@ -42,7 +44,7 @@ public Operator(ConfigurationService configurationService) {
4244
}
4345

4446
public Operator(Consumer<ConfigurationServiceOverrider> overrider) {
45-
this(new DefaultKubernetesClient(), overrider);
47+
this(new KubernetesClientBuilder().build(), overrider);
4648
}
4749

4850
public Operator(KubernetesClient client, Consumer<ConfigurationServiceOverrider> overrider) {
@@ -82,9 +84,11 @@ public KubernetesClient getKubernetesClient() {
8284
*/
8385
public void start() {
8486
try {
85-
87+
if (started) {
88+
return;
89+
}
90+
started = true;
8691
controllerManager.shouldStart();
87-
8892
final var version = ConfigurationServiceProvider.instance().getVersion();
8993
log.info(
9094
"Operator SDK {} (commit: {}) built on {} starting...",
@@ -94,9 +98,9 @@ public void start() {
9498

9599
final var clientVersion = Version.clientVersion();
96100
log.info("Client version: {}", clientVersion);
97-
98101
ExecutorServiceManager.init();
99-
controllerManager.start();
102+
controllerManager.start(!leaderElectionManager.isLeaderElectionOn());
103+
leaderElectionManager.start();
100104
} catch (Exception e) {
101105
log.error("Error starting operator", e);
102106
stop();
@@ -111,8 +115,8 @@ public void stop() throws OperatorException {
111115
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion());
112116

113117
controllerManager.stop();
114-
115118
ExecutorServiceManager.stop();
119+
leaderElectionManager.stop();
116120
if (configurationService.closeClientOnStop()) {
117121
kubernetesClient.close();
118122
}
@@ -148,6 +152,9 @@ public <P extends HasMetadata> RegisteredController<P> register(Reconciler<P> re
148152
public <P extends HasMetadata> RegisteredController<P> register(Reconciler<P> reconciler,
149153
ControllerConfiguration<P> configuration)
150154
throws OperatorException {
155+
if (started) {
156+
throw new OperatorException("Operator already started. Register all the controllers before.");
157+
}
151158

152159
if (configuration == null) {
153160
throw new OperatorException(

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
@SuppressWarnings({"unchecked", "rawtypes"})
4646
@Ignore
4747
public class Controller<P extends HasMetadata>
48-
implements Reconciler<P>, Cleaner<P>, LifecycleAware,
48+
implements Reconciler<P>, Cleaner<P>,
4949
RegisteredController<P> {
5050

5151
private static final Logger log = LoggerFactory.getLogger(Controller.class);
@@ -62,7 +62,6 @@ public class Controller<P extends HasMetadata>
6262
private final GroupVersionKind associatedGVK;
6363
private final EventProcessor<P> eventProcessor;
6464

65-
6665
public Controller(Reconciler<P> reconciler,
6766
ControllerConfiguration<P> configuration,
6867
KubernetesClient kubernetesClient) {
@@ -272,7 +271,7 @@ public MixedOperation<P, KubernetesResourceList<P>, Resource<P>> getCRClient() {
272271
*
273272
* @throws OperatorException if a problem occurred during the registration process
274273
*/
275-
public void start() throws OperatorException {
274+
public synchronized void start(boolean startEventProcessor) throws OperatorException {
276275
final Class<P> resClass = configuration.getResourceClass();
277276
final String controllerName = configuration.getName();
278277
final var crdName = configuration.getResourceTypeName();
@@ -290,14 +289,17 @@ public void start() throws OperatorException {
290289

291290
initAndRegisterEventSources(context);
292291
eventSourceManager.start();
293-
eventProcessor.start();
292+
if (startEventProcessor) {
293+
eventProcessor.start();
294+
}
294295
log.info("'{}' controller started, pending event sources initialization", controllerName);
295296
} catch (MissingCRDException e) {
296297
stop();
297298
throwMissingCRDException(crdName, specVersion, controllerName);
298299
}
299300
}
300301

302+
301303
private void validateCRDWithLocalModelIfRequired(Class<P> resClass, String controllerName,
302304
String crdName, String specVersion) {
303305
final CustomResourceDefinition crd;
@@ -323,14 +325,11 @@ public void changeNamespaces(Set<String> namespaces) {
323325
eventProcessor.start();
324326
}
325327

326-
public void startEventProcessing() {
328+
public synchronized void startEventProcessing() {
329+
log.info("Started event processing for controller: {}", configuration.getName());
327330
eventProcessor.start();
328331
}
329332

330-
public void stopEventProcessing() {
331-
eventProcessor.stop();
332-
}
333-
334333
private void throwMissingCRDException(String crdName, String specVersion, String controllerName) {
335334
throw new MissingCRDException(
336335
crdName,
@@ -363,13 +362,13 @@ public EventSourceManager<P> getEventSourceManager() {
363362
return eventSourceManager;
364363
}
365364

366-
public void stop() {
367-
if (eventSourceManager != null) {
368-
eventSourceManager.stop();
369-
}
365+
public synchronized void stop() {
370366
if (eventProcessor != null) {
371367
eventProcessor.stop();
372368
}
369+
if (eventSourceManager != null) {
370+
eventSourceManager.stop();
371+
}
373372
}
374373

375374
public boolean useFinalizer() {

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/ControllerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ void crdShouldNotBeCheckedForNativeResources() {
2424
final var configuration = MockControllerConfiguration.forResource(Secret.class);
2525

2626
final var controller = new Controller<Secret>(reconciler, configuration, client);
27-
controller.start();
27+
controller.start(true);
2828
verify(client, never()).apiextensions();
2929
}
3030

@@ -36,7 +36,7 @@ void crdShouldNotBeCheckedForCustomResourcesIfDisabled() {
3636
try {
3737
ConfigurationServiceProvider.overrideCurrent(o -> o.checkingCRDAndValidateLocalModel(false));
3838
final var controller = new Controller<TestCustomResource>(reconciler, configuration, client);
39-
controller.start();
39+
controller.start(true);
4040
verify(client, never()).apiextensions();
4141
} finally {
4242
ConfigurationServiceProvider.reset();

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/ReconciliationDispatcherTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ public boolean useFinalizer() {
121121
return useFinalizer;
122122
}
123123
};
124-
controller.start();
124+
controller.start(true);
125125

126126
return new ReconciliationDispatcher<>(controller, customResourceFacade);
127127
}

0 commit comments

Comments
 (0)