Skip to content

Commit 2cb616c

Browse files
authored
feat: informer related behavior on startup and in case of errors (#1571)
1 parent 1afee65 commit 2cb616c

File tree

16 files changed

+509
-11
lines changed

16 files changed

+509
-11
lines changed

.github/workflows/pr.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,3 +53,12 @@ jobs:
5353
driver: 'docker'
5454
- name: Run integration tests
5555
run: ./mvnw ${MAVEN_ARGS} -B package -P no-unit-tests --file pom.xml
56+
- name: Adjust Minikube Min Request Timeout Setting
57+
uses: manusa/actions-setup-minikube@v2.7.0
58+
with:
59+
minikube version: 'v1.26.0'
60+
kubernetes version: ${{ matrix.kubernetes }}
61+
driver: 'docker'
62+
start args: '--extra-config=apiserver.min-request-timeout=3'
63+
- name: Run Special Integration Tests
64+
run: ./mvnw ${MAVEN_ARGS} -B package -P minimal-watch-timeout-dependent-it --file pom.xml

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import java.time.Duration;
34
import java.util.Optional;
45
import java.util.Set;
56
import java.util.concurrent.ExecutorService;
@@ -152,6 +153,35 @@ default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
152153
return Optional.empty();
153154
}
154155

156+
/**
157+
* <p>
158+
* if true, operator stops if there are some issues with informers
159+
* {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource} or
160+
* {@link io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource}
161+
* on startup. Other event sources may also respect this flag.
162+
* </p>
163+
* <p>
164+
* if false, the startup will ignore recoverable errors, caused for example by RBAC issues, and
165+
* will try to reconnect periodically in the background.
166+
* </p>
167+
*/
168+
default boolean stopOnInformerErrorDuringStartup() {
169+
return true;
170+
}
171+
172+
/**
173+
* Timeout for cache sync in milliseconds. In other words source start timeout. Note that is
174+
* "stopOnInformerErrorDuringStartup" is true the operator will stop on timeout. Default is 2
175+
* minutes.
176+
*/
177+
default Duration cacheSyncTimeout() {
178+
return Duration.ofMinutes(2);
179+
}
180+
181+
/**
182+
* Handler for an informer stop. Informer stops if there is a non-recoverable error. Like received
183+
* a resource that cannot be deserialized.
184+
*/
155185
default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
156186
return Optional.of((informer, ex) -> {
157187
if (ex != null) {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationServiceOverrider.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.api.config;
22

3+
import java.time.Duration;
34
import java.util.Optional;
45
import java.util.Set;
56
import java.util.concurrent.ExecutorService;
@@ -27,6 +28,8 @@ public class ConfigurationServiceOverrider {
2728
private ExecutorService workflowExecutorService;
2829
private LeaderElectionConfiguration leaderElectionConfiguration;
2930
private InformerStoppedHandler informerStoppedHandler;
31+
private Boolean stopOnInformerErrorDuringStartup;
32+
private Duration cacheSyncTimeout;
3033

3134
ConfigurationServiceOverrider(ConfigurationService original) {
3235
this.original = original;
@@ -99,6 +102,17 @@ public ConfigurationServiceOverrider withInformerStoppedHandler(InformerStoppedH
99102
return this;
100103
}
101104

105+
public ConfigurationServiceOverrider withStopOnInformerErrorDuringStartup(
106+
boolean stopOnInformerErrorDuringStartup) {
107+
this.stopOnInformerErrorDuringStartup = stopOnInformerErrorDuringStartup;
108+
return this;
109+
}
110+
111+
public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTimeout) {
112+
this.cacheSyncTimeout = cacheSyncTimeout;
113+
return this;
114+
}
115+
102116
public ConfigurationService build() {
103117
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
104118
@Override
@@ -171,6 +185,17 @@ public Optional<InformerStoppedHandler> getInformerStoppedHandler() {
171185
return informerStoppedHandler != null ? Optional.of(informerStoppedHandler)
172186
: original.getInformerStoppedHandler();
173187
}
188+
189+
@Override
190+
public boolean stopOnInformerErrorDuringStartup() {
191+
return stopOnInformerErrorDuringStartup != null ? stopOnInformerErrorDuringStartup
192+
: super.stopOnInformerErrorDuringStartup();
193+
}
194+
195+
@Override
196+
public Duration cacheSyncTimeout() {
197+
return cacheSyncTimeout != null ? cacheSyncTimeout : super.cacheSyncTimeout();
198+
}
174199
};
175200
}
176201

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import java.util.List;
44
import java.util.Map;
55
import java.util.Optional;
6+
import java.util.concurrent.ExecutionException;
7+
import java.util.concurrent.TimeUnit;
8+
import java.util.concurrent.TimeoutException;
69
import java.util.function.Function;
710
import java.util.function.Predicate;
811
import java.util.stream.Stream;
@@ -11,6 +14,7 @@
1114
import org.slf4j.LoggerFactory;
1215

1316
import io.fabric8.kubernetes.api.model.HasMetadata;
17+
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
1418
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
1519
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
1620
import io.fabric8.kubernetes.client.informers.cache.Cache;
@@ -37,10 +41,9 @@ public InformerWrapper(SharedIndexInformer<T> informer) {
3741
@Override
3842
public void start() throws OperatorException {
3943
try {
40-
informer.run();
41-
44+
var configService = ConfigurationServiceProvider.instance();
4245
// register stopped handler if we have one defined
43-
ConfigurationServiceProvider.instance().getInformerStoppedHandler()
46+
configService.getInformerStoppedHandler()
4447
.ifPresent(ish -> {
4548
final var stopped = informer.stopped();
4649
if (stopped != null) {
@@ -58,6 +61,27 @@ public void start() throws OperatorException {
5861
+ fullResourceName + "/" + version);
5962
}
6063
});
64+
if (!configService.stopOnInformerErrorDuringStartup()) {
65+
informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t));
66+
}
67+
try {
68+
var start = informer.start();
69+
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
70+
// false, and there is a rbac issue the get never returns; therefore operator never really
71+
// starts
72+
start.toCompletableFuture().get(configService.cacheSyncTimeout().toMillis(),
73+
TimeUnit.MILLISECONDS);
74+
} catch (TimeoutException | ExecutionException e) {
75+
if (configService.stopOnInformerErrorDuringStartup()) {
76+
log.error("Informer startup error. Operator will be stopped. Informer: {}", informer, e);
77+
throw new OperatorException(e);
78+
} else {
79+
log.warn("Informer startup error. Will periodically retry. Informer: {}", informer, e);
80+
}
81+
} catch (InterruptedException e) {
82+
Thread.currentThread().interrupt();
83+
throw new IllegalStateException(e);
84+
}
6185

6286
} catch (Exception e) {
6387
log.error("Couldn't start informer for " + versionedFullResourceName() + " resources", e);

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/MockKubernetesClient.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
import static org.mockito.ArgumentMatchers.anyLong;
1818
import static org.mockito.ArgumentMatchers.anyString;
1919
import static org.mockito.ArgumentMatchers.nullable;
20+
import static org.mockito.Mockito.*;
2021
import static org.mockito.Mockito.doAnswer;
21-
import static org.mockito.Mockito.mock;
22-
import static org.mockito.Mockito.when;
2322

2423
public class MockKubernetesClient {
2524

@@ -44,17 +43,21 @@ public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz,
4443
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
4544
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
4645
SharedIndexInformer<T> informer = mock(SharedIndexInformer.class);
46+
CompletableFuture<Void> informerStartRes = new CompletableFuture<>();
47+
informerStartRes.complete(null);
48+
when(informer.start()).thenReturn(informerStartRes);
4749
CompletableFuture<Void> stopped = new CompletableFuture<>();
4850
when(informer.stopped()).thenReturn(stopped);
51+
when(informer.getApiTypeClass()).thenReturn(clazz);
4952
if (informerRunBehavior != null) {
5053
doAnswer(invocation -> {
5154
try {
5255
informerRunBehavior.accept(null);
5356
} catch (Exception e) {
5457
stopped.completeExceptionally(e);
5558
}
56-
return null;
57-
}).when(informer).run();
59+
return stopped;
60+
}).when(informer).start();
5861
}
5962
doAnswer(invocation -> null).when(informer).stop();
6063
Indexer mockIndexer = mock(Indexer.class);

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
2020

2121
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
22+
import static org.junit.jupiter.api.Assertions.assertThrows;
2223
import static org.mockito.ArgumentMatchers.any;
2324
import static org.mockito.ArgumentMatchers.eq;
2425
import static org.mockito.Mockito.atLeastOnce;
@@ -255,7 +256,10 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
255256
MockKubernetesClient.client(Deployment.class, unused -> {
256257
throw exception;
257258
}));
258-
informerEventSource.start();
259+
260+
// by default informer fails to start if there is an exception in the client on start.
261+
// Throws the exception further.
262+
assertThrows(RuntimeException.class, () -> informerEventSource.start());
259263
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
260264
} finally {
261265
ConfigurationServiceProvider.reset();

operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocallyRunOperatorExtension.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,10 @@ public RegisteredController getRegisteredControllerForReconcile(
101101
return registeredControllers.get(getReconcilerOfType(type));
102102
}
103103

104+
public Operator getOperator() {
105+
return operator;
106+
}
107+
104108
@SuppressWarnings("unchecked")
105109
@Override
106110
protected void before(ExtensionContext context) {
@@ -159,12 +163,20 @@ protected void before(ExtensionContext context) {
159163
}
160164

161165
private void applyCrd(String resourceTypeName) {
166+
applyCrd(resourceTypeName, getKubernetesClient());
167+
}
168+
169+
public static void applyCrd(Class<? extends HasMetadata> resourceClass, KubernetesClient client) {
170+
applyCrd(ReconcilerUtils.getResourceTypeName(resourceClass), client);
171+
}
172+
173+
public static void applyCrd(String resourceTypeName, KubernetesClient client) {
162174
String path = "/META-INF/fabric8/" + resourceTypeName + "-v1.yml";
163-
try (InputStream is = getClass().getResourceAsStream(path)) {
175+
try (InputStream is = LocallyRunOperatorExtension.class.getResourceAsStream(path)) {
164176
if (is == null) {
165177
throw new IllegalStateException("Cannot find CRD at " + path);
166178
}
167-
final var crd = getKubernetesClient().load(is);
179+
final var crd = client.load(is);
168180
crd.createOrReplace();
169181
Thread.sleep(CRD_READY_WAIT); // readiness is not applicable for CRD, just wait a little
170182
LOGGER.debug("Applied CRD with path: {}", path);

0 commit comments

Comments
 (0)