Skip to content

feat: enable configuring a handler to listen to informers stopping #1493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Sep 30, 2022
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,7 @@ default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
return Optional.empty();
}

default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class ConfigurationServiceOverrider {
private ExecutorService executorService;
private ExecutorService workflowExecutorService;
private LeaderElectionConfiguration leaderElectionConfiguration;
private InformerStoppedHandler informerStoppedHandler;

ConfigurationServiceOverrider(ConfigurationService original) {
this.original = original;
Expand Down Expand Up @@ -93,6 +94,11 @@ public ConfigurationServiceOverrider withLeaderElectionConfiguration(
return this;
}

public ConfigurationServiceOverrider withInformerStoppedHandler(InformerStoppedHandler handler) {
this.informerStoppedHandler = handler;
return this;
}

public ConfigurationService build() {
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
@Override
Expand Down Expand Up @@ -159,6 +165,12 @@ public Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
return leaderElectionConfiguration != null ? Optional.of(leaderElectionConfiguration)
: original.getLeaderElectionConfiguration();
}

@Override
public Optional<InformerStoppedHandler> getInformerStoppedHandler() {
return informerStoppedHandler != null ? Optional.of(informerStoppedHandler)
: original.getInformerStoppedHandler();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.javaoperatorsdk.operator.api.config;

import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

public interface InformerStoppedHandler {
Copy link
Collaborator

@csviri csviri Sep 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would rather rename it to InformerStopHandler ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the current name is better because it reflects the fact that the handler is called after the informer is stopped.


@SuppressWarnings("rawtypes")
void onStop(SharedIndexInformer informer, Throwable ex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
Expand All @@ -25,6 +26,16 @@ class InformerWrapper<T extends HasMetadata>

public InformerWrapper(SharedIndexInformer<T> informer) {
this.informer = informer;

// register
ConfigurationServiceProvider.instance().getInformerStoppedHandler()
.ifPresent(ish -> {
final var stopped = informer.stopped();
stopped.handle((res, ex) -> {
ish.onStop(informer, ex);
return null;
});
});
this.cache = (Cache<T>) informer.getStore();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.javaoperatorsdk.operator;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.GenericKubernetesClient;
Expand All @@ -14,12 +17,18 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MockKubernetesClient {

public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
return client(clazz, null);
}

public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz,
Consumer<Void> informerRunBehavior) {
final var client = mock(GenericKubernetesClient.class);
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> resources =
mock(MixedOperation.class);
Expand All @@ -34,6 +43,19 @@ public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
SharedIndexInformer<T> informer = mock(SharedIndexInformer.class);
CompletableFuture<Void> stopped = new CompletableFuture<>();
when(informer.stopped()).thenReturn(stopped);
if (informerRunBehavior != null) {
doAnswer(invocation -> {
try {
informerRunBehavior.accept(null);
} catch (Exception e) {
stopped.completeExceptionally(e);
}
return null;
}).when(informer).run();
}
doAnswer(invocation -> null).when(informer).stop();
Indexer mockIndexer = mock(Indexer.class);
when(informer.getIndexer()).thenReturn(mockIndexer);
when(filterable.runnableInformer(anyLong())).thenReturn(informer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public <R extends HasMetadata> R clone(R object) {
.withTerminationTimeoutSeconds(100)
.withMetrics(new Metrics() {})
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
.withInformerStoppedHandler((informer, ex) -> {
})
.build();

assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
Expand All @@ -128,5 +130,7 @@ public <R extends HasMetadata> R clone(R object) {
assertNotEquals(config.getObjectMapper(), overridden.getObjectMapper());
assertNotEquals(config.getLeaderElectionConfiguration(),
overridden.getLeaderElectionConfiguration());
assertNotEquals(config.getInformerStoppedHandler(),
overridden.getLeaderElectionConfiguration());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
import io.javaoperatorsdk.operator.MockKubernetesClient;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
Expand All @@ -22,6 +20,8 @@

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -36,28 +36,15 @@ class InformerEventSourceTest {
private static final String NEXT_RESOURCE_VERSION = "2";

private InformerEventSource<Deployment, TestCustomResource> informerEventSource;
private final KubernetesClient clientMock = mock(KubernetesClient.class);
private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class);
private final TemporaryResourceCache<Deployment> temporaryResourceCacheMock =
mock(TemporaryResourceCache.class);
private final EventHandler eventHandlerMock = mock(EventHandler.class);
private final MixedOperation crClientMock = mock(MixedOperation.class);
private final FilterWatchListMultiDeletable specificResourceClientMock =
mock(FilterWatchListMultiDeletable.class);
private final FilterWatchListDeletable labeledResourceClientMock =
mock(FilterWatchListDeletable.class);
private final SharedIndexInformer informer = mock(SharedIndexInformer.class);
private final InformerConfiguration<Deployment> informerConfiguration =
mock(InformerConfiguration.class);

@BeforeEach
void setup() {
when(clientMock.resources(any())).thenReturn(crClientMock);
when(crClientMock.inAnyNamespace()).thenReturn(specificResourceClientMock);
when(specificResourceClientMock.withLabelSelector((String) null))
.thenReturn(labeledResourceClientMock);
when(labeledResourceClientMock.runnableInformer(0)).thenReturn(informer);
when(informer.getIndexer()).thenReturn(mock(Indexer.class));

when(informerConfiguration.getEffectiveNamespaces())
.thenReturn(DEFAULT_NAMESPACES_SET);
when(informerConfiguration.getSecondaryToPrimaryMapper())
Expand Down Expand Up @@ -256,6 +243,25 @@ void filtersOnDeleteEvents() {
verify(eventHandlerMock, never()).handleEvent(any());
}

@Test
void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
try {
final var exception = new RuntimeException("Informer stopped exceptionally!");
final var informerStoppedHandler = mock(InformerStoppedHandler.class);
ConfigurationServiceProvider
.overrideCurrent(
overrider -> overrider.withInformerStoppedHandler(informerStoppedHandler));
informerEventSource = new InformerEventSource<>(informerConfiguration,
MockKubernetesClient.client(Deployment.class, unused -> {
throw exception;
}));
informerEventSource.start();
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
} finally {
ConfigurationServiceProvider.reset();
}
}

Deployment testDeployment() {
Deployment deployment = new Deployment();
deployment.setMetadata(new ObjectMeta());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,10 @@ protected void before(ExtensionContext context) {
ref.controllerConfigurationOverrider.accept(oconfig);
}

applyCrd(config.getResourceTypeName());
// only try to apply a CRD for the reconciler if it is associated to a CR
if (CustomResource.class.isAssignableFrom(config.getResourceClass())) {
applyCrd(config.getResourceTypeName());
}

if (ref.reconciler instanceof KubernetesClientAware) {
((KubernetesClientAware) ref.reconciler).setKubernetesClient(kubernetesClient);
Expand All @@ -151,6 +154,9 @@ protected void before(ExtensionContext context) {
private void applyCrd(String resourceTypeName) {
String path = "/META-INF/fabric8/" + resourceTypeName + "-v1.yml";
try (InputStream is = getClass().getResourceAsStream(path)) {
if (is == null) {
throw new IllegalStateException("Cannot find CRD at " + path);
}
final var crd = getKubernetesClient().load(is);
crd.createOrReplace();
Thread.sleep(CRD_READY_WAIT); // readiness is not applicable for CRD, just wait a little
Expand Down
13 changes: 12 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<sonar.host.url>https://sonarcloud.io</sonar.host.url>

<junit.version>5.9.0</junit.version>
<fabric8-client.version>5.12.3</fabric8-client.version>
<fabric8-client.version>5.12-SNAPSHOT</fabric8-client.version>
<slf4j.version>1.7.36</slf4j.version>
<log4j.version>2.18.0</log4j.version>
<mokito.version>4.8.0</mokito.version>
Expand Down Expand Up @@ -196,6 +196,17 @@
</dependencies>
</dependencyManagement>

<!-- todo: remove once fabric8 5.12.4 is released -->
<repositories>
<repository>
<id>ossrh</id>
<url>https://oss.sonatype.org/content/repositories/snapshots</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>

<distributionManagement>
<snapshotRepository>
<id>ossrh</id>
Expand Down