Skip to content

Commit 0be42b2

Browse files
committed
feat: enable configuring a handler to listen to informers stopping
1 parent 904275a commit 0be42b2

File tree

12 files changed

+204
-46
lines changed

12 files changed

+204
-46
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,7 @@ default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
148148
return Optional.empty();
149149
}
150150

151+
default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
152+
return Optional.empty();
153+
}
151154
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class ConfigurationServiceOverrider {
2626
private ExecutorService executorService;
2727
private ExecutorService workflowExecutorService;
2828
private LeaderElectionConfiguration leaderElectionConfiguration;
29+
private InformerStoppedHandler informerStoppedHandler;
2930

3031
ConfigurationServiceOverrider(ConfigurationService original) {
3132
this.original = original;
@@ -93,6 +94,11 @@ public ConfigurationServiceOverrider withLeaderElectionConfiguration(
9394
return this;
9495
}
9596

97+
public ConfigurationServiceOverrider withInformerStoppedHandler(InformerStoppedHandler handler) {
98+
this.informerStoppedHandler = handler;
99+
return this;
100+
}
101+
96102
public ConfigurationService build() {
97103
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
98104
@Override
@@ -159,6 +165,12 @@ public Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
159165
return leaderElectionConfiguration != null ? Optional.of(leaderElectionConfiguration)
160166
: original.getLeaderElectionConfiguration();
161167
}
168+
169+
@Override
170+
public Optional<InformerStoppedHandler> getInformerStoppedHandler() {
171+
return informerStoppedHandler != null ? Optional.of(informerStoppedHandler)
172+
: original.getInformerStoppedHandler();
173+
}
162174
};
163175
}
164176

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.javaoperatorsdk.operator.api.config;
2+
3+
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
4+
5+
public interface InformerStoppedHandler {
6+
7+
@SuppressWarnings("rawtypes")
8+
void onStop(SharedIndexInformer informer, Throwable ex);
9+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.fabric8.kubernetes.client.informers.cache.Cache;
1414
import io.javaoperatorsdk.operator.OperatorException;
1515
import io.javaoperatorsdk.operator.ReconcilerUtils;
16+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
1617
import io.javaoperatorsdk.operator.processing.LifecycleAware;
1718
import io.javaoperatorsdk.operator.processing.event.ResourceID;
1819
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
@@ -32,6 +33,27 @@ public InformerWrapper(SharedIndexInformer<T> informer) {
3233
public void start() throws OperatorException {
3334
try {
3435
informer.run();
36+
37+
// register stopped handler if we have one defined
38+
ConfigurationServiceProvider.instance().getInformerStoppedHandler()
39+
.ifPresent(ish -> {
40+
final var stopped = informer.stopped();
41+
if (stopped != null) {
42+
stopped.handle((res, ex) -> {
43+
ish.onStop(informer, ex);
44+
return null;
45+
});
46+
} else {
47+
final var apiTypeClass = informer.getApiTypeClass();
48+
final var fullResourceName =
49+
HasMetadata.getFullResourceName(apiTypeClass);
50+
final var version = HasMetadata.getVersion(apiTypeClass);
51+
throw new IllegalStateException(
52+
"Cannot retrieve 'stopped' callback to listen to informer stopping for informer for "
53+
+ fullResourceName + "/" + version);
54+
}
55+
});
56+
3557
} catch (Exception e) {
3658
ReconcilerUtils.handleKubernetesClientException(e,
3759
HasMetadata.getFullResourceName(informer.getApiTypeClass()));

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,35 @@
11
package io.javaoperatorsdk.operator;
22

3+
import java.util.concurrent.CompletableFuture;
4+
import java.util.concurrent.Executors;
5+
import java.util.function.Consumer;
6+
37
import io.fabric8.kubernetes.api.model.HasMetadata;
48
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
59
import io.fabric8.kubernetes.client.KubernetesClient;
610
import io.fabric8.kubernetes.client.V1ApiextensionAPIGroupDSL;
711
import io.fabric8.kubernetes.client.dsl.*;
12+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectorBuilder;
813
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
914
import io.fabric8.kubernetes.client.informers.cache.Indexer;
1015

1116
import static org.mockito.ArgumentMatchers.any;
1217
import static org.mockito.ArgumentMatchers.anyLong;
1318
import static org.mockito.ArgumentMatchers.anyString;
1419
import static org.mockito.ArgumentMatchers.nullable;
20+
import static org.mockito.Mockito.doAnswer;
1521
import static org.mockito.Mockito.mock;
1622
import static org.mockito.Mockito.when;
1723

1824
public class MockKubernetesClient {
1925

2026
public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
27+
return client(clazz, null);
28+
}
29+
30+
@SuppressWarnings({"unchecked", "rawtypes"})
31+
public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz,
32+
Consumer<Void> informerRunBehavior) {
2133
final var client = mock(KubernetesClient.class);
2234
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> resources =
2335
mock(MixedOperation.class);
@@ -32,10 +44,25 @@ public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
3244
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
3345
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
3446
SharedIndexInformer<T> informer = mock(SharedIndexInformer.class);
47+
CompletableFuture<Void> stopped = new CompletableFuture<>();
48+
when(informer.stopped()).thenReturn(stopped);
49+
if (informerRunBehavior != null) {
50+
doAnswer(invocation -> {
51+
try {
52+
informerRunBehavior.accept(null);
53+
} catch (Exception e) {
54+
stopped.completeExceptionally(e);
55+
}
56+
return null;
57+
}).when(informer).run();
58+
}
59+
doAnswer(invocation -> null).when(informer).stop();
3560
Indexer mockIndexer = mock(Indexer.class);
3661
when(informer.getIndexer()).thenReturn(mockIndexer);
3762
when(filterable.runnableInformer(anyLong())).thenReturn(informer);
3863
when(client.resources(clazz)).thenReturn(resources);
64+
when(client.leaderElector())
65+
.thenReturn(new LeaderElectorBuilder(client, Executors.newSingleThreadExecutor()));
3966

4067
final var apiGroupDSL = mock(ApiextensionsAPIGroupDSL.class);
4168
when(client.apiextensions()).thenReturn(apiGroupDSL);

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ public <R extends HasMetadata> R clone(R object) {
112112
.withTerminationTimeoutSeconds(100)
113113
.withMetrics(new Metrics() {})
114114
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
115+
.withInformerStoppedHandler((informer, ex) -> {
116+
})
115117
.build();
116118

117119
assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
@@ -128,5 +130,7 @@ public <R extends HasMetadata> R clone(R object) {
128130
assertNotEquals(config.getObjectMapper(), overridden.getObjectMapper());
129131
assertNotEquals(config.getLeaderElectionConfiguration(),
130132
overridden.getLeaderElectionConfiguration());
133+
assertNotEquals(config.getInformerStoppedHandler(),
134+
overridden.getLeaderElectionConfiguration());
131135
}
132136
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
import io.fabric8.kubernetes.api.model.ObjectMeta;
1010
import io.fabric8.kubernetes.api.model.apps.Deployment;
1111
import io.fabric8.kubernetes.client.KubernetesClient;
12-
import io.fabric8.kubernetes.client.dsl.AnyNamespaceOperation;
13-
import io.fabric8.kubernetes.client.dsl.MixedOperation;
14-
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
15-
import io.fabric8.kubernetes.client.informers.cache.Indexer;
12+
import io.javaoperatorsdk.operator.MockKubernetesClient;
13+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
14+
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
1615
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
1716
import io.javaoperatorsdk.operator.processing.event.EventHandler;
1817
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -21,7 +20,13 @@
2120

2221
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
2322
import static org.mockito.ArgumentMatchers.any;
24-
import static org.mockito.Mockito.*;
23+
import static org.mockito.ArgumentMatchers.eq;
24+
import static org.mockito.Mockito.atLeastOnce;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.never;
27+
import static org.mockito.Mockito.times;
28+
import static org.mockito.Mockito.verify;
29+
import static org.mockito.Mockito.when;
2530

2631
@SuppressWarnings({"rawtypes", "unchecked"})
2732
class InformerEventSourceTest {
@@ -31,28 +36,15 @@ class InformerEventSourceTest {
3136
private static final String NEXT_RESOURCE_VERSION = "2";
3237

3338
private InformerEventSource<Deployment, TestCustomResource> informerEventSource;
34-
private final KubernetesClient clientMock = mock(KubernetesClient.class);
39+
private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class);
3540
private final TemporaryResourceCache<Deployment> temporaryResourceCacheMock =
3641
mock(TemporaryResourceCache.class);
3742
private final EventHandler eventHandlerMock = mock(EventHandler.class);
38-
private final MixedOperation crClientMock = mock(MixedOperation.class);
39-
private final AnyNamespaceOperation specificResourceClientMock =
40-
mock(AnyNamespaceOperation.class);
41-
private final AnyNamespaceOperation labeledResourceClientMock =
42-
mock(AnyNamespaceOperation.class);
43-
private final SharedIndexInformer informer = mock(SharedIndexInformer.class);
4443
private final InformerConfiguration<Deployment> informerConfiguration =
4544
mock(InformerConfiguration.class);
4645

4746
@BeforeEach
4847
void setup() {
49-
when(clientMock.resources(any())).thenReturn(crClientMock);
50-
when(crClientMock.inAnyNamespace()).thenReturn(specificResourceClientMock);
51-
when(specificResourceClientMock.withLabelSelector((String) null))
52-
.thenReturn(labeledResourceClientMock);
53-
when(labeledResourceClientMock.runnableInformer(0)).thenReturn(informer);
54-
when(informer.getIndexer()).thenReturn(mock(Indexer.class));
55-
5648
when(informerConfiguration.getEffectiveNamespaces())
5749
.thenReturn(DEFAULT_NAMESPACES_SET);
5850
when(informerConfiguration.getSecondaryToPrimaryMapper())
@@ -251,6 +243,25 @@ void filtersOnDeleteEvents() {
251243
verify(eventHandlerMock, never()).handleEvent(any());
252244
}
253245

246+
@Test
247+
void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
248+
try {
249+
final var exception = new RuntimeException("Informer stopped exceptionally!");
250+
final var informerStoppedHandler = mock(InformerStoppedHandler.class);
251+
ConfigurationServiceProvider
252+
.overrideCurrent(
253+
overrider -> overrider.withInformerStoppedHandler(informerStoppedHandler));
254+
informerEventSource = new InformerEventSource<>(informerConfiguration,
255+
MockKubernetesClient.client(Deployment.class, unused -> {
256+
throw exception;
257+
}));
258+
informerEventSource.start();
259+
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
260+
} finally {
261+
ConfigurationServiceProvider.reset();
262+
}
263+
}
264+
254265
Deployment testDeployment() {
255266
Deployment deployment = new Deployment();
256267
deployment.setMetadata(new ObjectMeta());

operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Locale;
88
import java.util.UUID;
99
import java.util.concurrent.TimeUnit;
10+
import java.util.function.Consumer;
1011

1112
import org.awaitility.Awaitility;
1213
import org.junit.jupiter.api.extension.*;
@@ -21,7 +22,7 @@
2122
import io.fabric8.kubernetes.client.dsl.Resource;
2223
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
2324
import io.fabric8.kubernetes.client.utils.Utils;
24-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
25+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
2526
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
2627

2728
public abstract class AbstractOperatorExtension implements HasKubernetesClient,
@@ -34,7 +35,6 @@ public abstract class AbstractOperatorExtension implements HasKubernetesClient,
3435
public static final int CRD_READY_WAIT = 2000;
3536

3637
private final KubernetesClient kubernetesClient;
37-
protected final ConfigurationService configurationService;
3838
protected final List<HasMetadata> infrastructure;
3939
protected Duration infrastructureTimeout;
4040
protected final boolean oneNamespacePerClass;
@@ -44,15 +44,13 @@ public abstract class AbstractOperatorExtension implements HasKubernetesClient,
4444
protected String namespace;
4545

4646
protected AbstractOperatorExtension(
47-
ConfigurationService configurationService,
4847
List<HasMetadata> infrastructure,
4948
Duration infrastructureTimeout,
5049
boolean oneNamespacePerClass,
5150
boolean preserveNamespaceOnError,
5251
boolean waitForNamespaceDeletion,
5352
KubernetesClient kubernetesClient) {
5453
this.kubernetesClient = kubernetesClient;
55-
this.configurationService = configurationService;
5654
this.infrastructure = infrastructure;
5755
this.infrastructureTimeout = infrastructureTimeout;
5856
this.oneNamespacePerClass = oneNamespacePerClass;
@@ -123,7 +121,6 @@ public <T extends HasMetadata> boolean delete(T resource) {
123121
}
124122

125123
@Deprecated(forRemoval = true)
126-
@SuppressWarnings("unchecked")
127124
public <T extends HasMetadata> boolean delete(Class<T> type, T resource) {
128125
return delete(resource);
129126
}
@@ -211,16 +208,13 @@ protected void deleteOperator() {
211208

212209
@SuppressWarnings("unchecked")
213210
public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
214-
protected ConfigurationService configurationService;
215211
protected final List<HasMetadata> infrastructure;
216212
protected Duration infrastructureTimeout;
217213
protected boolean preserveNamespaceOnError;
218214
protected boolean waitForNamespaceDeletion;
219215
protected boolean oneNamespacePerClass;
220216

221217
protected AbstractBuilder() {
222-
this.configurationService = ConfigurationServiceProvider.instance();
223-
224218
this.infrastructure = new ArrayList<>();
225219
this.infrastructureTimeout = Duration.ofMinutes(1);
226220

@@ -252,8 +246,8 @@ public T oneNamespacePerClass(boolean value) {
252246
return (T) this;
253247
}
254248

255-
public T withConfigurationService(ConfigurationService value) {
256-
configurationService = value;
249+
public T withConfigurationService(Consumer<ConfigurationServiceOverrider> overrider) {
250+
ConfigurationServiceProvider.overrideCurrent(overrider);
257251
return (T) this;
258252
}
259253

operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/ClusterDeployedOperatorExtension.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.junit;
22

3+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
34
import java.io.File;
45
import java.io.FileInputStream;
56
import java.io.InputStream;
@@ -19,8 +20,6 @@
1920
import io.fabric8.kubernetes.api.model.HasMetadata;
2021
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
2122
import io.fabric8.kubernetes.client.KubernetesClient;
22-
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
23-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
2423

2524
public class ClusterDeployedOperatorExtension extends AbstractOperatorExtension {
2625

@@ -31,7 +30,6 @@ public class ClusterDeployedOperatorExtension extends AbstractOperatorExtension
3130
private final Duration operatorDeploymentTimeout;
3231

3332
private ClusterDeployedOperatorExtension(
34-
ConfigurationService configurationService,
3533
List<HasMetadata> operatorDeployment,
3634
Duration operatorDeploymentTimeout,
3735
List<HasMetadata> infrastructure,
@@ -40,7 +38,7 @@ private ClusterDeployedOperatorExtension(
4038
boolean waitForNamespaceDeletion,
4139
boolean oneNamespacePerClass,
4240
KubernetesClient kubernetesClient) {
43-
super(configurationService, infrastructure, infrastructureTimeout, oneNamespacePerClass,
41+
super(infrastructure, infrastructureTimeout, oneNamespacePerClass,
4442
preserveNamespaceOnError,
4543
waitForNamespaceDeletion,
4644
kubernetesClient);
@@ -147,7 +145,6 @@ public Builder withKubernetesClient(KubernetesClient kubernetesClient) {
147145

148146
public ClusterDeployedOperatorExtension build() {
149147
return new ClusterDeployedOperatorExtension(
150-
configurationService,
151148
operatorDeployment,
152149
deploymentTimeout,
153150
infrastructure,

0 commit comments

Comments
 (0)