Skip to content

Commit 4f5741c

Browse files
committed
feat: external resource with generated id support (#1527)
1 parent 9fa0ef2 commit 4f5741c

File tree

14 files changed

+345
-3
lines changed

14 files changed

+345
-3
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public <P extends HasMetadata> InformerConfigurationBuilder<R> withNamespacesInh
164164
}
165165

166166
/**
167-
* Whether or not the associated informer should track changes made to the parent
167+
* Whether the associated informer should track changes made to the parent
168168
* {@link io.javaoperatorsdk.operator.processing.Controller}'s namespaces configuration.
169169
*
170170
* @param followChanges {@code true} to reconfigure the associated informer when the parent

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.javaoperatorsdk.operator.processing.Controller;
1818
import io.javaoperatorsdk.operator.processing.LifecycleAware;
1919
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
20+
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
2021
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventAware;
2122
import io.javaoperatorsdk.operator.processing.event.source.ResourceEventSource;
2223
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
@@ -63,7 +64,12 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
6364
@Override
6465
public synchronized void start() {
6566
startEventSource(eventSources.namedControllerResourceEventSource());
66-
eventSources.additionalNamedEventSources().parallel().forEach(this::startEventSource);
67+
eventSources.additionalNamedEventSources()
68+
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
69+
.parallel().forEach(this::startEventSource);
70+
eventSources.additionalNamedEventSources()
71+
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
72+
.parallel().forEach(this::startEventSource);
6773
}
6874

6975
@Override

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import io.javaoperatorsdk.operator.OperatorException;
66
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
7+
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
78

89
class NamedEventSource implements EventSource {
910

@@ -57,4 +58,9 @@ public boolean equals(Object o) {
5758
public int hashCode() {
5859
return Objects.hash(original, name);
5960
}
61+
62+
@Override
63+
public EventSourceStartPriority priority() {
64+
return original.priority();
65+
}
6066
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/AbstractEventSource.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
public abstract class AbstractEventSource implements EventSource {
77
private EventHandler handler;
88
private volatile boolean running = false;
9+
private EventSourceStartPriority eventSourceStartPriority = EventSourceStartPriority.DEFAULT;
910

1011
protected EventHandler getEventHandler() {
1112
return handler;
@@ -29,4 +30,15 @@ public void start() throws OperatorException {
2930
public void stop() throws OperatorException {
3031
running = false;
3132
}
33+
34+
@Override
35+
public EventSourceStartPriority priority() {
36+
return eventSourceStartPriority;
37+
}
38+
39+
public AbstractEventSource setEventSourcePriority(
40+
EventSourceStartPriority eventSourceStartPriority) {
41+
this.eventSourceStartPriority = eventSourceStartPriority;
42+
return this;
43+
}
3244
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,5 +20,7 @@ public interface EventSource extends LifecycleAware {
2020
*/
2121
void setEventHandler(EventHandler handler);
2222

