Skip to content

Commit 7e9d4ef

Browse files
authored
fix: PerResourcePollingEventSource issue with resources (#1417)
1 parent d563d63 commit 7e9d4ef

File tree

4 files changed

+136
-2
lines changed

4 files changed

+136
-2
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ public final synchronized void registerEventSource(String name, EventSource even
134134
@SuppressWarnings("unchecked")
135135
public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldResource) {
136136
eventSources.additionalNamedEventSources().forEach(eventSource -> {
137-
if (eventSource instanceof ResourceEventAware) {
138-
var lifecycleAwareES = ((ResourceEventAware<R>) eventSource);
137+
if (eventSource.original() instanceof ResourceEventAware) {
138+
var lifecycleAwareES = ((ResourceEventAware<R>) eventSource.original());
139139
switch (action) {
140140
case ADDED:
141141
lifecycleAwareES.onResourceCreated(resource);
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.junit.jupiter.api.extension.RegisterExtension;
5+
6+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
7+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
8+
import io.javaoperatorsdk.operator.sample.perresourceeventsource.PerResourceEventSourceCustomResource;
9+
import io.javaoperatorsdk.operator.sample.perresourceeventsource.PerResourcePollingEventSourceTestReconciler;
10+
11+
import static org.assertj.core.api.Assertions.assertThat;
12+
import static org.awaitility.Awaitility.await;
13+
14+
class PerResourcePollingEventSourceIT {
15+
16+
public static final String NAME_1 = "name1";
17+
public static final String NAME_2 = "name2";
18+
19+
@RegisterExtension
20+
LocallyRunOperatorExtension operator =
21+
LocallyRunOperatorExtension.builder()
22+
.withReconciler(new PerResourcePollingEventSourceTestReconciler())
23+
.build();
24+
25+
/**
26+
* This is kinda some test to verify that the implementation of PerResourcePollingEventSource
27+
* works with the underling mechanisms in event source manager and other parts of the system.
28+
**/
29+
@Test
30+
void fetchedAndReconciledMultipleTimes() {
31+
operator.create(PerResourceEventSourceCustomResource.class, resource(NAME_1));
32+
operator.create(PerResourceEventSourceCustomResource.class, resource(NAME_2));
33+
34+
var reconciler =
35+
operator.getReconcilerOfType(PerResourcePollingEventSourceTestReconciler.class);
36+
await().untilAsserted(() -> {
37+
assertThat(reconciler.getNumberOfExecutions(NAME_1)).isGreaterThan(2);
38+
assertThat(reconciler.getNumberOfFetchExecution(NAME_1)).isGreaterThan(2);
39+
assertThat(reconciler.getNumberOfExecutions(NAME_2)).isGreaterThan(2);
40+
assertThat(reconciler.getNumberOfFetchExecution(NAME_2)).isGreaterThan(2);
41+
});
42+
}
43+
44+
private PerResourceEventSourceCustomResource resource(String name) {
45+
var res = new PerResourceEventSourceCustomResource();
46+
res.setMetadata(new ObjectMetaBuilder()
47+
.withName(name)
48+
.build());
49+
return res;
50+
}
51+
52+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.javaoperatorsdk.operator.sample.perresourceeventsource;
2+
3+
import io.fabric8.kubernetes.api.model.Namespaced;
4+
import io.fabric8.kubernetes.client.CustomResource;
5+
import io.fabric8.kubernetes.model.annotation.Group;
6+
import io.fabric8.kubernetes.model.annotation.ShortNames;
7+
import io.fabric8.kubernetes.model.annotation.Version;
8+
9+
@Group("sample.javaoperatorsdk")
10+
@Version("v1")
11+
@ShortNames("pres")
12+
public class PerResourceEventSourceCustomResource
13+
extends CustomResource<Void, Void>
14+
implements Namespaced {
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
package io.javaoperatorsdk.operator.sample.perresourceeventsource;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
import java.util.UUID;
6+
import java.util.concurrent.ConcurrentHashMap;
7+
8+
import io.fabric8.kubernetes.client.KubernetesClient;
9+
import io.javaoperatorsdk.operator.api.reconciler.*;
10+
import io.javaoperatorsdk.operator.junit.KubernetesClientAware;
11+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
12+
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
13+
14+
@ControllerConfiguration
15+
public class PerResourcePollingEventSourceTestReconciler
16+
implements Reconciler<PerResourceEventSourceCustomResource>,
17+
EventSourceInitializer<PerResourceEventSourceCustomResource>,
18+
KubernetesClientAware {
19+
20+
public static final int POLL_PERIOD = 100;
21+
private final Map<String, Integer> numberOfExecutions = new ConcurrentHashMap<>();
22+
private final Map<String, Integer> numberOfFetchExecutions = new ConcurrentHashMap<>();
23+
24+
private KubernetesClient client;
25+
private PerResourcePollingEventSource<String, PerResourceEventSourceCustomResource> eventSource;
26+
27+
@Override
28+
public UpdateControl<PerResourceEventSourceCustomResource> reconcile(
29+
PerResourceEventSourceCustomResource resource,
30+
Context<PerResourceEventSourceCustomResource> context) throws Exception {
31+
numberOfExecutions.putIfAbsent(resource.getMetadata().getName(), 0);
32+
numberOfExecutions.compute(resource.getMetadata().getName(), (s, v) -> v + 1);
33+
return UpdateControl.noUpdate();
34+
}
35+
36+
@Override
37+
public Map<String, EventSource> prepareEventSources(
38+
EventSourceContext<PerResourceEventSourceCustomResource> context) {
39+
this.eventSource =
40+
new PerResourcePollingEventSource<>(resource -> {
41+
numberOfFetchExecutions.putIfAbsent(resource.getMetadata().getName(), 0);
42+
numberOfFetchExecutions.compute(resource.getMetadata().getName(), (s, v) -> v + 1);
43+
return Set.of(UUID.randomUUID().toString());
44+
},
45+
context.getPrimaryCache(), POLL_PERIOD, String.class);
46+
return EventSourceInitializer.nameEventSources(eventSource);
47+
}
48+
49+
@Override
50+
public KubernetesClient getKubernetesClient() {
51+
return client;
52+
}
53+
54+
@Override
55+
public void setKubernetesClient(KubernetesClient kubernetesClient) {
56+
this.client = kubernetesClient;
57+
}
58+
59+
public int getNumberOfExecutions(String name) {
60+
return numberOfExecutions.get(name);
61+
}
62+
63+
public int getNumberOfFetchExecution(String name) {
64+
return numberOfFetchExecutions.get(name);
65+
}
66+
67+
}

0 commit comments

Comments
 (0)