Skip to content

Commit 9b647ee

Browse files
authored
feat: enable configuring a handler to listen to informers stopping (#1509)
* feat: enable configuring a handler to listen to informers stopping * chore(ci): enable snapshot repository
1 parent 7c3ea82 commit 9b647ee

File tree

12 files changed

+235
-40
lines changed

12 files changed

+235
-40
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,4 +148,7 @@ default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
148148
return Optional.empty();
149149
}
150150

151+
default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
152+
return Optional.empty();
153+
}
151154
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public class ConfigurationServiceOverrider {
2626
private ExecutorService executorService;
2727
private ExecutorService workflowExecutorService;
2828
private LeaderElectionConfiguration leaderElectionConfiguration;
29+
private InformerStoppedHandler informerStoppedHandler;
2930

3031
ConfigurationServiceOverrider(ConfigurationService original) {
3132
this.original = original;
@@ -93,6 +94,11 @@ public ConfigurationServiceOverrider withLeaderElectionConfiguration(
9394
return this;
9495
}
9596

97+
public ConfigurationServiceOverrider withInformerStoppedHandler(InformerStoppedHandler handler) {
98+
this.informerStoppedHandler = handler;
99+
return this;
100+
}
101+
96102
public ConfigurationService build() {
97103
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
98104
@Override
@@ -159,6 +165,12 @@ public Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
159165
return leaderElectionConfiguration != null ? Optional.of(leaderElectionConfiguration)
160166
: original.getLeaderElectionConfiguration();
161167
}
168+
169+
@Override
170+
public Optional<InformerStoppedHandler> getInformerStoppedHandler() {
171+
return informerStoppedHandler != null ? Optional.of(informerStoppedHandler)
172+
: original.getInformerStoppedHandler();
173+
}
162174
};
163175
}
164176

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package io.javaoperatorsdk.operator.api.config;
2+
3+
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
4+
5+
public interface InformerStoppedHandler {
6+
7+
@SuppressWarnings("rawtypes")
8+
void onStop(SharedIndexInformer informer, Throwable ex);
9+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.fabric8.kubernetes.client.informers.cache.Cache;
1717
import io.javaoperatorsdk.operator.OperatorException;
1818
import io.javaoperatorsdk.operator.ReconcilerUtils;
19+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
1920
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2021
import io.javaoperatorsdk.operator.processing.event.ResourceID;
2122
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
@@ -37,6 +38,27 @@ public InformerWrapper(SharedIndexInformer<T> informer) {
3738
public void start() throws OperatorException {
3839
try {
3940
informer.run();
41+
42+
// register stopped handler if we have one defined
43+
ConfigurationServiceProvider.instance().getInformerStoppedHandler()
44+
.ifPresent(ish -> {
45+
final var stopped = informer.stopped();
46+
if (stopped != null) {
47+
stopped.handle((res, ex) -> {
48+
ish.onStop(informer, ex);
49+
return null;
50+
});
51+
} else {
52+
final var apiTypeClass = informer.getApiTypeClass();
53+
final var fullResourceName =
54+
HasMetadata.getFullResourceName(apiTypeClass);
55+
final var version = HasMetadata.getVersion(apiTypeClass);
56+
throw new IllegalStateException(
57+
"Cannot retrieve 'stopped' callback to listen to informer stopping for informer for "
58+
+ fullResourceName + "/" + version);
59+
}
60+
});
61+
4062
} catch (Exception e) {
4163
log.error("Couldn't start informer for " + versionedFullResourceName() + " resources", e);
4264
ReconcilerUtils.handleKubernetesClientException(e,

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,35 @@
11
package io.javaoperatorsdk.operator;
22

3+
import java.util.concurrent.CompletableFuture;
4+
import java.util.concurrent.Executors;
5+
import java.util.function.Consumer;
6+
37
import io.fabric8.kubernetes.api.model.HasMetadata;
48
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
59
import io.fabric8.kubernetes.client.KubernetesClient;
610
import io.fabric8.kubernetes.client.V1ApiextensionAPIGroupDSL;
711
import io.fabric8.kubernetes.client.dsl.*;
12+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectorBuilder;
813
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
914
import io.fabric8.kubernetes.client.informers.cache.Indexer;
1015

1116
import static org.mockito.ArgumentMatchers.any;
1217
import static org.mockito.ArgumentMatchers.anyLong;
1318
import static org.mockito.ArgumentMatchers.anyString;
1419
import static org.mockito.ArgumentMatchers.nullable;
20+
import static org.mockito.Mockito.doAnswer;
1521
import static org.mockito.Mockito.mock;
1622
import static org.mockito.Mockito.when;
1723

1824
public class MockKubernetesClient {
1925

2026
public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
27+
return client(clazz, null);
28+
}
29+
30+
@SuppressWarnings({"unchecked", "rawtypes"})
31+
public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz,
32+
Consumer<Void> informerRunBehavior) {
2133
final var client = mock(KubernetesClient.class);
2234
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> resources =
2335
mock(MixedOperation.class);
@@ -32,10 +44,25 @@ public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
3244
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
3345
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
3446
SharedIndexInformer<T> informer = mock(SharedIndexInformer.class);
47+
CompletableFuture<Void> stopped = new CompletableFuture<>();
48+
when(informer.stopped()).thenReturn(stopped);
49+
if (informerRunBehavior != null) {
50+
doAnswer(invocation -> {
51+
try {
52+
informerRunBehavior.accept(null);
53+
} catch (Exception e) {
54+
stopped.completeExceptionally(e);
55+
}
56+
return null;
57+
}).when(informer).run();
58+
}
59+
doAnswer(invocation -> null).when(informer).stop();
3560
Indexer mockIndexer = mock(Indexer.class);
3661
when(informer.getIndexer()).thenReturn(mockIndexer);
3762
when(filterable.runnableInformer(anyLong())).thenReturn(informer);
3863
when(client.resources(clazz)).thenReturn(resources);
64+
when(client.leaderElector())
65+
.thenReturn(new LeaderElectorBuilder(client, Executors.newSingleThreadExecutor()));
3966

4067
final var apiGroupDSL = mock(ApiextensionsAPIGroupDSL.class);
4168
when(client.apiextensions()).thenReturn(apiGroupDSL);

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,8 @@ public <R extends HasMetadata> R clone(R object) {
112112
.withTerminationTimeoutSeconds(100)
113113
.withMetrics(new Metrics() {})
114114
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
115+
.withInformerStoppedHandler((informer, ex) -> {
116+
})
115117
.build();
116118

117119
assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
@@ -128,5 +130,7 @@ public <R extends HasMetadata> R clone(R object) {
128130
assertNotEquals(config.getObjectMapper(), overridden.getObjectMapper());
129131
assertNotEquals(config.getLeaderElectionConfiguration(),
130132
overridden.getLeaderElectionConfiguration());
133+
assertNotEquals(config.getInformerStoppedHandler(),
134+
overridden.getLeaderElectionConfiguration());
131135
}
132136
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,9 @@
99
import io.fabric8.kubernetes.api.model.ObjectMeta;
1010
import io.fabric8.kubernetes.api.model.apps.Deployment;
1111
import io.fabric8.kubernetes.client.KubernetesClient;
12-
import io.fabric8.kubernetes.client.dsl.AnyNamespaceOperation;
13-
import io.fabric8.kubernetes.client.dsl.MixedOperation;
14-
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
15-
import io.fabric8.kubernetes.client.informers.cache.Indexer;
12+
import io.javaoperatorsdk.operator.MockKubernetesClient;
13+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
14+
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
1615
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
1716
import io.javaoperatorsdk.operator.processing.event.EventHandler;
1817
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -21,7 +20,13 @@
2120

2221
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
2322
import static org.mockito.ArgumentMatchers.any;
24-
import static org.mockito.Mockito.*;
23+
import static org.mockito.ArgumentMatchers.eq;
24+
import static org.mockito.Mockito.atLeastOnce;
25+
import static org.mockito.Mockito.mock;
26+
import static org.mockito.Mockito.never;
27+
import static org.mockito.Mockito.times;
28+
import static org.mockito.Mockito.verify;
29+
import static org.mockito.Mockito.when;
2530

2631
@SuppressWarnings({"rawtypes", "unchecked"})
2732
class InformerEventSourceTest {
@@ -31,28 +36,15 @@ class InformerEventSourceTest {
3136
private static final String NEXT_RESOURCE_VERSION = "2";
3237

3338
private InformerEventSource<Deployment, TestCustomResource> informerEventSource;
34-
private final KubernetesClient clientMock = mock(KubernetesClient.class);
39+
private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class);
3540
private final TemporaryResourceCache<Deployment> temporaryResourceCacheMock =
3641
mock(TemporaryResourceCache.class);
3742
private final EventHandler eventHandlerMock = mock(EventHandler.class);
38-
private final MixedOperation crClientMock = mock(MixedOperation.class);
39-
private final AnyNamespaceOperation specificResourceClientMock =
40-
mock(AnyNamespaceOperation.class);
41-
private final AnyNamespaceOperation labeledResourceClientMock =
42-
mock(AnyNamespaceOperation.class);
43-
private final SharedIndexInformer informer = mock(SharedIndexInformer.class);
4443
private final InformerConfiguration<Deployment> informerConfiguration =
4544
mock(InformerConfiguration.class);
4645

4746
@BeforeEach
4847
void setup() {
49-
when(clientMock.resources(any())).thenReturn(crClientMock);
50-
when(crClientMock.inAnyNamespace()).thenReturn(specificResourceClientMock);
51-
when(specificResourceClientMock.withLabelSelector((String) null))
52-
.thenReturn(labeledResourceClientMock);
53-
when(labeledResourceClientMock.runnableInformer(0)).thenReturn(informer);
54-
when(informer.getIndexer()).thenReturn(mock(Indexer.class));
55-
5648
when(informerConfiguration.getEffectiveNamespaces())
5749
.thenReturn(DEFAULT_NAMESPACES_SET);
5850
when(informerConfiguration.getSecondaryToPrimaryMapper())
@@ -251,6 +243,25 @@ void filtersOnDeleteEvents() {
251243
verify(eventHandlerMock, never()).handleEvent(any());
252244
}
253245

246+
@Test
247+
void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
248+
try {
249+
final var exception = new RuntimeException("Informer stopped exceptionally!");
250+
final var informerStoppedHandler = mock(InformerStoppedHandler.class);
251+
ConfigurationServiceProvider
252+
.overrideCurrent(
253+
overrider -> overrider.withInformerStoppedHandler(informerStoppedHandler));
254+
informerEventSource = new InformerEventSource<>(informerConfiguration,
255+
MockKubernetesClient.client(Deployment.class, unused -> {
256+
throw exception;
257+
}));
258+
informerEventSource.start();
259+
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
260+
} finally {
261+
ConfigurationServiceProvider.reset();
262+
}
263+
}
264+
254265
Deployment testDeployment() {
255266
Deployment deployment = new Deployment();
256267
deployment.setMetadata(new ObjectMeta());

operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.Locale;
88
import java.util.UUID;
99
import java.util.concurrent.TimeUnit;
10+
import java.util.function.Consumer;
1011

1112
import org.awaitility.Awaitility;
1213
import org.junit.jupiter.api.extension.*;
@@ -22,7 +23,7 @@
2223
import io.fabric8.kubernetes.client.dsl.Resource;
2324
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
2425
import io.fabric8.kubernetes.client.utils.Utils;
25-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
26+
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
2627
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
2728

2829
public abstract class AbstractOperatorExtension implements HasKubernetesClient,
@@ -35,7 +36,6 @@ public abstract class AbstractOperatorExtension implements HasKubernetesClient,
3536
public static final int CRD_READY_WAIT = 2000;
3637

3738
private final KubernetesClient kubernetesClient;
38-
protected final ConfigurationService configurationService;
3939
protected final List<HasMetadata> infrastructure;
4040
protected Duration infrastructureTimeout;
4141
protected final boolean oneNamespacePerClass;
@@ -45,7 +45,6 @@ public abstract class AbstractOperatorExtension implements HasKubernetesClient,
4545
protected String namespace;
4646

4747
protected AbstractOperatorExtension(
48-
ConfigurationService configurationService,
4948
List<HasMetadata> infrastructure,
5049
Duration infrastructureTimeout,
5150
boolean oneNamespacePerClass,
@@ -54,8 +53,7 @@ protected AbstractOperatorExtension(
5453
KubernetesClient kubernetesClient) {
5554
this.kubernetesClient = kubernetesClient != null ? kubernetesClient
5655
: new KubernetesClientBuilder()
57-
.withConfig(configurationService.getClientConfiguration()).build();
58-
this.configurationService = configurationService;
56+
.withConfig(ConfigurationServiceProvider.instance().getClientConfiguration()).build();
5957
this.infrastructure = infrastructure;
6058
this.infrastructureTimeout = infrastructureTimeout;
6159
this.oneNamespacePerClass = oneNamespacePerClass;
@@ -126,7 +124,6 @@ public <T extends HasMetadata> boolean delete(T resource) {
126124
}
127125

128126
@Deprecated(forRemoval = true)
129-
@SuppressWarnings("unchecked")
130127
public <T extends HasMetadata> boolean delete(Class<T> type, T resource) {
131128
return delete(resource);
132129
}
@@ -214,16 +211,13 @@ protected void deleteOperator() {
214211

215212
@SuppressWarnings("unchecked")
216213
public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
217-
protected ConfigurationService configurationService;
218214
protected final List<HasMetadata> infrastructure;
219215
protected Duration infrastructureTimeout;
220216
protected boolean preserveNamespaceOnError;
221217
protected boolean waitForNamespaceDeletion;
222218
protected boolean oneNamespacePerClass;
223219

224220
protected AbstractBuilder() {
225-
this.configurationService = ConfigurationServiceProvider.instance();
226-
227221
this.infrastructure = new ArrayList<>();
228222
this.infrastructureTimeout = Duration.ofMinutes(1);
229223

@@ -255,8 +249,8 @@ public T oneNamespacePerClass(boolean value) {
255249
return (T) this;
256250
}
257251

258-
public T withConfigurationService(ConfigurationService value) {
259-
configurationService = value;
252+
public T withConfigurationService(Consumer<ConfigurationServiceOverrider> overrider) {
253+
ConfigurationServiceProvider.overrideCurrent(overrider);
260254
return (T) this;
261255
}
262256

operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/ClusterDeployedOperatorExtension.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import io.fabric8.kubernetes.api.model.HasMetadata;
2020
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
2121
import io.fabric8.kubernetes.client.KubernetesClient;
22-
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
22+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
2323

2424
public class ClusterDeployedOperatorExtension extends AbstractOperatorExtension {
2525

@@ -30,7 +30,6 @@ public class ClusterDeployedOperatorExtension extends AbstractOperatorExtension
3030
private final Duration operatorDeploymentTimeout;
3131

3232
private ClusterDeployedOperatorExtension(
33-
ConfigurationService configurationService,
3433
List<HasMetadata> operatorDeployment,
3534
Duration operatorDeploymentTimeout,
3635
List<HasMetadata> infrastructure,
@@ -39,7 +38,7 @@ private ClusterDeployedOperatorExtension(
3938
boolean waitForNamespaceDeletion,
4039
boolean oneNamespacePerClass,
4140
KubernetesClient kubernetesClient) {
42-
super(configurationService, infrastructure, infrastructureTimeout, oneNamespacePerClass,
41+
super(infrastructure, infrastructureTimeout, oneNamespacePerClass,
4342
preserveNamespaceOnError,
4443
waitForNamespaceDeletion,
4544
kubernetesClient);
@@ -146,15 +145,14 @@ public Builder withKubernetesClient(KubernetesClient kubernetesClient) {
146145

147146
public ClusterDeployedOperatorExtension build() {
148147
return new ClusterDeployedOperatorExtension(
149-
configurationService,
150148
operatorDeployment,
151149
deploymentTimeout,
152150
infrastructure,
153151
infrastructureTimeout,
154152
preserveNamespaceOnError,
155153
waitForNamespaceDeletion,
156154
oneNamespacePerClass,
157-
kubernetesClient);
155+
kubernetesClient != null ? kubernetesClient : new KubernetesClientBuilder().build());
158156
}
159157
}
160158
}

0 commit comments

Comments
 (0)