From 7472c0778d97bb9c786fc2c0913a0f5721a2e7a7 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Fri, 30 Sep 2022 16:15:02 +0200 Subject: [PATCH 1/2] feat: enable configuring a handler to listen to informers stopping --- .../api/config/ConfigurationService.java | 3 + .../config/ConfigurationServiceOverrider.java | 12 ++ .../api/config/InformerStoppedHandler.java | 9 ++ .../source/informer/InformerWrapper.java | 22 ++++ .../operator/MockKubernetesClient.java | 27 +++++ .../ConfigurationServiceOverriderTest.java | 4 + .../informer/InformerEventSourceTest.java | 49 ++++---- .../junit/AbstractOperatorExtension.java | 16 +-- .../ClusterDeployedOperatorExtension.java | 8 +- .../junit/LocallyRunOperatorExtension.java | 8 +- .../operator/MultiVersionCRDIT.java | 106 ++++++++++++++++++ 11 files changed, 224 insertions(+), 40 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/InformerStoppedHandler.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java index a7f50bcf6c..00e0b84526 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java @@ -148,4 +148,7 @@ default Optional getLeaderElectionConfiguration() { return Optional.empty(); } + default Optional getInformerStoppedHandler() { + return Optional.empty(); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java index 26e1de7bb1..ee442c07fa 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java @@ -26,6 +26,7 @@ public class ConfigurationServiceOverrider { private ExecutorService executorService; private ExecutorService workflowExecutorService; private LeaderElectionConfiguration leaderElectionConfiguration; + private InformerStoppedHandler informerStoppedHandler; ConfigurationServiceOverrider(ConfigurationService original) { this.original = original; @@ -93,6 +94,11 @@ public ConfigurationServiceOverrider withLeaderElectionConfiguration( return this; } + public ConfigurationServiceOverrider withInformerStoppedHandler(InformerStoppedHandler handler) { + this.informerStoppedHandler = handler; + return this; + } + public ConfigurationService build() { return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) { @Override @@ -159,6 +165,12 @@ public Optional getLeaderElectionConfiguration() { return leaderElectionConfiguration != null ? Optional.of(leaderElectionConfiguration) : original.getLeaderElectionConfiguration(); } + + @Override + public Optional getInformerStoppedHandler() { + return informerStoppedHandler != null ? Optional.of(informerStoppedHandler) + : original.getInformerStoppedHandler(); + } }; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/InformerStoppedHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/InformerStoppedHandler.java new file mode 100644 index 0000000000..d204c86664 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/InformerStoppedHandler.java @@ -0,0 +1,9 @@ +package io.javaoperatorsdk.operator.api.config; + +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; + +public interface InformerStoppedHandler { + + @SuppressWarnings("rawtypes") + void onStop(SharedIndexInformer informer, Throwable ex); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 12ec40c03d..90bf01267c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -16,6 +16,7 @@ import io.fabric8.kubernetes.client.informers.cache.Cache; import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.ReconcilerUtils; +import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; @@ -37,6 +38,27 @@ public InformerWrapper(SharedIndexInformer informer) { public void start() throws OperatorException { try { informer.run(); + + // register stopped handler if we have one defined + ConfigurationServiceProvider.instance().getInformerStoppedHandler() + .ifPresent(ish -> { + final var stopped = informer.stopped(); + if (stopped != null) { + stopped.handle((res, ex) -> { + ish.onStop(informer, ex); + return null; + }); + } else { + final var apiTypeClass = informer.getApiTypeClass(); + final var fullResourceName = + HasMetadata.getFullResourceName(apiTypeClass); + final var version = HasMetadata.getVersion(apiTypeClass); + throw new IllegalStateException( + "Cannot retrieve 'stopped' callback to listen to informer stopping for informer for " + + fullResourceName + "/" + version); + } + }); + } catch (Exception e) { log.error("Couldn't start informer for " + versionedFullResourceName() + " resources", e); ReconcilerUtils.handleKubernetesClientException(e, diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java index 0527e9bd8f..2589f566da 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java @@ -1,10 +1,15 @@ package io.javaoperatorsdk.operator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.function.Consumer; + import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.V1ApiextensionAPIGroupDSL; import io.fabric8.kubernetes.client.dsl.*; +import io.fabric8.kubernetes.client.extended.leaderelection.LeaderElectorBuilder; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.cache.Indexer; @@ -12,12 +17,19 @@ import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.nullable; +import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; public class MockKubernetesClient { public static KubernetesClient client(Class clazz) { + return client(clazz, null); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + public static KubernetesClient client(Class clazz, + Consumer informerRunBehavior) { final var client = mock(KubernetesClient.class); MixedOperation, Resource> resources = mock(MixedOperation.class); @@ -32,10 +44,25 @@ public static KubernetesClient client(Class clazz) { when(resources.inAnyNamespace()).thenReturn(inAnyNamespace); when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable); SharedIndexInformer informer = mock(SharedIndexInformer.class); + CompletableFuture stopped = new CompletableFuture<>(); + when(informer.stopped()).thenReturn(stopped); + if (informerRunBehavior != null) { + doAnswer(invocation -> { + try { + informerRunBehavior.accept(null); + } catch (Exception e) { + stopped.completeExceptionally(e); + } + return null; + }).when(informer).run(); + } + doAnswer(invocation -> null).when(informer).stop(); Indexer mockIndexer = mock(Indexer.class); when(informer.getIndexer()).thenReturn(mockIndexer); when(filterable.runnableInformer(anyLong())).thenReturn(informer); when(client.resources(clazz)).thenReturn(resources); + when(client.leaderElector()) + .thenReturn(new LeaderElectorBuilder(client, Executors.newSingleThreadExecutor())); final var apiGroupDSL = mock(ApiextensionsAPIGroupDSL.class); when(client.apiextensions()).thenReturn(apiGroupDSL); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java index 8b973b8159..878118ba25 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverriderTest.java @@ -112,6 +112,8 @@ public R clone(R object) { .withTerminationTimeoutSeconds(100) .withMetrics(new Metrics() {}) .withLeaderElectionConfiguration(new LeaderElectionConfiguration("newLease", "newLeaseNS")) + .withInformerStoppedHandler((informer, ex) -> { + }) .build(); assertNotEquals(config.closeClientOnStop(), overridden.closeClientOnStop()); @@ -128,5 +130,7 @@ public R clone(R object) { assertNotEquals(config.getObjectMapper(), overridden.getObjectMapper()); assertNotEquals(config.getLeaderElectionConfiguration(), overridden.getLeaderElectionConfiguration()); + assertNotEquals(config.getInformerStoppedHandler(), + overridden.getLeaderElectionConfiguration()); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 67e9d1fca0..e941e8390d 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -9,10 +9,9 @@ import io.fabric8.kubernetes.api.model.ObjectMeta; import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.client.KubernetesClient; -import io.fabric8.kubernetes.client.dsl.AnyNamespaceOperation; -import io.fabric8.kubernetes.client.dsl.MixedOperation; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.fabric8.kubernetes.client.informers.cache.Indexer; +import io.javaoperatorsdk.operator.MockKubernetesClient; +import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler; import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; @@ -21,7 +20,13 @@ import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.Mockito.*; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; @SuppressWarnings({"rawtypes", "unchecked"}) class InformerEventSourceTest { @@ -31,28 +36,15 @@ class InformerEventSourceTest { private static final String NEXT_RESOURCE_VERSION = "2"; private InformerEventSource informerEventSource; - private final KubernetesClient clientMock = mock(KubernetesClient.class); + private final KubernetesClient clientMock = MockKubernetesClient.client(Deployment.class); private final TemporaryResourceCache temporaryResourceCacheMock = mock(TemporaryResourceCache.class); private final EventHandler eventHandlerMock = mock(EventHandler.class); - private final MixedOperation crClientMock = mock(MixedOperation.class); - private final AnyNamespaceOperation specificResourceClientMock = - mock(AnyNamespaceOperation.class); - private final AnyNamespaceOperation labeledResourceClientMock = - mock(AnyNamespaceOperation.class); - private final SharedIndexInformer informer = mock(SharedIndexInformer.class); private final InformerConfiguration informerConfiguration = mock(InformerConfiguration.class); @BeforeEach void setup() { - when(clientMock.resources(any())).thenReturn(crClientMock); - when(crClientMock.inAnyNamespace()).thenReturn(specificResourceClientMock); - when(specificResourceClientMock.withLabelSelector((String) null)) - .thenReturn(labeledResourceClientMock); - when(labeledResourceClientMock.runnableInformer(0)).thenReturn(informer); - when(informer.getIndexer()).thenReturn(mock(Indexer.class)); - when(informerConfiguration.getEffectiveNamespaces()) .thenReturn(DEFAULT_NAMESPACES_SET); when(informerConfiguration.getSecondaryToPrimaryMapper()) @@ -251,6 +243,25 @@ void filtersOnDeleteEvents() { verify(eventHandlerMock, never()).handleEvent(any()); } + @Test + void informerStoppedHandlerShouldBeCalledWhenInformerStops() { + try { + final var exception = new RuntimeException("Informer stopped exceptionally!"); + final var informerStoppedHandler = mock(InformerStoppedHandler.class); + ConfigurationServiceProvider + .overrideCurrent( + overrider -> overrider.withInformerStoppedHandler(informerStoppedHandler)); + informerEventSource = new InformerEventSource<>(informerConfiguration, + MockKubernetesClient.client(Deployment.class, unused -> { + throw exception; + })); + informerEventSource.start(); + verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception)); + } finally { + ConfigurationServiceProvider.reset(); + } + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta()); diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java index 1da9973271..15713d47a1 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/AbstractOperatorExtension.java @@ -7,6 +7,7 @@ import java.util.Locale; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.awaitility.Awaitility; import org.junit.jupiter.api.extension.*; @@ -22,7 +23,7 @@ import io.fabric8.kubernetes.client.dsl.Resource; import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil; import io.fabric8.kubernetes.client.utils.Utils; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; public abstract class AbstractOperatorExtension implements HasKubernetesClient, @@ -35,7 +36,6 @@ public abstract class AbstractOperatorExtension implements HasKubernetesClient, public static final int CRD_READY_WAIT = 2000; private final KubernetesClient kubernetesClient; - protected final ConfigurationService configurationService; protected final List infrastructure; protected Duration infrastructureTimeout; protected final boolean oneNamespacePerClass; @@ -45,7 +45,6 @@ public abstract class AbstractOperatorExtension implements HasKubernetesClient, protected String namespace; protected AbstractOperatorExtension( - ConfigurationService configurationService, List infrastructure, Duration infrastructureTimeout, boolean oneNamespacePerClass, @@ -54,8 +53,7 @@ protected AbstractOperatorExtension( KubernetesClient kubernetesClient) { this.kubernetesClient = kubernetesClient != null ? kubernetesClient : new KubernetesClientBuilder() - .withConfig(configurationService.getClientConfiguration()).build(); - this.configurationService = configurationService; + .withConfig(ConfigurationServiceProvider.instance().getClientConfiguration()).build(); this.infrastructure = infrastructure; this.infrastructureTimeout = infrastructureTimeout; this.oneNamespacePerClass = oneNamespacePerClass; @@ -126,7 +124,6 @@ public boolean delete(T resource) { } @Deprecated(forRemoval = true) - @SuppressWarnings("unchecked") public boolean delete(Class type, T resource) { return delete(resource); } @@ -214,7 +211,6 @@ protected void deleteOperator() { @SuppressWarnings("unchecked") public static abstract class AbstractBuilder> { - protected ConfigurationService configurationService; protected final List infrastructure; protected Duration infrastructureTimeout; protected boolean preserveNamespaceOnError; @@ -222,8 +218,6 @@ public static abstract class AbstractBuilder> { protected boolean oneNamespacePerClass; protected AbstractBuilder() { - this.configurationService = ConfigurationServiceProvider.instance(); - this.infrastructure = new ArrayList<>(); this.infrastructureTimeout = Duration.ofMinutes(1); @@ -255,8 +249,8 @@ public T oneNamespacePerClass(boolean value) { return (T) this; } - public T withConfigurationService(ConfigurationService value) { - configurationService = value; + public T withConfigurationService(Consumer overrider) { + ConfigurationServiceProvider.overrideCurrent(overrider); return (T) this; } diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/ClusterDeployedOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/ClusterDeployedOperatorExtension.java index 1cda69dc01..ec1bff4064 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/ClusterDeployedOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/ClusterDeployedOperatorExtension.java @@ -19,7 +19,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBinding; import io.fabric8.kubernetes.client.KubernetesClient; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; public class ClusterDeployedOperatorExtension extends AbstractOperatorExtension { @@ -30,7 +30,6 @@ public class ClusterDeployedOperatorExtension extends AbstractOperatorExtension private final Duration operatorDeploymentTimeout; private ClusterDeployedOperatorExtension( - ConfigurationService configurationService, List operatorDeployment, Duration operatorDeploymentTimeout, List infrastructure, @@ -39,7 +38,7 @@ private ClusterDeployedOperatorExtension( boolean waitForNamespaceDeletion, boolean oneNamespacePerClass, KubernetesClient kubernetesClient) { - super(configurationService, infrastructure, infrastructureTimeout, oneNamespacePerClass, + super(infrastructure, infrastructureTimeout, oneNamespacePerClass, preserveNamespaceOnError, waitForNamespaceDeletion, kubernetesClient); @@ -146,7 +145,6 @@ public Builder withKubernetesClient(KubernetesClient kubernetesClient) { public ClusterDeployedOperatorExtension build() { return new ClusterDeployedOperatorExtension( - configurationService, operatorDeployment, deploymentTimeout, infrastructure, @@ -154,7 +152,7 @@ public ClusterDeployedOperatorExtension build() { preserveNamespaceOnError, waitForNamespaceDeletion, oneNamespacePerClass, - kubernetesClient); + kubernetesClient != null ? kubernetesClient : new KubernetesClientBuilder().build()); } } } diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java index a4c04ab938..8f4d62341c 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java @@ -22,7 +22,7 @@ import io.javaoperatorsdk.operator.Operator; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.RegisteredController; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider; import io.javaoperatorsdk.operator.api.reconciler.Reconciler; import io.javaoperatorsdk.operator.processing.retry.Retry; @@ -42,7 +42,6 @@ public class LocallyRunOperatorExtension extends AbstractOperatorExtension { private final Map registeredControllers; private LocallyRunOperatorExtension( - ConfigurationService configurationService, List reconcilers, List infrastructure, List portForwards, @@ -53,7 +52,6 @@ private LocallyRunOperatorExtension( boolean oneNamespacePerClass, KubernetesClient kubernetesClient) { super( - configurationService, infrastructure, infrastructureTimeout, oneNamespacePerClass, @@ -64,7 +62,7 @@ private LocallyRunOperatorExtension( this.portForwards = portForwards; this.localPortForwards = new ArrayList<>(portForwards.size()); this.additionalCustomResourceDefinitions = additionalCustomResourceDefinitions; - this.operator = new Operator(getKubernetesClient(), this.configurationService); + this.operator = new Operator(getKubernetesClient()); this.registeredControllers = new HashMap<>(); } @@ -127,6 +125,7 @@ protected void before(ExtensionContext context) { additionalCustomResourceDefinitions .forEach(cr -> applyCrd(ReconcilerUtils.getResourceTypeName(cr))); + final var configurationService = ConfigurationServiceProvider.instance(); for (var ref : reconcilers) { final var config = configurationService.getConfigurationFor(ref.reconciler); final var oconfig = override(config); @@ -265,7 +264,6 @@ public Builder withAdditionalCustomResourceDefinition( public LocallyRunOperatorExtension build() { return new LocallyRunOperatorExtension( - configurationService, reconcilers, infrastructure, portForwards, diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java index 3636542270..2126f129c7 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/MultiVersionCRDIT.java @@ -6,7 +6,12 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ObjectMeta; +import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; +import io.fabric8.kubernetes.client.utils.Serialization; +import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler; import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; import io.javaoperatorsdk.operator.sample.multiversioncrd.MultiVersionCRDTestCustomResource1; import io.javaoperatorsdk.operator.sample.multiversioncrd.MultiVersionCRDTestCustomResource2; @@ -15,6 +20,8 @@ import io.javaoperatorsdk.operator.sample.multiversioncrd.MultiVersionCRDTestReconciler1; import io.javaoperatorsdk.operator.sample.multiversioncrd.MultiVersionCRDTestReconciler2; +import com.fasterxml.jackson.core.JsonProcessingException; + import static com.google.common.truth.Truth.assertThat; import static org.awaitility.Awaitility.await; @@ -28,8 +35,73 @@ class MultiVersionCRDIT { LocallyRunOperatorExtension.builder() .withReconciler(MultiVersionCRDTestReconciler1.class) .withReconciler(MultiVersionCRDTestReconciler2.class) + .withConfigurationService( + overrider -> overrider.withInformerStoppedHandler(informerStoppedHandler)) .build(); + private static class TestInformerStoppedHandler implements InformerStoppedHandler { + private String resourceClassName; + private String resourceCreateAsVersion; + + private String failedResourceVersion; + private String errorMessage; + + @Override + @SuppressWarnings("rawtypes") + public void onStop(SharedIndexInformer informer, Throwable ex) { + if (ex instanceof WatcherException) { + WatcherException watcherEx = (WatcherException) ex; + watcherEx.getRawWatchMessage().ifPresent(raw -> { + try { + // extract the resource at which the version is attempted to be created (i.e. the stored + // version) + final var unmarshal = Serialization.jsonMapper().readTree(raw); + final var object = unmarshal.get("object"); + resourceCreateAsVersion = acceptOnlyIfUnsetOrEqualToAlreadySet(resourceCreateAsVersion, + object.get("apiVersion").asText()); + // extract the asked resource version + failedResourceVersion = acceptOnlyIfUnsetOrEqualToAlreadySet(failedResourceVersion, + object.get("metadata").get("managedFields").get(0).get("apiVersion").asText()); + } catch (JsonProcessingException e) { + throw new RuntimeException(e); + } + }); + + // extract error message + errorMessage = + acceptOnlyIfUnsetOrEqualToAlreadySet(errorMessage, watcherEx.getCause().getMessage()); + } + final var apiTypeClass = informer.getApiTypeClass(); + resourceClassName = + acceptOnlyIfUnsetOrEqualToAlreadySet(resourceClassName, apiTypeClass.getName()); + System.out.println("Informer for " + HasMetadata.getFullResourceName(apiTypeClass) + + " stopped due to: " + ex.getMessage()); + } + + public String getResourceClassName() { + return resourceClassName; + } + + public String getResourceCreateAsVersion() { + return resourceCreateAsVersion; + } + + public String getErrorMessage() { + return errorMessage; + } + + public String getFailedResourceVersion() { + return failedResourceVersion; + } + + private String acceptOnlyIfUnsetOrEqualToAlreadySet(String existing, String newValue) { + return (existing == null || existing.equals(newValue)) ? newValue : null; + } + } + + private final static TestInformerStoppedHandler informerStoppedHandler = + new TestInformerStoppedHandler(); + @Test void multipleCRDVersions() { operator.create(createTestResourceV1WithoutLabel()); @@ -51,6 +123,40 @@ void multipleCRDVersions() { }); } + @Test + void invalidEventsShouldStopInformerAndCallInformerStoppedHandler() { + var v2res = createTestResourceV2WithLabel(); + v2res.getMetadata().getLabels().clear(); + operator.create(v2res); + var v1res = createTestResourceV1WithoutLabel(); + operator.create(v1res); + + await() + .atMost(Duration.ofSeconds(1)) + .pollInterval(Duration.ofMillis(50)) + .untilAsserted(() -> { + // v1 is the stored version so trying to create a v2 version should fail because we cannot + // convert a String (as defined by the spec of the v2 CRD) to an int (which is what the + // spec of the v1 CRD defines) + assertThat(informerStoppedHandler.getResourceCreateAsVersion()) + .isEqualTo(HasMetadata.getApiVersion( + MultiVersionCRDTestCustomResource1.class)); + assertThat(informerStoppedHandler.getResourceClassName()) + .isEqualTo(MultiVersionCRDTestCustomResource1.class.getName()); + assertThat(informerStoppedHandler.getFailedResourceVersion()) + .isEqualTo(HasMetadata.getApiVersion( + MultiVersionCRDTestCustomResource2.class)); + assertThat(informerStoppedHandler.getErrorMessage()).contains( + "Cannot deserialize value of type `int` from String \"string value\": not a valid `int` value"); + }); + assertThat( + operator + .get(MultiVersionCRDTestCustomResource2.class, CR_V2_NAME) + .getStatus()) + .isNull(); + } + + MultiVersionCRDTestCustomResource1 createTestResourceV1WithoutLabel() { MultiVersionCRDTestCustomResource1 cr = new MultiVersionCRDTestCustomResource1(); cr.setMetadata(new ObjectMeta()); From 695fb477bc5cf7c95b47ef0d66b7a71d983629e2 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Mon, 3 Oct 2022 13:52:44 +0200 Subject: [PATCH 2/2] chore(ci): enable snapshot repository --- pom.xml | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/pom.xml b/pom.xml index 50f08527ee..292375c684 100644 --- a/pom.xml +++ b/pom.xml @@ -196,6 +196,17 @@ + + + ossrh + https://oss.sonatype.org/content/repositories/snapshots/ + + true + always + + + + ossrh