diff --git a/docs/documentation/features.md b/docs/documentation/features.md
index 195c4ec55e..b4cbd1f2fd 100644
--- a/docs/documentation/features.md
+++ b/docs/documentation/features.md
@@ -699,6 +699,18 @@ leader left off should one of them become elected leader.
See sample configuration in the [E2E test](https://github.com/java-operator-sdk/java-operator-sdk/blob/8865302ac0346ee31f2d7b348997ec2913d5922b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestOperator.java#L21-L23)
.
+## Runtime Info
+
+[RuntimeInfo](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java#L16-L16)
+is used mainly to check the actual health of event sources. Based on this information it is easy to implement custom
+liveness probes.
+
+[stopOnInformerErrorDuringStartup](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L168-L168)
+setting, where this flag usually needs to be set to false, in order to control the exact liveness properties.
+
+See also an example implementation in the
+[WebPage sample](https://github.com/java-operator-sdk/java-operator-sdk/blob/3e2e7c4c834ef1c409d636156b988125744ca911/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java#L38-L43)
+
## Monitoring with Micrometer
## Automatic Generation of CRDs
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java
index 3f09fe77ed..a33edae0cc 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java
@@ -79,12 +79,11 @@ public KubernetesClient getKubernetesClient() {
* where there is no obvious entrypoint to the application which can trigger the injection process
* and start the cluster monitoring processes.
*/
- public void start() {
+ public synchronized void start() {
try {
if (started) {
return;
}
- started = true;
controllerManager.shouldStart();
final var version = ConfigurationServiceProvider.instance().getVersion();
log.info(
@@ -100,6 +99,7 @@ public void start() {
// the leader election would start subsequently the processor if on
controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled());
leaderElectionManager.start();
+ started = true;
} catch (Exception e) {
log.error("Error starting operator", e);
stop();
@@ -207,4 +207,11 @@ public int getRegisteredControllersNumber() {
return controllerManager.size();
}
+ public RuntimeInfo getRuntimeInfo() {
+ return new RuntimeInfo(this);
+ }
+
+ boolean isStarted() {
+ return started;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java
index 832c2df6ee..88cd0123b0 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java
@@ -3,7 +3,12 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
+import io.javaoperatorsdk.operator.health.ControllerHealthInfo;
public interface RegisteredController
extends NamespaceChangeable {
+
ControllerConfiguration
getConfiguration();
+
+ ControllerHealthInfo getControllerHealthInfo();
+
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java
new file mode 100644
index 0000000000..961e519d62
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java
@@ -0,0 +1,81 @@
+package io.javaoperatorsdk.operator;
+
+import java.util.*;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator;
+import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
+
+/**
+ * RuntimeInfo in general is available when operator is fully started. You can use "isStarted" to
+ * check that.
+ */
+@SuppressWarnings("rawtypes")
+public class RuntimeInfo {
+
+ private static final Logger log = LoggerFactory.getLogger(RuntimeInfo.class);
+
+ private final Set registeredControllers;
+ private final Operator operator;
+
+ public RuntimeInfo(Operator operator) {
+ this.registeredControllers = operator.getRegisteredControllers();
+ this.operator = operator;
+ }
+
+ public boolean isStarted() {
+ return operator.isStarted();
+ }
+
+ public Set getRegisteredControllers() {
+ checkIfStarted();
+ return registeredControllers;
+ }
+
+ private void checkIfStarted() {
+ if (!isStarted()) {
+ log.warn(
+ "Operator not started yet while accessing runtime info, this might lead to an unreliable behavior");
+ }
+ }
+
+ public boolean allEventSourcesAreHealthy() {
+ checkIfStarted();
+ return registeredControllers.stream()
+ .filter(rc -> !rc.getControllerHealthInfo().unhealthyEventSources().isEmpty())
+ .findFirst().isEmpty();
+ }
+
+ /**
+ * @return Aggregated Map with controller related event sources.
+ */
+
+ public Map> unhealthyEventSources() {
+ checkIfStarted();
+ Map> res = new HashMap<>();
+ for (var rc : registeredControllers) {
+ res.put(rc.getConfiguration().getName(),
+ rc.getControllerHealthInfo().unhealthyEventSources());
+ }
+ return res;
+ }
+
+ /**
+ * @return Aggregated Map with controller related event sources that wraps an informer. Thus,
+ * either a
+ * {@link io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource}
+ * or an
+ * {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}.
+ */
+ public Map> unhealthyInformerWrappingEventSourceHealthIndicator() {
+ checkIfStarted();
+ Map> res = new HashMap<>();
+ for (var rc : registeredControllers) {
+ res.put(rc.getConfiguration().getName(), rc.getControllerHealthInfo()
+ .unhealthyInformerEventSourceHealthIndicators());
+ }
+ return res;
+ }
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java
new file mode 100644
index 0000000000..2adb3a8508
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java
@@ -0,0 +1,50 @@
+package io.javaoperatorsdk.operator.health;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
+
+@SuppressWarnings("rawtypes")
+public class ControllerHealthInfo {
+
+ private EventSourceManager> eventSourceManager;
+
+ public ControllerHealthInfo(EventSourceManager eventSourceManager) {
+ this.eventSourceManager = eventSourceManager;
+ }
+
+ public Map eventSourceHealthIndicators() {
+ return eventSourceManager.allEventSources().entrySet().stream()
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ public Map unhealthyEventSources() {
+ return eventSourceManager.allEventSources().entrySet().stream()
+ .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+ }
+
+ public Map informerEventSourceHealthIndicators() {
+ return eventSourceManager.allEventSources().entrySet().stream()
+ .filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator)
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> (InformerWrappingEventSourceHealthIndicator) e.getValue()));
+
+ }
+
+ /**
+ * @return Map with event sources that wraps an informer. Thus, either a
+ * {@link io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource}
+ * or an
+ * {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}.
+ */
+ public Map unhealthyInformerEventSourceHealthIndicators() {
+ return eventSourceManager.allEventSources().entrySet().stream()
+ .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY)
+ .filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator)
+ .collect(Collectors.toMap(Map.Entry::getKey,
+ e -> (InformerWrappingEventSourceHealthIndicator) e.getValue()));
+ }
+
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java
new file mode 100644
index 0000000000..e44fcb5b72
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java
@@ -0,0 +1,6 @@
+package io.javaoperatorsdk.operator.health;
+
+public interface EventSourceHealthIndicator {
+
+ Status getStatus();
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java
new file mode 100644
index 0000000000..afd8b61bed
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java
@@ -0,0 +1,17 @@
+package io.javaoperatorsdk.operator.health;
+
+public interface InformerHealthIndicator extends EventSourceHealthIndicator {
+
+ boolean hasSynced();
+
+ boolean isWatching();
+
+ boolean isRunning();
+
+ @Override
+ default Status getStatus() {
+ return isRunning() && hasSynced() && isWatching() ? Status.HEALTHY : Status.UNHEALTHY;
+ }
+
+ String getTargetNamespace();
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerWrappingEventSourceHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerWrappingEventSourceHealthIndicator.java
new file mode 100644
index 0000000000..5a603ad321
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerWrappingEventSourceHealthIndicator.java
@@ -0,0 +1,22 @@
+package io.javaoperatorsdk.operator.health;
+
+import java.util.Map;
+
+import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
+
+public interface InformerWrappingEventSourceHealthIndicator
+ extends EventSourceHealthIndicator {
+
+ Map informerHealthIndicators();
+
+ @Override
+ default Status getStatus() {
+ var nonUp = informerHealthIndicators().values().stream()
+ .filter(i -> i.getStatus() != Status.HEALTHY).findAny();
+
+ return nonUp.isPresent() ? Status.UNHEALTHY : Status.HEALTHY;
+ }
+
+ ResourceConfiguration getInformerConfiguration();
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java
new file mode 100644
index 0000000000..d3a300b7d8
--- /dev/null
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java
@@ -0,0 +1,11 @@
+package io.javaoperatorsdk.operator.health;
+
+public enum Status {
+
+ HEALTHY, UNHEALTHY,
+ /**
+ * For event sources where it cannot be determined if it is healthy ot not.
+ */
+ UNKNOWN
+
+}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
index 5b529b9581..f17daef182 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java
@@ -39,6 +39,7 @@
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext;
+import io.javaoperatorsdk.operator.health.ControllerHealthInfo;
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow;
import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowCleanupResult;
import io.javaoperatorsdk.operator.processing.event.EventProcessor;
@@ -67,6 +68,7 @@ public class Controller
private final GroupVersionKind associatedGVK;
private final EventProcessor
eventProcessor;
+ private final ControllerHealthInfo controllerHealthInfo;
public Controller(Reconciler
reconciler,
ControllerConfiguration
configuration,
@@ -86,6 +88,7 @@ public Controller(Reconciler
reconciler,
eventSourceManager = new EventSourceManager<>(this);
eventProcessor = new EventProcessor<>(eventSourceManager);
eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer();
+ controllerHealthInfo = new ControllerHealthInfo(eventSourceManager);
}
@Override
@@ -285,6 +288,11 @@ public ControllerConfiguration
getConfiguration() {
return configuration;
}
+ @Override
+ public ControllerHealthInfo getControllerHealthInfo() {
+ return controllerHealthInfo;
+ }
+
public KubernetesClient getClient() {
return kubernetesClient;
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
index d091d442f6..279eb78381 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java
@@ -1,9 +1,6 @@
package io.javaoperatorsdk.operator.processing.event;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Objects;
-import java.util.Set;
+import java.util.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -174,6 +171,11 @@ public Set getRegisteredEventSources() {
.collect(Collectors.toCollection(LinkedHashSet::new));
}
+ public Map allEventSources() {
+ return eventSources.allNamedEventSources().collect(Collectors.toMap(NamedEventSource::name,
+ NamedEventSource::original));
+ }
+
public ControllerResourceEventSource getControllerResourceEventSource() {
return eventSources.controllerResourceEventSource();
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java
index e4fabe7ff8..a4276ba8c9 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java
@@ -46,6 +46,13 @@ public Stream additionalNamedEventSources() {
flatMappedSources());
}
+ public Stream allNamedEventSources() {
+ return Stream.concat(Stream.of(namedControllerResourceEventSource(),
+ new NamedEventSource(retryAndRescheduleTimerEventSource,
+ RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)),
+ flatMappedSources());
+ }
+
Stream additionalEventSources() {
return Stream.concat(
Stream.of(retryEventSource()).filter(Objects::nonNull),
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java
index 76c8ca164d..ec2783f797 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.processing.event.source;
+import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator;
+import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
@@ -10,7 +12,7 @@
* your reconciler implement
* {@link io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer}.
*/
-public interface EventSource extends LifecycleAware {
+public interface EventSource extends LifecycleAware, EventSourceHealthIndicator {
/**
* Sets the {@link EventHandler} that is linked to your reconciler when this EventSource is
@@ -23,4 +25,9 @@ public interface EventSource extends LifecycleAware {
default EventSourceStartPriority priority() {
return EventSourceStartPriority.DEFAULT;
}
+
+ @Override
+ default Status getStatus() {
+ return Status.UNKNOWN;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
index f5f52d1c0e..c57d805e34 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java
@@ -21,15 +21,17 @@
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
+import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.Cache;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
+import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACES;
+
public class InformerManager>
implements LifecycleAware, IndexerResourceCache {
- private static final String ALL_NAMESPACES_MAP_KEY = "allNamespaces";
private static final Logger log = LoggerFactory.getLogger(InformerManager.class);
private final Map> sources = new ConcurrentHashMap<>();
@@ -58,7 +60,7 @@ void initSources(MixedOperation, Resource> clien
final var filteredBySelectorClient =
client.inAnyNamespace().withLabelSelector(labelSelector);
final var source =
- createEventSource(filteredBySelectorClient, eventHandler, ALL_NAMESPACES_MAP_KEY);
+ createEventSource(filteredBySelectorClient, eventHandler, WATCH_ALL_NAMESPACES);
log.debug("Registered {} -> {} for any namespace", this, source);
} else {
targetNamespaces.forEach(
@@ -96,10 +98,11 @@ public void changeNamespaces(Set namespaces) {
private InformerWrapper createEventSource(
FilterWatchListDeletable, Resource> filteredBySelectorClient,
- ResourceEventHandler eventHandler, String key) {
- var source = new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0));
+ ResourceEventHandler eventHandler, String namespaceIdentifier) {
+ var source =
+ new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0), namespaceIdentifier);
source.addEventHandler(eventHandler);
- sources.put(key, source);
+ sources.put(namespaceIdentifier, source);
return source;
}
@@ -127,7 +130,7 @@ public Stream list(Predicate predicate) {
@Override
public Stream list(String namespace, Predicate predicate) {
if (isWatchingAllNamespaces()) {
- return getSource(ALL_NAMESPACES_MAP_KEY)
+ return getSource(WATCH_ALL_NAMESPACES)
.map(source -> source.list(namespace, predicate))
.orElseGet(Stream::empty);
} else {
@@ -139,7 +142,7 @@ public Stream list(String namespace, Predicate predicate) {
@Override
public Optional get(ResourceID resourceID) {
- return getSource(resourceID.getNamespace().orElse(ALL_NAMESPACES_MAP_KEY))
+ return getSource(resourceID.getNamespace().orElse(WATCH_ALL_NAMESPACES))
.flatMap(source -> source.get(resourceID))
.map(cloner::clone);
}
@@ -150,11 +153,11 @@ public Stream keys() {
}
private boolean isWatchingAllNamespaces() {
- return sources.containsKey(ALL_NAMESPACES_MAP_KEY);
+ return sources.containsKey(WATCH_ALL_NAMESPACES);
}
private Optional> getSource(String namespace) {
- namespace = isWatchingAllNamespaces() || namespace == null ? ALL_NAMESPACES_MAP_KEY : namespace;
+ namespace = isWatchingAllNamespaces() || namespace == null ? WATCH_ALL_NAMESPACES : namespace;
return Optional.ofNullable(sources.get(namespace));
}
@@ -179,4 +182,8 @@ public String toString() {
+ configuration.getEffectiveNamespaces()
+ (selector != null ? " selector: " + selector : "");
}
+
+ public Map informerHealthIndicators() {
+ return Collections.unmodifiableMap(sources);
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
index 601cb0c10c..c121a29c9c 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java
@@ -21,21 +21,25 @@
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
+import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache;
class InformerWrapper
- implements LifecycleAware, IndexerResourceCache {
+ implements LifecycleAware, IndexerResourceCache, InformerHealthIndicator {
private static final Logger log = LoggerFactory.getLogger(InformerWrapper.class);
private final SharedIndexInformer informer;
private final Cache cache;
+ private final String namespaceIdentifier;
- public InformerWrapper(SharedIndexInformer informer) {
+ public InformerWrapper(SharedIndexInformer informer, String namespaceIdentifier) {
this.informer = informer;
+ this.namespaceIdentifier = namespaceIdentifier;
this.cache = (Cache) informer.getStore();
+
}
@Override
@@ -145,4 +149,24 @@ public List byIndex(String indexName, String indexKey) {
public String toString() {
return "InformerWrapper [" + versionedFullResourceName() + "] (" + informer + ')';
}
+
+ @Override
+ public boolean hasSynced() {
+ return informer.hasSynced();
+ }
+
+ @Override
+ public boolean isWatching() {
+ return informer.isWatching();
+ }
+
+ @Override
+ public boolean isRunning() {
+ return informer.isRunning();
+ }
+
+ @Override
+ public String getTargetNamespace() {
+ return namespaceIdentifier;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
index cc9af59094..51460ceea6 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java
@@ -19,24 +19,29 @@
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller;
+import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
+import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
+import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.*;
public abstract class ManagedInformerEventSource>
extends AbstractResourceEventSource
implements ResourceEventHandler, Cache, IndexerResourceCache,
- RecentOperationCacheFiller,
- NamespaceChangeable {
+ RecentOperationCacheFiller, NamespaceChangeable,
+ InformerWrappingEventSourceHealthIndicator {
private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class);
protected TemporaryResourceCache temporaryResourceCache = new TemporaryResourceCache<>(this);
protected InformerManager cache = new InformerManager<>();
+ protected C configuration;
protected ManagedInformerEventSource(
MixedOperation, Resource> client, C configuration) {
super(configuration.getResourceClass());
manager().initSources(client, configuration, this);
+ this.configuration = configuration;
}
@Override
@@ -133,4 +138,18 @@ public Stream list(Predicate predicate) {
return cache.list(predicate);
}
+ @Override
+ public Map informerHealthIndicators() {
+ return cache.informerHealthIndicators();
+ }
+
+ @Override
+ public Status getStatus() {
+ return InformerWrappingEventSourceHealthIndicator.super.getStatus();
+ }
+
+ @Override
+ public ResourceConfiguration getInformerConfiguration() {
+ return configuration;
+ }
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java
index 44bab7a624..8bf5c50fd4 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java
@@ -40,6 +40,7 @@ public class PerResourcePollingEventSource
private final long period;
private final Set fetchedForPrimaries = ConcurrentHashMap.newKeySet();
+
public PerResourcePollingEventSource(ResourceFetcher resourceFetcher,
Cache resourceCache, long period, Class resourceClass) {
this(resourceFetcher, resourceCache, period, null, resourceClass,
@@ -152,4 +153,5 @@ public void stop() throws OperatorException {
super.stop();
timer.cancel();
}
+
}
diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java
index 94efbf25aa..9ef889ecb6 100644
--- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java
+++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java
@@ -1,12 +1,14 @@
package io.javaoperatorsdk.operator.processing.event.source.polling;
import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.OperatorException;
+import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper;
import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource;
@@ -45,6 +47,7 @@ public class PollingEventSource
private final Timer timer = new Timer();
private final GenericResourceFetcher genericResourceFetcher;
private final long period;
+ private final AtomicBoolean healthy = new AtomicBoolean(true);
public PollingEventSource(
GenericResourceFetcher supplier,
@@ -73,11 +76,17 @@ public void start() throws OperatorException {
new TimerTask() {
@Override
public void run() {
- if (!isRunning()) {
- log.debug("Event source not yet started. Will not run.");
- return;
+ try {
+ if (!isRunning()) {
+ log.debug("Event source not yet started. Will not run.");
+ return;
+ }
+ getStateAndFillCache();
+ healthy.set(true);
+ } catch (RuntimeException e) {
+ healthy.set(false);
+ log.error("Error during polling.", e);
}
- getStateAndFillCache();
}
},
period,
@@ -89,7 +98,6 @@ protected synchronized void getStateAndFillCache() {
handleResources(values);
}
-
public interface GenericResourceFetcher {
Map> fetchResources();
}
@@ -99,4 +107,9 @@ public void stop() throws OperatorException {
super.stop();
timer.cancel();
}
+
+ @Override
+ public Status getStatus() {
+ return healthy.get() ? Status.HEALTHY : Status.UNHEALTHY;
+ }
}
diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java
index 0a777f0cbd..605922f06b 100644
--- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java
+++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.processing.event.source.polling;
+import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -8,12 +9,15 @@
import org.junit.jupiter.api.Test;
import io.fabric8.kubernetes.api.model.HasMetadata;
+import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase;
import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource;
import static io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource.*;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Awaitility.await;
import static org.mockito.Mockito.*;
class PollingEventSourceTest
@@ -21,11 +25,12 @@ class PollingEventSourceTest
AbstractEventSourceTestBase, EventHandler> {
public static final int DEFAULT_WAIT_PERIOD = 100;
+ public static final long POLL_PERIOD = 30L;
private PollingEventSource.GenericResourceFetcher resourceFetcher =
mock(PollingEventSource.GenericResourceFetcher.class);
private final PollingEventSource pollingEventSource =
- new PollingEventSource<>(resourceFetcher, 30L, SampleExternalResource.class,
+ new PollingEventSource<>(resourceFetcher, POLL_PERIOD, SampleExternalResource.class,
(SampleExternalResource er) -> er.getName() + "#" + er.getValue());
@BeforeEach
@@ -73,6 +78,26 @@ void propagatesEventOnNewResourceForPrimary() throws InterruptedException {
verify(eventHandler, times(2)).handleEvent(any());
}
+ @Test
+ void updatesHealthIndicatorBasedOnExceptionsInFetcher() throws InterruptedException {
+ when(resourceFetcher.fetchResources())
+ .thenReturn(testResponseWithOneValue());
+ pollingEventSource.start();
+ assertThat(pollingEventSource.getStatus()).isEqualTo(Status.HEALTHY);
+
+ when(resourceFetcher.fetchResources())
+ // 2x - to make sure to catch the health indicator change
+ .thenThrow(new RuntimeException("test exception"))
+ .thenThrow(new RuntimeException("test exception"))
+ .thenReturn(testResponseWithOneValue());
+
+ await().pollInterval(Duration.ofMillis(POLL_PERIOD)).untilAsserted(
+ () -> assertThat(pollingEventSource.getStatus()).isEqualTo(Status.UNHEALTHY));
+
+ await()
+ .untilAsserted(() -> assertThat(pollingEventSource.getStatus()).isEqualTo(Status.HEALTHY));
+ }
+
private Map> testResponseWithTwoValueForSameId() {
Map> res = new HashMap<>();
res.put(primaryID1(), Set.of(testResource1(), testResource2()));
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java
index f28a6b968f..41b6497c0f 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java
@@ -14,9 +14,12 @@
import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
+import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource;
import io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestCustomResource;
import io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestReconciler;
+import static io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestReconciler.CONFIG_MAP_DEPENDENT_RESOURCE;
+import static io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestReconciler.INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;
import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -73,21 +76,25 @@ void startsUpWhenNoPermissionToCustomResource() {
adminClient.resource(testCustomResource()).createOrReplace();
setNoCustomResourceAccess();
- startOperator(false);
+ var operator = startOperator(false);
assertNotReconciled();
+ assertRuntimeInfoNoCRPermission(operator);
setFullResourcesAccess();
waitForWatchReconnect();
assertReconciled();
+ assertThat(operator.getRuntimeInfo().allEventSourcesAreHealthy()).isTrue();
}
+
@Test
void startsUpWhenNoPermissionToSecondaryResource() {
adminClient.resource(testCustomResource()).createOrReplace();
setNoConfigMapAccess();
- startOperator(false);
+ var operator = startOperator(false);
assertNotReconciled();
+ assertRuntimeInfoForSecondaryPermission(operator);
setFullResourcesAccess();
waitForWatchReconnect();
@@ -184,6 +191,40 @@ private void assertReconciled() {
});
}
+
+ private void assertRuntimeInfoNoCRPermission(Operator operator) {
+ assertThat(operator.getRuntimeInfo().allEventSourcesAreHealthy()).isFalse();
+ var unhealthyEventSources =
+ operator.getRuntimeInfo().unhealthyEventSources()
+ .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER);
+ assertThat(unhealthyEventSources).isNotEmpty();
+ assertThat(unhealthyEventSources.get(ControllerResourceEventSource.class.getSimpleName()))
+ .isNotNull();
+ var informerHealthIndicators = operator.getRuntimeInfo()
+ .unhealthyInformerWrappingEventSourceHealthIndicator()
+ .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER);
+ assertThat(informerHealthIndicators).isNotEmpty();
+ assertThat(informerHealthIndicators.get(ControllerResourceEventSource.class.getSimpleName())
+ .informerHealthIndicators())
+ .hasSize(1);
+ }
+
+ private void assertRuntimeInfoForSecondaryPermission(Operator operator) {
+ assertThat(operator.getRuntimeInfo().allEventSourcesAreHealthy()).isFalse();
+ var unhealthyEventSources =
+ operator.getRuntimeInfo().unhealthyEventSources()
+ .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER);
+ assertThat(unhealthyEventSources).isNotEmpty();
+ assertThat(unhealthyEventSources.get(CONFIG_MAP_DEPENDENT_RESOURCE)).isNotNull();
+ var informerHealthIndicators = operator.getRuntimeInfo()
+ .unhealthyInformerWrappingEventSourceHealthIndicator()
+ .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER);
+ assertThat(informerHealthIndicators).isNotEmpty();
+ assertThat(
+ informerHealthIndicators.get(CONFIG_MAP_DEPENDENT_RESOURCE).informerHealthIndicators())
+ .hasSize(1);
+ }
+
KubernetesClient clientUsingServiceAccount() {
KubernetesClient client = new KubernetesClientBuilder()
.withConfig(new ConfigBuilder()
diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java
index baeff08478..13057d547a 100644
--- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java
+++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java
@@ -10,10 +10,17 @@
import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent;
import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider;
-@ControllerConfiguration(dependents = @Dependent(type = ConfigMapDependentResource.class))
+@ControllerConfiguration(
+ name = InformerRelatedBehaviorTestReconciler.INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER,
+ dependents = @Dependent(
+ name = InformerRelatedBehaviorTestReconciler.CONFIG_MAP_DEPENDENT_RESOURCE,
+ type = ConfigMapDependentResource.class))
public class InformerRelatedBehaviorTestReconciler
implements Reconciler, TestExecutionInfoProvider {
+ public static final String INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER =
+ "InformerRelatedBehaviorTestReconciler";
+ public static final String CONFIG_MAP_DEPENDENT_RESOURCE = "ConfigMapDependentResource";
private final AtomicInteger numberOfExecutions = new AtomicInteger(0);
private KubernetesClient client;
diff --git a/sample-operators/webpage/k8s/operator.yaml b/sample-operators/webpage/k8s/operator.yaml
index f4b0b027ea..d8518ab21d 100644
--- a/sample-operators/webpage/k8s/operator.yaml
+++ b/sample-operators/webpage/k8s/operator.yaml
@@ -25,18 +25,22 @@ spec:
imagePullPolicy: Never
ports:
- containerPort: 80
- readinessProbe:
+ startupProbe:
httpGet:
- path: /health
+ path: /startup
port: 8080
initialDelaySeconds: 1
+ periodSeconds: 2
timeoutSeconds: 1
+ failureThreshold: 10
livenessProbe:
httpGet:
- path: /health
+ path: /healthz
port: 8080
- initialDelaySeconds: 30
+ initialDelaySeconds: 5
timeoutSeconds: 1
+ periodSeconds: 2
+ failureThreshold: 3
---
apiVersion: rbac.authorization.k8s.io/v1
diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java
new file mode 100644
index 0000000000..155fc13fec
--- /dev/null
+++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java
@@ -0,0 +1,29 @@
+package io.javaoperatorsdk.operator.sample;
+
+import java.io.IOException;
+
+import io.javaoperatorsdk.operator.Operator;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+import static io.javaoperatorsdk.operator.sample.StartupHandler.sendMessage;
+
+public class LivenessHandler implements HttpHandler {
+
+ private final Operator operator;
+
+ public LivenessHandler(Operator operator) {
+ this.operator = operator;
+ }
+
+ // custom logic can be added here based on the health of event sources
+ @Override
+ public void handle(HttpExchange httpExchange) throws IOException {
+ if (operator.getRuntimeInfo().allEventSourcesAreHealthy()) {
+ sendMessage(httpExchange, 200, "healthy");
+ } else {
+ sendMessage(httpExchange, 400, "an event source is not healthy");
+ }
+ }
+}
diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java
new file mode 100644
index 0000000000..0cbc313273
--- /dev/null
+++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java
@@ -0,0 +1,37 @@
+package io.javaoperatorsdk.operator.sample;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+
+import io.javaoperatorsdk.operator.Operator;
+
+import com.sun.net.httpserver.HttpExchange;
+import com.sun.net.httpserver.HttpHandler;
+
+public class StartupHandler implements HttpHandler {
+
+ private final Operator operator;
+
+ public StartupHandler(Operator operator) {
+ this.operator = operator;
+ }
+
+ @Override
+ public void handle(HttpExchange httpExchange) throws IOException {
+ if (operator.getRuntimeInfo().isStarted()) {
+ sendMessage(httpExchange, 200, "started");
+ } else {
+ sendMessage(httpExchange, 400, "not started yet");
+ }
+ }
+
+ public static void sendMessage(HttpExchange httpExchange, int code, String message)
+ throws IOException {
+ try (var outputStream = httpExchange.getResponseBody()) {
+ var bytes = message.getBytes(StandardCharsets.UTF_8);
+ httpExchange.sendResponseHeaders(code, bytes.length);
+ outputStream.write(bytes);
+ outputStream.flush();
+ }
+ }
+}
diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java
index 9b137649fc..1261bc4116 100644
--- a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java
+++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java
@@ -1,17 +1,16 @@
package io.javaoperatorsdk.operator.sample;
import java.io.IOException;
+import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.takes.facets.fork.FkRegex;
-import org.takes.facets.fork.TkFork;
-import org.takes.http.Exit;
-import org.takes.http.FtBasic;
import io.fabric8.kubernetes.client.*;
import io.javaoperatorsdk.operator.Operator;
+import com.sun.net.httpserver.HttpServer;
+
public class WebPageOperator {
public static final String WEBPAGE_RECONCILER_ENV = "WEBPAGE_RECONCILER";
public static final String WEBPAGE_CLASSIC_RECONCILER_ENV_VALUE = "classic";
@@ -23,7 +22,7 @@ public static void main(String[] args) throws IOException {
log.info("WebServer Operator starting!");
KubernetesClient client = new KubernetesClientBuilder().build();
- Operator operator = new Operator(client);
+ Operator operator = new Operator(client, o -> o.withStopOnInformerErrorDuringStartup(false));
String reconcilerEnvVar = System.getenv(WEBPAGE_RECONCILER_ENV);
if (WEBPAGE_CLASSIC_RECONCILER_ENV_VALUE.equals(reconcilerEnvVar)) {
operator.register(new WebPageReconciler(client));
@@ -36,6 +35,11 @@ public static void main(String[] args) throws IOException {
operator.installShutdownHook();
operator.start();
- new FtBasic(new TkFork(new FkRegex("/health", "ALL GOOD!")), 8080).start(Exit.NEVER);
+ HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0);
+ server.createContext("/startup", new StartupHandler(operator));
+ // we want to restart the operator if something goes wrong with (maybe just some) event sources
+ server.createContext("/healthz", new LivenessHandler(operator));
+ server.setExecutor(null);
+ server.start();
}
}