Skip to content

feat: enable configuring a handler to listen to informers stopping #1493

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 10 commits into from
Sep 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,7 @@ default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
return Optional.empty();
}

default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
return Optional.empty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ public class ConfigurationServiceOverrider {
private ExecutorService executorService;
private ExecutorService workflowExecutorService;
private LeaderElectionConfiguration leaderElectionConfiguration;
private InformerStoppedHandler informerStoppedHandler;

ConfigurationServiceOverrider(ConfigurationService original) {
this.original = original;
Expand Down Expand Up @@ -93,6 +94,11 @@ public ConfigurationServiceOverrider withLeaderElectionConfiguration(
return this;
}

public ConfigurationServiceOverrider withInformerStoppedHandler(InformerStoppedHandler handler) {
this.informerStoppedHandler = handler;
return this;
}

public ConfigurationService build() {
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
@Override
Expand Down Expand Up @@ -159,6 +165,12 @@ public Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
return leaderElectionConfiguration != null ? Optional.of(leaderElectionConfiguration)
: original.getLeaderElectionConfiguration();
}

@Override
public Optional<InformerStoppedHandler> getInformerStoppedHandler() {
return informerStoppedHandler != null ? Optional.of(informerStoppedHandler)
: original.getInformerStoppedHandler();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.javaoperatorsdk.operator.api.config;

import io.fabric8.kubernetes.client.informers.SharedIndexInformer;

public interface InformerStoppedHandler {
Copy link
Collaborator

@csviri csviri Sep 30, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would rather rename it to InformerStopHandler ?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the current name is better because it reflects the fact that the handler is called after the informer is stopped.


@SuppressWarnings("rawtypes")
void onStop(SharedIndexInformer informer, Throwable ex);
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
Expand All @@ -32,6 +33,27 @@ public InformerWrapper(SharedIndexInformer<T> informer) {
public void start() throws OperatorException {
try {
informer.run();

// register stopped handler if we have one defined
ConfigurationServiceProvider.instance().getInformerStoppedHandler()
.ifPresent(ish -> {
final var stopped = informer.stopped();
if (stopped != null) {
stopped.handle((res, ex) -> {
ish.onStop(informer, ex);
return null;
});
} else {
final var apiTypeClass = informer.getApiTypeClass();
final var fullResourceName =
HasMetadata.getFullResourceName(apiTypeClass);
final var version = HasMetadata.getVersion(apiTypeClass);
throw new IllegalStateException(
"Cannot retrieve 'stopped' callback to listen to informer stopping for informer for "
+ fullResourceName + "/" + version);
}
});

} catch (Exception e) {
ReconcilerUtils.handleKubernetesClientException(e,
HasMetadata.getFullResourceName(informer.getApiTypeClass()));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package io.javaoperatorsdk.operator;

import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.GenericKubernetesClient;
Expand All @@ -14,12 +17,18 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MockKubernetesClient {

public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
return client(clazz, null);
}

public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz,
Consumer<Void> informerRunBehavior) {
final var client = mock(GenericKubernetesClient.class);
MixedOperation<T, KubernetesResourceList<T>, Resource<T>> resources =
mock(MixedOperation.class);
Expand All @@ -34,6 +43,19 @@ public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz) {
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
SharedIndexInformer<T> informer = mock(SharedIndexInformer.class);
CompletableFuture<Void> stopped = new CompletableFuture<>();
when(informer.stopped()).thenReturn(stopped);
if (informerRunBehavior != null) {
doAnswer(invocation -> {
try {
informerRunBehavior.accept(null);
} catch (Exception e) {
stopped.completeExceptionally(e);
}
return null;
}).when(informer).run();
}
doAnswer(invocation -> null).when(informer).stop();
Indexer mockIndexer = mock(Indexer.class);
when(informer.getIndexer()).thenReturn(mockIndexer);
when(filterable.runnableInformer(anyLong())).thenReturn(informer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,8 @@ public <R extends HasMetadata> R clone(R object) {
.withTerminationTimeoutSeconds(100)
.withMetrics(new Metrics() {})
.withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS"))
.withInformerStoppedHandler((informer, ex) -> {
})
.build();

assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop());
Expand All @@ -128,5 +130,7 @@ public <R extends HasMetadata> R clone(R object) {
assertNotEquals(config.getObjectMapper(), overridden.getObjectMapper());
assertNotEquals(config.getLeaderElectionConfiguration(),
overridden.getLeaderElectionConfiguration());
assertNotEquals(config.getInformerStoppedHandler(),
overridden.getLeaderElectionConfiguration());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.fabric8.kubernetes.client.dsl.FilterWatchListMultiDeletable;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
import io.javaoperatorsdk.operator.MockKubernetesClient;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
Expand All @@ -22,6 +20,8 @@

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
Expand All @@ -36,28 +36,15 @@ class InformerEventSourceTest {
private static final String NEXT_RESOURCE_VERSION = "2";

private InformerEventSource<Deployment, TestCustomResource> informerEventSource;
private final KubernetesClient clientMock = mock(KubernetesClient.class);
private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class);
private final TemporaryResourceCache<Deployment> temporaryResourceCacheMock =
mock(TemporaryResourceCache.class);
private final EventHandler eventHandlerMock = mock(EventHandler.class);
private final MixedOperation crClientMock = mock(MixedOperation.class);
private final FilterWatchListMultiDeletable specificResourceClientMock =
mock(FilterWatchListMultiDeletable.class);
private final FilterWatchListDeletable labeledResourceClientMock =
mock(FilterWatchListDeletable.class);
private final SharedIndexInformer informer = mock(SharedIndexInformer.class);
private final InformerConfiguration<Deployment> informerConfiguration =
mock(InformerConfiguration.class);

@BeforeEach
void setup() {
when(clientMock.resources(any())).thenReturn(crClientMock);
when(crClientMock.inAnyNamespace()).thenReturn(specificResourceClientMock);
when(specificResourceClientMock.withLabelSelector((String) null))
.thenReturn(labeledResourceClientMock);
when(labeledResourceClientMock.runnableInformer(0)).thenReturn(informer);
when(informer.getIndexer()).thenReturn(mock(Indexer.class));

when(informerConfiguration.getEffectiveNamespaces())
.thenReturn(DEFAULT_NAMESPACES_SET);
when(informerConfiguration.getSecondaryToPrimaryMapper())
Expand Down Expand Up @@ -256,6 +243,25 @@ void filtersOnDeleteEvents() {
verify(eventHandlerMock, never()).handleEvent(any());
}

@Test
void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
try {
final var exception = new RuntimeException("Informer stopped exceptionally!");
final var informerStoppedHandler = mock(InformerStoppedHandler.class);
ConfigurationServiceProvider
.overrideCurrent(
overrider -> overrider.withInformerStoppedHandler(informerStoppedHandler));
informerEventSource = new InformerEventSource<>(informerConfiguration,
MockKubernetesClient.client(Deployment.class, unused -> {
throw exception;
}));
informerEventSource.start();
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
} finally {
ConfigurationServiceProvider.reset();
}
}

Deployment testDeployment() {
Deployment deployment = new Deployment();
deployment.setMetadata(new ObjectMeta());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.util.Locale;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

import org.awaitility.Awaitility;
import org.junit.jupiter.api.extension.*;
Expand All @@ -22,7 +23,7 @@
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.fabric8.kubernetes.client.utils.Utils;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;

public abstract class AbstractOperatorExtension implements HasKubernetesClient,
Expand All @@ -35,7 +36,6 @@ public abstract class AbstractOperatorExtension implements HasKubernetesClient,
public static final int CRD_READY_WAIT = 2000;

private final KubernetesClient kubernetesClient = new DefaultKubernetesClient();
protected final ConfigurationService configurationService;
protected final List<HasMetadata> infrastructure;
protected Duration infrastructureTimeout;
protected final boolean oneNamespacePerClass;
Expand All @@ -45,14 +45,11 @@ public abstract class AbstractOperatorExtension implements HasKubernetesClient,
protected String namespace;

protected AbstractOperatorExtension(
ConfigurationService configurationService,
List<HasMetadata> infrastructure,
Duration infrastructureTimeout,
boolean oneNamespacePerClass,
boolean preserveNamespaceOnError,
boolean waitForNamespaceDeletion) {

this.configurationService = configurationService;
this.infrastructure = infrastructure;
this.infrastructureTimeout = infrastructureTimeout;
this.oneNamespacePerClass = oneNamespacePerClass;
Expand Down Expand Up @@ -214,16 +211,13 @@ protected void deleteOperator() {

@SuppressWarnings("unchecked")
public static abstract class AbstractBuilder<T extends AbstractBuilder<T>> {
protected ConfigurationService configurationService;
protected final List<HasMetadata> infrastructure;
protected Duration infrastructureTimeout;
protected boolean preserveNamespaceOnError;
protected boolean waitForNamespaceDeletion;
protected boolean oneNamespacePerClass;

protected AbstractBuilder() {
this.configurationService = ConfigurationServiceProvider.instance();

this.infrastructure = new ArrayList<>();
this.infrastructureTimeout = Duration.ofMinutes(1);

Expand Down Expand Up @@ -255,8 +249,8 @@ public T oneNamespacePerClass(boolean value) {
return (T) this;
}

public T withConfigurationService(ConfigurationService value) {
configurationService = value;
public T withConfigurationService(Consumer<ConfigurationServiceOverrider> overrider) {
ConfigurationServiceProvider.overrideCurrent(overrider);
return (T) this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;

public class ClusterDeployedOperatorExtension extends AbstractOperatorExtension {

Expand All @@ -29,15 +28,14 @@ public class ClusterDeployedOperatorExtension extends AbstractOperatorExtension
private final Duration operatorDeploymentTimeout;

private ClusterDeployedOperatorExtension(
ConfigurationService configurationService,
List<HasMetadata> operatorDeployment,
Duration operatorDeploymentTimeout,
List<HasMetadata> infrastructure,
Duration infrastructureTimeout,
boolean preserveNamespaceOnError,
boolean waitForNamespaceDeletion,
boolean oneNamespacePerClass) {
super(configurationService, infrastructure, infrastructureTimeout, oneNamespacePerClass,
super(infrastructure, infrastructureTimeout, oneNamespacePerClass,
preserveNamespaceOnError,
waitForNamespaceDeletion);
this.operatorDeployment = operatorDeployment;
Expand Down Expand Up @@ -137,7 +135,6 @@ public Builder withOperatorDeployment(HasMetadata... hms) {

public ClusterDeployedOperatorExtension build() {
return new ClusterDeployedOperatorExtension(
configurationService,
operatorDeployment,
deploymentTimeout,
infrastructure,
Expand Down
Loading