Skip to content

Commit 66dd28e

Browse files
committed
fix: PerResourcePollingEventSource issue with resources
1 parent f7688c2 commit 66dd28e

File tree

4 files changed

+124
-2
lines changed

4 files changed

+124
-2
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,8 +134,8 @@ public final synchronized void registerEventSource(String name, EventSource even
134134
@SuppressWarnings("unchecked")
135135
public void broadcastOnResourceEvent(ResourceAction action, R resource, R oldResource) {
136136
eventSources.additionalNamedEventSources().forEach(eventSource -> {
137-
if (eventSource instanceof ResourceEventAware) {
138-
var lifecycleAwareES = ((ResourceEventAware<R>) eventSource);
137+
if (eventSource.original() instanceof ResourceEventAware) {
138+
var lifecycleAwareES = ((ResourceEventAware<R>) eventSource.original());
139139
switch (action) {
140140
case ADDED:
141141
lifecycleAwareES.onResourceCreated(resource);
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import org.junit.jupiter.api.Test;
4+
import org.junit.jupiter.api.extension.RegisterExtension;
5+
6+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
7+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
8+
import io.javaoperatorsdk.operator.sample.perresourceeventsource.PerResourceEventSourceCustomResource;
9+
import io.javaoperatorsdk.operator.sample.perresourceeventsource.PerResourcePollingEventSourceTestReconciler;
10+
11+
import static org.assertj.core.api.Assertions.assertThat;
12+
import static org.awaitility.Awaitility.await;
13+
14+
class PerResourcePollingEventSourceIT {
15+
16+
@RegisterExtension
17+
LocallyRunOperatorExtension operator =
18+
LocallyRunOperatorExtension.builder()
19+
.withReconciler(new PerResourcePollingEventSourceTestReconciler())
20+
.build();
21+
22+
@Test
23+
void managedDependentsAreReconciledInOrder() {
24+
operator.create(PerResourceEventSourceCustomResource.class, resource());
25+
26+
var reconciler =
27+
operator.getReconcilerOfType(PerResourcePollingEventSourceTestReconciler.class);
28+
await().untilAsserted(() -> {
29+
assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(2);
30+
assertThat(reconciler.getNumberOfFetchExecution()).isGreaterThan(2);
31+
});
32+
}
33+
34+
private PerResourceEventSourceCustomResource resource() {
35+
var res = new PerResourceEventSourceCustomResource();
36+
res.setMetadata(new ObjectMetaBuilder()
37+
.withName("test1")
38+
.build());
39+
return res;
40+
}
41+
42+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.javaoperatorsdk.operator.sample.perresourceeventsource;
2+
3+
import io.fabric8.kubernetes.api.model.Namespaced;
4+
import io.fabric8.kubernetes.client.CustomResource;
5+
import io.fabric8.kubernetes.model.annotation.Group;
6+
import io.fabric8.kubernetes.model.annotation.ShortNames;
7+
import io.fabric8.kubernetes.model.annotation.Version;
8+
9+
@Group("sample.javaoperatorsdk")
10+
@Version("v1")
11+
@ShortNames("pres")
12+
public class PerResourceEventSourceCustomResource
13+
extends CustomResource<Void, Void>
14+
implements Namespaced {
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package io.javaoperatorsdk.operator.sample.perresourceeventsource;
2+
3+
import java.util.Map;
4+
import java.util.Set;
5+
import java.util.UUID;
6+
import java.util.concurrent.atomic.AtomicInteger;
7+
8+
import io.fabric8.kubernetes.client.KubernetesClient;
9+
import io.javaoperatorsdk.operator.api.reconciler.*;
10+
import io.javaoperatorsdk.operator.junit.KubernetesClientAware;
11+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
12+
import io.javaoperatorsdk.operator.processing.event.source.polling.PerResourcePollingEventSource;
13+
14+
@ControllerConfiguration
15+
public class PerResourcePollingEventSourceTestReconciler
16+
implements Reconciler<PerResourceEventSourceCustomResource>,
17+
EventSourceInitializer<PerResourceEventSourceCustomResource>,
18+
KubernetesClientAware {
19+
20+
public static final int POLL_PERIOD = 100;
21+
private final AtomicInteger numberOfExecutions = new AtomicInteger(0);
22+
private final AtomicInteger numberOfFetchExecutions = new AtomicInteger(0);
23+
24+
private KubernetesClient client;
25+
private PerResourcePollingEventSource<String, PerResourceEventSourceCustomResource> eventSource;
26+
27+
@Override
28+
public UpdateControl<PerResourceEventSourceCustomResource> reconcile(
29+
PerResourceEventSourceCustomResource resource,
30+
Context<PerResourceEventSourceCustomResource> context) throws Exception {
31+
numberOfExecutions.addAndGet(1);
32+
return UpdateControl.noUpdate();
33+
}
34+
35+
@Override
36+
public Map<String, EventSource> prepareEventSources(
37+
EventSourceContext<PerResourceEventSourceCustomResource> context) {
38+
this.eventSource =
39+
new PerResourcePollingEventSource<>(primaryResource -> {
40+
numberOfFetchExecutions.addAndGet(1);
41+
return Set.of(UUID.randomUUID().toString());
42+
},
43+
context.getPrimaryCache(), POLL_PERIOD, String.class);
44+
return EventSourceInitializer.nameEventSources(eventSource);
45+
}
46+
47+
@Override
48+
public KubernetesClient getKubernetesClient() {
49+
return client;
50+
}
51+
52+
@Override
53+
public void setKubernetesClient(KubernetesClient kubernetesClient) {
54+
this.client = kubernetesClient;
55+
}
56+
57+
public int getNumberOfExecutions() {
58+
return numberOfExecutions.get();
59+
}
60+
61+
public int getNumberOfFetchExecution() {
62+
return numberOfFetchExecutions.get();
63+
}
64+
65+
}

0 commit comments

Comments
 (0)