Skip to content

Commit 9961885

Browse files
committed
feat: enable configuring a handler to listen to informers stopping
1 parent 447b885 commit 9961885

File tree

7 files changed

+83
-19
lines changed

7 files changed

+83
-19
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,7 @@ default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
148148
return Optional.empty();
149149
}
150150

151+
default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
152+
return Optional.empty();
153+
}
151154
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class ConfigurationServiceOverrider {
2626
private ExecutorService executorService;
2727
private ExecutorService workflowExecutorService;
2828
private LeaderElectionConfiguration leaderElectionConfiguration;
29+
private InformerStoppedHandler informerStoppedHandler;
2930

3031
ConfigurationServiceOverrider(ConfigurationService original) {
3132
this.original = original;
@@ -93,6 +94,11 @@ public ConfigurationServiceOverrider withLeaderElectionConfiguration(
9394
return this;
9495
}
9596

97+
public ConfigurationServiceOverrider withInformerStoppedHandler(InformerStoppedHandler handler) {
98+
this.informerStoppedHandler = handler;
99+
return this;
100+
}
101+
96102
public ConfigurationService build() {
97103
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
98104
@Override
@@ -159,6 +165,12 @@ public Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
159165
return leaderElectionConfiguration != null ? Optional.of(leaderElectionConfiguration)
160166
: original.getLeaderElectionConfiguration();
161167
}
168+
169+
@Override
170+
public Optional<InformerStoppedHandler> getInformerStoppedHandler() {
171+
return informerStoppedHandler != null ? Optional.of(informerStoppedHandler)
172+
: original.getInformerStoppedHandler();
173+
}
162174
};
163175
}
164176

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.javaoperatorsdk.operator.api.config;
2+
3+
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
4+
5+
public interface InformerStoppedHandler {
6+
7+
@SuppressWarnings("rawtypes")
8+
void onStop(SharedIndexInformer informer, Throwable ex);
9+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.fabric8.kubernetes.client.informers.cache.Cache;
1414
import io.javaoperatorsdk.operator.OperatorException;
1515
import io.javaoperatorsdk.operator.ReconcilerUtils;
16+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
1617
import io.javaoperatorsdk.operator.processing.LifecycleAware;
1718
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1819
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
@@ -25,6 +26,18 @@ class InformerWrapper<T extends HasMetadata>
2526

2627
public InformerWrapper(SharedIndexInformer<T> informer) {
2728
this.informer = informer;
29+
30+
// register
31+
ConfigurationServiceProvider.instance().getInformerStoppedHandler()
32+
.ifPresent(ish -> {
33+
final var stopped = informer.stopped();
34+
if (stopped != null) {
35+
stopped.handle((res, ex) -> {
36+
ish.onStop(informer, ex);
37+
return null;
38+
});
39+
}
40+
});
2841
this.cache = (Cache<T>) informer.getStore();
2942
}
3043

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
package io.javaoperatorsdk.operator;
22

3+
import java.util.concurrent.CompletableFuture;
4+
import java.util.function.Consumer;
5+
36
import io.fabric8.kubernetes.api.model.HasMetadata;
47
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
58
import io.fabric8.kubernetes.client.GenericKubernetesClient;
@@ -14,12 +17,18 @@
1417
import static org.mockito.ArgumentMatchers.anyLong;
1518
import static org.mockito.ArgumentMatchers.anyString;
1619
import static org.mockito.ArgumentMatchers.nullable;
20+
import static org.mockito.Mockito.doAnswer;
1721
import static org.mockito.Mockito.mock;
1822
import static org.mockito.Mockito.when;
1923

2024
public class MockKubernetesClient {
2125

2226
public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
27+
return client(clazz, null);
28+
}
29+
30+
public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz,
31+
Consumer<Void> informerRunBehavior) {
2332
final var client = mock(GenericKubernetesClient.class);
2433
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> resources =
2534
mock(MixedOperation.class);
@@ -34,6 +43,19 @@ public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
3443
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
3544
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
3645
SharedIndexInformer<T> informer = mock(SharedIndexInformer.class);
46+
CompletableFuture<Void> stopped = new CompletableFuture<>();
47+
when(informer.stopped()).thenReturn(stopped);
48+
if (informerRunBehavior != null) {
49+
doAnswer(invocation -> {
50+
try {
51+
informerRunBehavior.accept(null);
52+
} catch (Exception e) {
53+
stopped.completeExceptionally(e);
54+
}
55+
return null;
56+
}).when(informer).run();
57+
}
58+
doAnswer(invocation -> null).when(informer).stop();
3759
Indexer mockIndexer = mock(Indexer.class);
3860
when(informer.getIndexer()).thenReturn(mockIndexer);
3961
when(filterable.runnableInformer(anyLong())).thenReturn(informer);

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ public <R extends HasMetadata> R clone(R object) {
112112
.withTerminationTimeoutSeconds(100)
113113
.withMetrics(new Metrics() {})
114114
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
115+
.withInformerStoppedHandler((informer, ex) -> {
116+
})
115117
.build();
116118

117119
assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
@@ -128,5 +130,7 @@ public <R extends HasMetadata> R clone(R object) {
128130
assertNotEquals(config.getObjectMapper(), overridden.getObjectMapper());
129131
assertNotEquals(config.getLeaderElectionConfiguration(),
130132
overridden.getLeaderElectionConfiguration());
133+
assertNotEquals(config.getInformerStoppedHandler(),
134+
overridden.getLeaderElectionConfiguration());
131135
}
132136
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,9 @@
99
import io.fabric8.kubernetes.api.model.ObjectMeta;
1010
import io.fabric8.kubernetes.api.model.apps.Deployment;
1111
import io.fabric8.kubernetes.client.KubernetesClient;
12-
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
13-
import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable;
14-
import io.fabric8.kubernetes.client.dsl.MixedOperation;
15-
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
16-
import io.fabric8.kubernetes.client.informers.cache.Indexer;
12+
import io.javaoperatorsdk.operator.MockKubernetesClient;
13+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
14+
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
1715
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
1816
import io.javaoperatorsdk.operator.processing.event.EventHandler;
1917
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -22,6 +20,8 @@
2220

2321
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
2422
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.eq;
24+
import static org.mockito.Mockito.atLeastOnce;
2525
import static org.mockito.Mockito.mock;
2626
import static org.mockito.Mockito.never;
2727
import static org.mockito.Mockito.times;
@@ -36,28 +36,15 @@ class InformerEventSourceTest {
3636
private static final String NEXT_RESOURCE_VERSION = "2";
3737

3838
private InformerEventSource<Deployment, TestCustomResource> informerEventSource;
39-
private final KubernetesClient clientMock = mock(KubernetesClient.class);
39+
private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class);
4040
private final TemporaryResourceCache<Deployment> temporaryResourceCacheMock =
4141
mock(TemporaryResourceCache.class);
4242
private final EventHandler eventHandlerMock = mock(EventHandler.class);
43-
private final MixedOperation crClientMock = mock(MixedOperation.class);
44-
private final FilterWatchListMultiDeletable specificResourceClientMock =
45-
mock(FilterWatchListMultiDeletable.class);
46-
private final FilterWatchListDeletable labeledResourceClientMock =
47-
mock(FilterWatchListDeletable.class);
48-
private final SharedIndexInformer informer = mock(SharedIndexInformer.class);
4943
private final InformerConfiguration<Deployment> informerConfiguration =
5044
mock(InformerConfiguration.class);
5145

5246
@BeforeEach
5347
void setup() {
54-
when(clientMock.resources(any())).thenReturn(crClientMock);
55-
when(crClientMock.inAnyNamespace()).thenReturn(specificResourceClientMock);
56-
when(specificResourceClientMock.withLabelSelector((String) null))
57-
.thenReturn(labeledResourceClientMock);
58-
when(labeledResourceClientMock.runnableInformer(0)).thenReturn(informer);
59-
when(informer.getIndexer()).thenReturn(mock(Indexer.class));
60-
6148
when(informerConfiguration.getEffectiveNamespaces())
6249
.thenReturn(DEFAULT_NAMESPACES_SET);
6350
when(informerConfiguration.getSecondaryToPrimaryMapper())
@@ -256,6 +243,20 @@ void filtersOnDeleteEvents() {
256243
verify(eventHandlerMock, never()).handleEvent(any());
257244
}
258245

246+
@Test
247+
void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
248+
final var exception = new RuntimeException("Informer stopped exceptionally!");
249+
final var informerStoppedHandler = mock(InformerStoppedHandler.class);
250+
ConfigurationServiceProvider
251+
.overrideCurrent(overrider -> overrider.withInformerStoppedHandler(informerStoppedHandler));
252+
informerEventSource = new InformerEventSource<>(informerConfiguration,
253+
MockKubernetesClient.client(Deployment.class, unused -> {
254+
throw exception;
255+
}));
256+
informerEventSource.start();
257+
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
258+
}
259+
259260
Deployment testDeployment() {
260261
Deployment deployment = new Deployment();
261262
deployment.setMetadata(new ObjectMeta());

0 commit comments

Comments
 (0)