Skip to content

Commit 0a7c439

Browse files
committed
refactor: extract ResourceEventSource interface
1 parent 1809d35 commit 0a7c439

File tree

5 files changed

+139
-130
lines changed

5 files changed

+139
-130
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,11 @@
2323
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
2424
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
2525
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
26+
import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource;
2627
import io.javaoperatorsdk.operator.processing.event.source.EventSourceRegistry;
2728
import io.javaoperatorsdk.operator.processing.event.source.EventSourceWrapper;
2829
import io.javaoperatorsdk.operator.processing.event.source.InformerEventSourceEventSourceWrapper;
2930
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventFilter;
30-
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
3131

3232
public class Controller<R extends HasMetadata> implements Reconciler<R>,
3333
LifecycleAware, EventSourceInitializer<R> {
@@ -143,7 +143,7 @@ public UpdateControl<R> execute() {
143143
public void prepareEventSources(EventSourceRegistry<R> eventSourceRegistry, Cloner cloner) {
144144
configuration.getDependentResources().forEach(dependent -> {
145145
final var dependentConfiguration = dependent.getConfiguration();
146-
eventSourceRegistry.registerEventSource(new ResourceEventSource<>(dependentConfiguration,
146+
final var source = new AbstractResourceEventSource<>(dependentConfiguration,
147147
kubernetesClient.resources(dependentConfiguration.getResourceClass()), cloner) {
148148
@Override
149149
protected ResourceEventFilter initFilter(ResourceConfiguration configuration) {
@@ -158,7 +158,8 @@ protected EventSourceWrapper wrapEventSource(
158158
dependentConfiguration.getSecondaryResourceAssociatedWithPrimaryMapper(),
159159
dependentConfiguration.skipUpdateIfUnchanged());
160160
}
161-
});
161+
};
162+
eventSourceRegistry.registerEventSource(source);
162163
});
163164

164165
if (reconciler instanceof EventSourceInitializer) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
package io.javaoperatorsdk.operator.processing.event.source;
2+
3+
import java.util.Map;
4+
import java.util.concurrent.ConcurrentHashMap;
5+
6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
9+
import io.fabric8.kubernetes.api.model.HasMetadata;
10+
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
11+
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
12+
import io.fabric8.kubernetes.client.dsl.MixedOperation;
13+
import io.fabric8.kubernetes.client.dsl.Resource;
14+
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
15+
import io.javaoperatorsdk.operator.OperatorException;
16+
import io.javaoperatorsdk.operator.api.config.Cloner;
17+
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
18+
import io.javaoperatorsdk.operator.processing.LifecycleAware;
19+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
20+
21+
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
22+
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
23+
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
24+
25+
public abstract class AbstractResourceEventSource<T extends HasMetadata, U extends ResourceConfiguration<T, U>, V extends EventSourceWrapper<T>>
26+
extends AbstractEventSource implements ResourceEventHandler<T>, ResourceEventSource<T> {
27+
28+
private static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";
29+
private static final Logger log = LoggerFactory.getLogger(AbstractResourceEventSource.class);
30+
31+
private final Map<String, V> sources = new ConcurrentHashMap<>();
32+
private final ResourceEventFilter<T, U> filter;
33+
private final U configuration;
34+
private final MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client;
35+
private final Cloner cloner;
36+
private ResourceCache<T> cache;
37+
38+
public AbstractResourceEventSource(U configuration,
39+
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client, Cloner cloner) {
40+
this.configuration = configuration;
41+
this.client = client;
42+
this.filter = initFilter(configuration);
43+
this.cloner = cloner;
44+
}
45+
46+
protected abstract ResourceEventFilter<T, U> initFilter(U configuration);
47+
48+
protected abstract V wrapEventSource(
49+
FilterWatchListDeletable<T, KubernetesResourceList<T>> filteredBySelectorClient,
50+
Cloner cloner);
51+
52+
void eventReceived(ResourceAction action, T resource, T oldResource) {
53+
log.debug("Event received for resource: {}", getName(resource));
54+
if (filter.acceptChange(configuration, oldResource, resource)) {
55+
eventHandler.handleEvent(new ResourceEvent(action, ResourceID.fromResource(resource)));
56+
} else {
57+
log.debug(
58+
"Skipping event handling resource {} with version: {}",
59+
getUID(resource),
60+
getVersion(resource));
61+
}
62+
}
63+
64+
@Override
65+
public void onAdd(T resource) {
66+
eventReceived(ResourceAction.ADDED, resource, null);
67+
}
68+
69+
@Override
70+
public void onUpdate(T oldResource, T newResource) {
71+
eventReceived(ResourceAction.UPDATED, newResource, oldResource);
72+
}
73+
74+
@Override
75+
public void onDelete(T resource, boolean b) {
76+
eventReceived(ResourceAction.DELETED, resource, null);
77+
}
78+
79+
@Override
80+
public void start() throws OperatorException {
81+
final var targetNamespaces = configuration.getEffectiveNamespaces();
82+
final var labelSelector = configuration.getLabelSelector();
83+
84+
if (ResourceConfiguration.allNamespacesWatched(targetNamespaces)) {
85+
final var filteredBySelectorClient =
86+
client.inAnyNamespace().withLabelSelector(labelSelector);
87+
final var source = createEventSource(filteredBySelectorClient, ANY_NAMESPACE_MAP_KEY);
88+
log.debug("Registered {} -> {} for any namespace", this, source);
89+
} else {
90+
targetNamespaces.forEach(
91+
ns -> {
92+
final var source =
93+
createEventSource(client.inNamespace(ns).withLabelSelector(labelSelector), ns);
94+
log.debug("Registered {} -> {} for namespace: {}", this, source,
95+
ns);
96+
});
97+
}
98+
this.cache = new AggregateResourceCache<>(sources);
99+
100+
// start sources only after the cache is populated so that events don't start coming in before
101+
// it's ready
102+
sources.values().parallelStream().forEach(LifecycleAware::start);
103+
}
104+
105+
106+
private V createEventSource(
107+
FilterWatchListDeletable<T, KubernetesResourceList<T>> filteredBySelectorClient, String key) {
108+
final var source = wrapEventSource(filteredBySelectorClient, cloner);
109+
sources.put(key, source);
110+
return source;
111+
}
112+
113+
@Override
114+
public void stop() {
115+
for (V source : sources.values()) {
116+
try {
117+
log.info("Stopping informer {} -> {}", this, source);
118+
source.stop();
119+
} catch (Exception e) {
120+
log.warn("Error stopping informer {} -> {}", this, source, e);
121+
}
122+
}
123+
}
124+
125+
public ResourceCache<T> getResourceCache() {
126+
return cache;
127+
}
128+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/ControllerResourceEventSource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@
1515
* This is a special case since is not bound to a single custom resource
1616
*/
1717
public class ControllerResourceEventSource<T extends HasMetadata>
18-
extends ResourceEventSource<T, ControllerConfiguration<T>, InformerEventSourceWrapper<T>> {
18+
extends
19+
AbstractResourceEventSource<T, ControllerConfiguration<T>, InformerEventSourceWrapper<T>> {
1920

2021
public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";
2122

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/InformerEventSourceWrapper.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,19 +5,19 @@
55
import java.util.stream.Stream;
66

77
import io.fabric8.kubernetes.api.model.HasMetadata;
8+
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
89
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
910
import io.javaoperatorsdk.operator.OperatorException;
1011
import io.javaoperatorsdk.operator.api.config.Cloner;
11-
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
1212
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1313

1414
class InformerEventSourceWrapper<T extends HasMetadata> implements EventSourceWrapper<T> {
1515

1616
private final SharedIndexInformer<T> informer;
17-
private InformerResourceCache<T> cache;
17+
private final InformerResourceCache<T> cache;
1818

1919
InformerEventSourceWrapper(SharedIndexInformer<T> informer, Cloner cloner,
20-
ResourceEventSource<T, ControllerConfiguration<T>, InformerEventSourceWrapper<T>> parent) {
20+
ResourceEventHandler<T> parent) {
2121
this.informer = informer;
2222
this.informer.addEventHandler(parent);
2323
this.cache = new InformerResourceCache<>(informer, cloner);
Original file line numberDiff line numberDiff line change
@@ -1,128 +1,7 @@
11
package io.javaoperatorsdk.operator.processing.event.source;
22

3-
import java.util.Map;
4-
import java.util.concurrent.ConcurrentHashMap;
5-
6-
import org.slf4j.Logger;
7-
import org.slf4j.LoggerFactory;
8-
93
import io.fabric8.kubernetes.api.model.HasMetadata;
10-
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
11-
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
12-
import io.fabric8.kubernetes.client.dsl.MixedOperation;
13-
import io.fabric8.kubernetes.client.dsl.Resource;
14-
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
15-
import io.javaoperatorsdk.operator.OperatorException;
16-
import io.javaoperatorsdk.operator.api.config.Cloner;
17-
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
18-
import io.javaoperatorsdk.operator.processing.LifecycleAware;
19-
import io.javaoperatorsdk.operator.processing.event.ResourceID;
20-
21-
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
22-
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
23-
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
24-
25-
public abstract class ResourceEventSource<T extends HasMetadata, U extends ResourceConfiguration<T, U>, V extends EventSourceWrapper<T>>
26-
extends AbstractEventSource implements ResourceEventHandler<T> {
27-
28-
private static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";
29-
private static final Logger log = LoggerFactory.getLogger(ResourceEventSource.class);
30-
31-
private final Map<String, V> sources = new ConcurrentHashMap<>();
32-
private final ResourceEventFilter<T, U> filter;
33-
private final U configuration;
34-
private final MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client;
35-
private final Cloner cloner;
36-
private ResourceCache<T> cache;
37-
38-
public ResourceEventSource(U configuration,
39-
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client, Cloner cloner) {
40-
this.configuration = configuration;
41-
this.client = client;
42-
this.filter = initFilter(configuration);
43-
this.cloner = cloner;
44-
}
45-
46-
protected abstract ResourceEventFilter<T, U> initFilter(U configuration);
47-
48-
protected abstract V wrapEventSource(
49-
FilterWatchListDeletable<T, KubernetesResourceList<T>> filteredBySelectorClient,
50-
Cloner cloner);
51-
52-
void eventReceived(ResourceAction action, T resource, T oldResource) {
53-
log.debug("Event received for resource: {}", getName(resource));
54-
if (filter.acceptChange(configuration, oldResource, resource)) {
55-
eventHandler.handleEvent(new ResourceEvent(action, ResourceID.fromResource(resource)));
56-
} else {
57-
log.debug(
58-
"Skipping event handling resource {} with version: {}",
59-
getUID(resource),
60-
getVersion(resource));
61-
}
62-
}
63-
64-
@Override
65-
public void onAdd(T resource) {
66-
eventReceived(ResourceAction.ADDED, resource, null);
67-
}
68-
69-
@Override
70-
public void onUpdate(T oldResource, T newResource) {
71-
eventReceived(ResourceAction.UPDATED, newResource, oldResource);
72-
}
73-
74-
@Override
75-
public void onDelete(T resource, boolean b) {
76-
eventReceived(ResourceAction.DELETED, resource, null);
77-
}
78-
79-
@Override
80-
public void start() throws OperatorException {
81-
final var targetNamespaces = configuration.getEffectiveNamespaces();
82-
final var labelSelector = configuration.getLabelSelector();
83-
84-
if (ResourceConfiguration.allNamespacesWatched(targetNamespaces)) {
85-
final var filteredBySelectorClient =
86-
client.inAnyNamespace().withLabelSelector(labelSelector);
87-
final var source = createEventSource(filteredBySelectorClient, ANY_NAMESPACE_MAP_KEY);
88-
log.debug("Registered {} -> {} for any namespace", this, source);
89-
} else {
90-
targetNamespaces.forEach(
91-
ns -> {
92-
final var source =
93-
createEventSource(client.inNamespace(ns).withLabelSelector(labelSelector), ns);
94-
log.debug("Registered {} -> {} for namespace: {}", this, source,
95-
ns);
96-
});
97-
}
98-
this.cache = new AggregateResourceCache<>(sources);
99-
100-
// start sources only after the cache is populated so that events don't start coming in before
101-
// it's ready
102-
sources.values().parallelStream().forEach(LifecycleAware::start);
103-
}
104-
105-
106-
private V createEventSource(
107-
FilterWatchListDeletable<T, KubernetesResourceList<T>> filteredBySelectorClient, String key) {
108-
final var source = wrapEventSource(filteredBySelectorClient, cloner);
109-
sources.put(key, source);
110-
return source;
111-
}
112-
113-
@Override
114-
public void stop() {
115-
for (V source : sources.values()) {
116-
try {
117-
log.info("Stopping informer {} -> {}", this, source);
118-
source.stop();
119-
} catch (Exception e) {
120-
log.warn("Error stopping informer {} -> {}", this, source, e);
121-
}
122-
}
123-
}
1244

125-
public ResourceCache<T> getResourceCache() {
126-
return cache;
127-
}
5+
public interface ResourceEventSource<T extends HasMetadata> extends EventSource {
6+
ResourceCache<T> getResourceCache();
1287
}

0 commit comments

Comments
 (0)