23-
23+
default EventSourceStartPriority priority() {
24+
return EventSourceStartPriority.DEFAULT;
25+
}
2426
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
package io.javaoperatorsdk.operator.processing.event.source;
2+
3+
public enum EventSourceStartPriority {
4+
5+
/**
6+
* Event Sources with this priority are started and synced before the event source with DEFAULT
7+
* priority. The use case to use this, if the event source holds an information regarding the
8+
* state of a resource. For example a ConfigMap would store an ID of an external resource, in this
9+
* case an event source that tracks the external resource might need this ID (event before the
10+
* reconciliation) to check the state of the external resource. The only way to ensure that the ID
11+
* is already cached is to start/sync related event source before the event source of the external
12+
* resource.
13+
*/
14+
RESOURCE_STATE_LOADER, DEFAULT
15+
16+
17+
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
1313
import io.javaoperatorsdk.operator.processing.Controller;
1414
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
15+
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
1516
import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
1617
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
1718
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;
@@ -59,7 +60,9 @@ public void closeShouldCascadeToEventSources() {
5960
@Test
6061
public void startCascadesToEventSources() {
6162
EventSource eventSource = mock(EventSource.class);
63+
when(eventSource.priority()).thenReturn(EventSourceStartPriority.DEFAULT);
6264
EventSource eventSource2 = mock(TimerEventSource.class);
65+
when(eventSource2.priority()).thenReturn(EventSourceStartPriority.DEFAULT);
6366
eventSourceManager.registerEventSource(eventSource);
6467
eventSourceManager.registerEventSource(eventSource2);
6568

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.junit.jupiter.api.extension.RegisterExtension;
5+
6+
import io.fabric8.kubernetes.api.model.ConfigMap;
7+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
8+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
9+
import io.javaoperatorsdk.operator.sample.externalstate.ExternalStateCustomResource;
10+
import io.javaoperatorsdk.operator.sample.externalstate.ExternalStateReconciler;
11+
import io.javaoperatorsdk.operator.sample.externalstate.ExternalStateSpec;
12+
import io.javaoperatorsdk.operator.support.ExternalIDGenServiceMock;
13+
14+
import static io.javaoperatorsdk.operator.sample.externalstate.ExternalStateReconciler.ID_KEY;
15+
import static org.assertj.core.api.Assertions.assertThat;
16+
import static org.awaitility.Awaitility.await;
17+
18+
class ExternalStateIT {
19+
20+
private static final String TEST_RESOURCE_NAME = "test1";
21+
22+
public static final String INITIAL_TEST_DATA = "initialTestData";
23+
public static final String UPDATED_DATA = "updatedData";
24+
25+
private ExternalIDGenServiceMock externalService = ExternalIDGenServiceMock.getInstance();
26+
27+
@RegisterExtension
28+
LocallyRunOperatorExtension operator =
29+
LocallyRunOperatorExtension.builder().withReconciler(ExternalStateReconciler.class)
30+
.build();
31+
32+
@Test
33+
public void reconcilesResourceWithPersistentState() {
34+
var resource = operator.create(testResource());
35+
assertResourcesCreated(resource, INITIAL_TEST_DATA);
36+
37+
resource.getSpec().setData(UPDATED_DATA);
38+
operator.replace(resource);
39+
assertResourcesCreated(resource, UPDATED_DATA);
40+
41+
operator.delete(resource);
42+
assertResourcesDeleted(resource);
43+
}
44+
45+
private void assertResourcesDeleted(ExternalStateCustomResource resource) {
46+
await().untilAsserted(() -> {
47+
var cm = operator.get(ConfigMap.class, resource.getMetadata().getName());
48+
var resources = externalService.listResources();
49+
assertThat(cm).isNull();
50+
assertThat(resources).isEmpty();
51+
});
52+
}
53+
54+
private void assertResourcesCreated(ExternalStateCustomResource resource,
55+
String initialTestData) {
56+
await().untilAsserted(() -> {
57+
var cm = operator.get(ConfigMap.class, resource.getMetadata().getName());
58+
var resources = externalService.listResources();
59+
assertThat(resources).hasSize(1);
60+
var extRes = externalService.listResources().get(0);
61+
assertThat(extRes.getData()).isEqualTo(initialTestData);
62+
assertThat(cm).isNotNull();
63+
assertThat(cm.getData().get(ID_KEY)).isEqualTo(extRes.getId());
64+
});
65+
}
66+
67+
private ExternalStateCustomResource testResource() {
68+
var res = new ExternalStateCustomResource();
69+
res.setMetadata(new ObjectMetaBuilder()
70+
.withName(TEST_RESOURCE_NAME)
71+
.build());
72+
73+
res.setSpec(new ExternalStateSpec());
74+
res.getSpec().setData(INITIAL_TEST_DATA);
75+
return res;
76+
}
77+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.javaoperatorsdk.operator.sample.externalstate;
2+
3+
import io.fabric8.kubernetes.api.model.Namespaced;
4+
import io.fabric8.kubernetes.client.CustomResource;
5+
import io.fabric8.kubernetes.model.annotation.Group;
6+
import io.fabric8.kubernetes.model.annotation.ShortNames;
7+
import io.fabric8.kubernetes.model.annotation.Version;
8+
9+
@Group("sample.javaoperatorsdk")
10+
@Version("v1")
11+
@ShortNames("ess")
12+
public class ExternalStateCustomResource
13+
extends CustomResource<ExternalStateSpec, Void>
14+
implements Namespaced {
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
package io.javaoperatorsdk.operator.sample.externalstate;
2+
3+
import java.util.Collections;
4+
import java.util.Map;
5+
import java.util.Set;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
8+
import io.fabric8.kubernetes.api.model.ConfigMap;
9+
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
10+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
11+
import io.fabric8.kubernetes.client.KubernetesClient;
12+
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
13+
import io.javaoperatorsdk.operator.api.reconciler.*;
14+
import io.javaoperatorsdk.operator.junit.KubernetesClientAware;
15+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
16+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
17+
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
18+
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
19+
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
20+
import io.javaoperatorsdk.operator.support.ExternalIDGenServiceMock;
21+
import io.javaoperatorsdk.operator.support.ExternalResource;
22+
import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider;
23+
24+
@ControllerConfiguration()
25+
public class ExternalStateReconciler
26+
implements Reconciler<ExternalStateCustomResource>, Cleaner<ExternalStateCustomResource>,
27+
EventSourceInitializer<ExternalStateCustomResource>, KubernetesClientAware,
28+
TestExecutionInfoProvider {
29+
30+
public static final String ID_KEY = "id";
31+
private final AtomicInteger numberOfExecutions = new AtomicInteger(0);
32+
33+
private ExternalIDGenServiceMock externalService = ExternalIDGenServiceMock.getInstance();
34+
private KubernetesClient client;
35+
36+
InformerEventSource<ConfigMap, ExternalStateCustomResource> configMapEventSource;
37+
PerResourcePollingEventSource<ExternalResource, ExternalStateCustomResource> externalResourceEventSource;
38+
39+
@Override
40+
public UpdateControl<ExternalStateCustomResource> reconcile(
41+
ExternalStateCustomResource resource, Context<ExternalStateCustomResource> context) {
42+
numberOfExecutions.addAndGet(1);
43+
44+
var externalResource = context.getSecondaryResource(ExternalResource.class);
45+
externalResource.ifPresentOrElse(r -> {
46+
if (!r.getData().equals(resource.getSpec().getData())) {
47+
updateExternalResource(resource, r);
48+
}
49+
}, () -> {
50+
if (externalResource.isEmpty()) {
51+
createExternalResource(resource);
52+
}
53+
});
54+
55+
56+
return UpdateControl.noUpdate();
57+
}
58+
59+
private void updateExternalResource(ExternalStateCustomResource resource,
60+
ExternalResource externalResource) {
61+
var newResource = new ExternalResource(externalResource.getId(), resource.getSpec().getData());
62+
externalService.update(newResource);
63+
externalResourceEventSource.handleRecentResourceUpdate(ResourceID.fromResource(resource),
64+
newResource, externalResource);
65+
}
66+
67+
private void createExternalResource(ExternalStateCustomResource resource) {
68+
var createdResource =
69+
externalService.create(new ExternalResource(resource.getSpec().getData()));
70+
var configMap = new ConfigMapBuilder()
71+
.withMetadata(new ObjectMetaBuilder()
72+
.withName(resource.getMetadata().getName())
73+
.withNamespace(resource.getMetadata().getNamespace())
74+
.build())
75+
.withData(Map.of(ID_KEY, createdResource.getId()))
76+
.build();
77+
configMap.addOwnerReference(resource);
78+
client.configMaps().resource(configMap).create();
79+
80+
var primaryID = ResourceID.fromResource(resource);
81+
// Making sure that the created resources are in the cache for the next reconciliation.
82+
// This is critical in this case, since on next reconciliation if it would not be in the cache
83+
// it would be created again.
84+
configMapEventSource.handleRecentResourceCreate(primaryID, configMap);
85+
externalResourceEventSource.handleRecentResourceCreate(primaryID, createdResource);
86+
}
87+
88+
@Override
89+
public DeleteControl cleanup(ExternalStateCustomResource resource,
90+
Context<ExternalStateCustomResource> context) {
91+
var externalResource = context.getSecondaryResource(ExternalResource.class);
92+
externalResource.ifPresent(er -> {
93+
externalService.delete(er.getId());
94+
});
95+
client.configMaps().inNamespace(resource.getMetadata().getNamespace())
96+
.withName(resource.getMetadata().getName()).delete();
97+
return DeleteControl.defaultDelete();
98+
}
99+
100+
public int getNumberOfExecutions() {
101+
return numberOfExecutions.get();
102+
}
103+
104+
@Override
105+
public Map<String, EventSource> prepareEventSources(
106+
EventSourceContext<ExternalStateCustomResource> context) {
107+
108+
configMapEventSource = new InformerEventSource<>(
109+
InformerConfiguration.from(ConfigMap.class, context).build(), context);
110+
configMapEventSource.setEventSourcePriority(EventSourceStartPriority.RESOURCE_STATE_LOADER);
111+
112+
externalResourceEventSource = new PerResourcePollingEventSource<>(primaryResource -> {
113+
var configMap = configMapEventSource.getSecondaryResource(primaryResource).orElse(null);
114+
if (configMap == null) {
115+
return Collections.emptySet();
116+
}
117+
var id = configMap.getData().get(ID_KEY);
118+
var externalResource = externalService.read(id);
119+
return externalResource.map(er -> Set.of(er)).orElse(Collections.emptySet());
120+
}, context.getPrimaryCache(), 300L, ExternalResource.class);
121+
122+
return EventSourceInitializer.nameEventSources(configMapEventSource,
123+
externalResourceEventSource);
124+
}
125+
126+
@Override
127+
public KubernetesClient getKubernetesClient() {
128+
return client;
129+
}
130+
131+
@Override
132+
public void setKubernetesClient(KubernetesClient kubernetesClient) {
133+
this.client = kubernetesClient;
134+
}
135+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.javaoperatorsdk.operator.sample.externalstate;
2+
3+
public class ExternalStateSpec {
4+
5+
private String data;
6+
7+
public String getData() {
8+
return data;
9+
}
10+
11+
public ExternalStateSpec setData(String data) {
12+
this.data = data;
13+
return this;
14+
}
15+
}

0 commit comments

Comments
 (0)