Skip to content

Commit fdd60bb

Browse files
committed
fix: init sources before starting, wait for controller to be started
1 parent 6028d8a commit fdd60bb

File tree

6 files changed

+75
-26
lines changed

6 files changed

+75
-26
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ public Operator(ConfigurationService configurationService) {
3434
public Operator(KubernetesClient kubernetesClient, ConfigurationService configurationService) {
3535
this.kubernetesClient = kubernetesClient;
3636
this.configurationService = configurationService;
37+
ExecutorServiceManager.init(configurationService);
3738
}
3839

3940
/** Adds a shutdown hook that automatically calls {@link #close()} when the app shuts down. */
@@ -85,7 +86,6 @@ public void start() {
8586
throw new OperatorException(error, e);
8687
}
8788

88-
ExecutorServiceManager.init(configurationService);
8989
controllers.start();
9090
}
9191

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/DefaultContext.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import io.fabric8.kubernetes.api.model.HasMetadata;
66
import io.javaoperatorsdk.operator.processing.Controller;
7-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
7+
import io.javaoperatorsdk.operator.processing.event.source.DependentResourceEventSource;
88
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry;
99

1010
public class DefaultContext<P extends HasMetadata> implements Context<P> {
@@ -32,9 +32,8 @@ public EventSourceRegistry<P> getEventSourceRegistry() {
3232
@Override
3333
public <T extends HasMetadata> T getSecondaryResource(Class<T> expectedType,
3434
String... qualifier) {
35-
return getEventSourceRegistry()
36-
.getResourceEventSourceFor(expectedType, qualifier)
37-
.getResourceCache()
38-
.get(ResourceID.fromResource(primaryResource)).orElse(null);
35+
final var eventSource = (DependentResourceEventSource<T, P>) getEventSourceRegistry()
36+
.getResourceEventSourceFor(expectedType, qualifier);
37+
return eventSource == null ? null : eventSource.getAssociated(primaryResource);
3938
}
4039
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,11 @@
11
package io.javaoperatorsdk.operator.processing;
22

33
import java.util.Objects;
4+
import java.util.concurrent.atomic.AtomicBoolean;
5+
import java.util.concurrent.atomic.AtomicInteger;
6+
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
49

510
import io.fabric8.kubernetes.api.model.HasMetadata;
611
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
@@ -31,21 +36,46 @@
3136

3237
public class Controller<R extends HasMetadata> implements Reconciler<R>,
3338
LifecycleAware, EventSourceInitializer<R> {
39+
40+
private static final Logger log = LoggerFactory.getLogger(Controller.class);
3441
private final Reconciler<R> reconciler;
3542
private final ControllerConfiguration<R> configuration;
3643
private final KubernetesClient kubernetesClient;
37-
private EventSourceManager<R> eventSourceManager;
44+
private final EventSourceManager<R> eventSourceManager;
45+
private final AtomicBoolean started = new AtomicBoolean(false);
3846

3947
public Controller(Reconciler<R> reconciler,
4048
ControllerConfiguration<R> configuration,
4149
KubernetesClient kubernetesClient) {
4250
this.reconciler = reconciler;
4351
this.configuration = configuration;
4452
this.kubernetesClient = kubernetesClient;
53+
54+
eventSourceManager = new EventSourceManager<>(this);
55+
prepareEventSources(eventSourceManager,
56+
configuration.getConfigurationService().getResourceCloner());
57+
}
58+
59+
private void waitUntilStarted() {
60+
if (!started.get()) {
61+
AtomicInteger count = new AtomicInteger(0);
62+
final var waitTime = 50;
63+
while (!started.get()) {
64+
try {
65+
count.getAndIncrement();
66+
Thread.sleep(waitTime);
67+
} catch (InterruptedException e) {
68+
e.printStackTrace();
69+
}
70+
}
71+
log.info("Waited {}ms for controller '{}' to finish initializing", count.get() * waitTime,
72+
configuration.getName());
73+
}
4574
}
4675

4776
@Override
4877
public DeleteControl cleanup(R resource, Context<R> context) {
78+
waitUntilStarted();
4979
return configuration.getConfigurationService().getMetrics().timeControllerExecution(
5080
new ControllerExecution<>() {
5181
@Override
@@ -72,6 +102,7 @@ public DeleteControl execute() {
72102

73103
@Override
74104
public UpdateControl<R> reconcile(R resource, Context<R> context) {
105+
waitUntilStarted();
75106
final var metrics = configuration.getConfigurationService().getMetrics();
76107

77108
configuration.getDependentResources().forEach(dependent -> {
@@ -154,10 +185,11 @@ protected ResourceEventFilter initFilter(ResourceConfiguration configuration) {
154185
@Override
155186
protected EventSourceWrapper wrapEventSource(
156187
FilterWatchListDeletable filteredBySelectorClient, Cloner cloner) {
157-
final var source = new DependentResourceEventSource(filteredBySelectorClient, cloner,
188+
final var dependent = new DependentResourceEventSource(filteredBySelectorClient, cloner,
158189
dependentConfiguration);
159-
eventSourceRegistry.registerEventSource(source);
160-
return source;
190+
// make sure we're set to receive events
191+
eventSourceRegistry.registerEventSource(dependent);
192+
return dependent;
161193
}
162194
};
163195
});
@@ -235,15 +267,14 @@ public void start() throws OperatorException {
235267
CustomResourceUtils.assertCustomResource(resClass, crd);
236268
}
237269

238-
eventSourceManager = new EventSourceManager<>(this);
239-
prepareEventSources(eventSourceManager, configurationService.getResourceCloner());
240270
if (failOnMissingCurrentNS()) {
241271
throw new OperatorException(
242272
"Controller '"
243273
+ controllerName
244274
+ "' is configured to watch the current namespace but it couldn't be inferred from the current configuration.");
245275
}
246276
eventSourceManager.start();
277+
started.set(true);
247278
} catch (MissingCRDException e) {
248279
throwMissingCRDException(crdName, specVersion, controllerName);
249280
}
@@ -282,6 +313,7 @@ public void stop() {
282313
if (eventSourceManager != null) {
283314
eventSourceManager.stop();
284315
}
316+
started.set(false);
285317
}
286318

287319
public EventSourceRegistry<R> getEventSourceRegistry() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,11 @@ public class EventSourceManager<R extends HasMetadata>
4141

4242
public EventSourceManager(Controller<R> controller) {
4343
this.controller = controller;
44+
initRetryEventSource();
45+
4446
controllerResourceEventSource = new ControllerResourceEventSource<>(controller, this);
4547
this.eventProcessor = new EventProcessor<>(this);
4648
registerEventSource(controllerResourceEventSource);
47-
initRetryEventSource();
4849
}
4950

5051
private void initRetryEventSource() {
@@ -62,9 +63,10 @@ public void start() throws OperatorException {
6263
eventProcessor.start();
6364
lock.lock();
6465
try {
65-
log.debug("Starting event sources.");
6666
for (var eventSource : eventSources.values()) {
6767
try {
68+
log.debug("Starting source {} for {}", eventSource.getClass(),
69+
eventSource.getResourceClass());
6870
eventSource.start();
6971
} catch (Exception e) {
7072
log.warn("Error starting {} -> {}", eventSource, e);
@@ -134,15 +136,15 @@ private String keyFor(Class<?> dependentType, String... qualifier) {
134136
key += "-" + qualifier[0];
135137
}
136138

137-
// make sure we process controller and timer event sources first
139+
// make sure timer event source is started first, then controller event source
138140
// this is needed so that these sources are set when informer sources start so that events can
139141
// properly be processed
140142
if (controllerResourceEventSource != null
141143
&& className.equals(controllerResourceEventSource.getResourceClass().getCanonicalName())) {
142-
key = 0 + key;
144+
key = 1 + key;
143145
} else if (retryAndRescheduleTimerEventSource != null && className
144146
.equals(retryAndRescheduleTimerEventSource.getResourceClass().getCanonicalName())) {
145-
key = 1 + key;
147+
key = 0 + key;
146148
}
147149
return key;
148150
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractResourceEventSource.java

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@
2323
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
2424

2525
public abstract class AbstractResourceEventSource<T extends HasMetadata, U extends ResourceConfiguration<T, U>, V extends EventSourceWrapper<T>, P extends HasMetadata>
26-
extends AbstractEventSource<P> implements ResourceEventHandler<T>, ResourceEventSource<T, P> {
26+
extends AbstractEventSource<P>
27+
implements ResourceEventHandler<T> {
2728

2829
private static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";
2930
private static final Logger log = LoggerFactory.getLogger(AbstractResourceEventSource.class);
@@ -33,7 +34,7 @@ public abstract class AbstractResourceEventSource<T extends HasMetadata, U exten
3334
private final U configuration;
3435
private final MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client;
3536
private final Cloner cloner;
36-
private ResourceCache<T> cache;
37+
private final ResourceCache<T> cache;
3738

3839
public AbstractResourceEventSource(U configuration,
3940
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client, Cloner cloner,
@@ -44,9 +45,12 @@ public AbstractResourceEventSource(U configuration,
4445
this.filter = initFilter(configuration);
4546
this.cloner = cloner;
4647
setEventRegistry(registry);
48+
49+
initSources();
50+
51+
this.cache = new AggregateResourceCache<>(sources);
4752
}
4853

49-
@Override
5054
public U getConfiguration() {
5155
return configuration;
5256
}
@@ -86,6 +90,10 @@ public void onDelete(T resource, boolean b) {
8690

8791
@Override
8892
public void start() throws OperatorException {
93+
sources.values().parallelStream().forEach(LifecycleAware::start);
94+
}
95+
96+
private void initSources() {
8997
final var targetNamespaces = configuration.getEffectiveNamespaces();
9098
final var labelSelector = configuration.getLabelSelector();
9199

@@ -103,11 +111,6 @@ public void start() throws OperatorException {
103111
ns);
104112
});
105113
}
106-
this.cache = new AggregateResourceCache<>(sources);
107-
108-
// start sources only after the cache is populated so that events don't start coming in before
109-
// it's ready
110-
sources.values().parallelStream().forEach(LifecycleAware::start);
111114
}
112115

113116

sample-operators/tomcat-operator/src/main/java/io/javaoperatorsdk/operator/sample/WebappReconciler.java

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,19 @@
2525
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
2626
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceConfiguration;
2727
import io.javaoperatorsdk.operator.processing.event.ResourceID;
28+
import io.javaoperatorsdk.operator.processing.event.source.AssociatedSecondaryRetriever;
2829
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry;
2930
import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever;
31+
import io.javaoperatorsdk.operator.sample.WebappReconciler.TomcatRetriever;
3032
import io.javaoperatorsdk.operator.sample.WebappReconciler.WebappRetriever;
3133

3234
import okhttp3.Response;
3335

3436
@ControllerConfiguration(
3537
dependents = @DependentResourceConfiguration(
3638
creatable = false, resourceType = Tomcat.class,
37-
associatedPrimariesRetriever = WebappRetriever.class))
39+
associatedPrimariesRetriever = WebappRetriever.class,
40+
associatedSecondaryRetriever = TomcatRetriever.class))
3841
public class WebappReconciler implements Reconciler<Webapp> {
3942

4043
private KubernetesClient kubernetesClient;
@@ -60,6 +63,16 @@ public Set<ResourceID> associatedPrimaryResources(Tomcat t,
6063
}
6164
}
6265

66+
public static class TomcatRetriever implements AssociatedSecondaryRetriever<Tomcat, Webapp> {
67+
68+
@Override
69+
public Tomcat associatedSecondary(Webapp primary, EventSourceRegistry<Webapp> registry) {
70+
return registry.getResourceEventSourceFor(Tomcat.class).getResourceCache()
71+
.get(new ResourceID(primary.getSpec().getTomcat(), primary.getMetadata().getNamespace()))
72+
.orElse(null);
73+
}
74+
}
75+
6376
/**
6477
* This method will be called not only on changes to Webapp objects but also when Tomcat objects
6578
* change.

0 commit comments

Comments
 (0)