From c3372acb2e9c6a100636a9a65d9aa553db767395 Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 7 Nov 2022 17:34:57 +0100 Subject: [PATCH 1/9] feat: runtime info for health probes --- .../io/javaoperatorsdk/operator/Operator.java | 8 ++++ .../operator/RegisteredController.java | 5 ++ .../javaoperatorsdk/operator/RuntimeInfo.java | 47 +++++++++++++++++++ .../operator/health/ControllerHealthInfo.java | 44 +++++++++++++++++ .../health/EventSourceHealthIndicator.java | 6 +++ .../InformerEventSourceHealthIndicator.java | 15 ++++++ .../health/InformerHealthIndicator.java | 15 ++++++ .../operator/health/Status.java | 12 +++++ .../operator/processing/Controller.java | 8 ++++ .../processing/event/EventSourceManager.java | 11 +++-- .../processing/event/EventSources.java | 7 +++ .../processing/event/source/EventSource.java | 9 +++- .../source/informer/InformerEventSource.java | 7 ++- .../source/informer/InformerManager.java | 5 ++ .../source/informer/InformerWrapper.java | 18 ++++++- .../informer/ManagedInformerEventSource.java | 15 +++++- 16 files changed, 223 insertions(+), 9 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 3f09fe77ed..57a051d6af 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator; +import java.util.ArrayList; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -207,4 +208,11 @@ public int getRegisteredControllersNumber() { return controllerManager.size(); } + public RuntimeInfo getRuntimeInfo() { + return new RuntimeInfo(new ArrayList<>(controllerManager.controllers())); + } + + public boolean isStarted() { + return started; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java index 832c2df6ee..88cd0123b0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java @@ -3,7 +3,12 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.NamespaceChangeable; +import io.javaoperatorsdk.operator.health.ControllerHealthInfo; public interface RegisteredController

extends NamespaceChangeable { + ControllerConfiguration

getConfiguration(); + + ControllerHealthInfo getControllerHealthInfo(); + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java new file mode 100644 index 0000000000..4731c7cffd --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java @@ -0,0 +1,47 @@ +package io.javaoperatorsdk.operator; + +import io.javaoperatorsdk.operator.RegisteredController; +import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.InformerEventSourceHealthIndicator; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@SuppressWarnings("rawtypes") +public class RuntimeInfo { + + private final List registeredControllers; + + public RuntimeInfo(List registeredControllers) { + this.registeredControllers = registeredControllers; + } + + public List getRegisteredControllers() { + return registeredControllers; + } + + public boolean allEventSourcesAreHealthy() { + return registeredControllers.stream() + .filter(rc->!rc.getControllerHealthInfo().unhealthyEventSources().isEmpty()) + .findFirst().isEmpty(); + } + + public Map> unhealthyEventSources() { + Map> res = new HashMap<>(); + for (var rc : registeredControllers) { + res.put(rc.getConfiguration().getName(),rc.getControllerHealthInfo().unhealthyEventSources()); + } + return res; + } + + public Map> unhealthyInformerEventSources() { + Map> res = new HashMap<>(); + for (var rc : registeredControllers) { + res.put(rc.getConfiguration().getName(),rc.getControllerHealthInfo() + .unhealthyInformerEventSourceHealthIndicators()); + } + return res; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java new file mode 100644 index 0000000000..9bdc25fa87 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java @@ -0,0 +1,44 @@ +package io.javaoperatorsdk.operator.health; + +import io.javaoperatorsdk.operator.processing.event.EventSourceManager; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; + +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@SuppressWarnings("rawtypes") +public class ControllerHealthInfo { + + private EventSourceManager eventSourceManager; + + public ControllerHealthInfo(EventSourceManager eventSourceManager) { + this.eventSourceManager = eventSourceManager; + } + + public Map eventSourceHealthIndicators() { + return eventSourceManager.allEventSources().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public Map unhealthyEventSources() { + return eventSourceManager.allEventSources().entrySet().stream() + .filter(e->e.getValue().getStatus() == Status.UNHEALTHY) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public Map informerEventSourceHealthIndicators() { + return eventSourceManager.allEventSources().entrySet().stream() + .filter(e-> e.getValue() instanceof InformerEventSourceHealthIndicator) + .collect(Collectors.toMap(Map.Entry::getKey, e->(InformerEventSourceHealthIndicator)e.getValue())); + + } + public Map unhealthyInformerEventSourceHealthIndicators() { + return eventSourceManager.allEventSources().entrySet().stream() + .filter(e-> e.getValue().getStatus() == Status.UNHEALTHY) + .filter(e-> e.getValue() instanceof InformerEventSourceHealthIndicator) + .collect(Collectors.toMap(Map.Entry::getKey, e->(InformerEventSourceHealthIndicator)e.getValue())); + } + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java new file mode 100644 index 0000000000..2086ff3b3a --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java @@ -0,0 +1,6 @@ +package io.javaoperatorsdk.operator.health; + +public interface EventSourceHealthIndicator { + + Status getStatus(); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java new file mode 100644 index 0000000000..474aada404 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.health; + +import java.util.Map; + +public interface InformerEventSourceHealthIndicator extends EventSourceHealthIndicator { + + Map informerHealthIndicators(); + + @Override + default Status getStatus() { + var nonUp = informerHealthIndicators().values().stream() + .filter(i->i.getStatus() != Status.HEALTHY).findAny(); + return nonUp.isPresent() ? Status.UNHEALTHY : Status.HEALTHY; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java new file mode 100644 index 0000000000..074a0fd40d --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.health; + +public interface InformerHealthIndicator extends EventSourceHealthIndicator { + + boolean hasSynced(); + + boolean isWatching(); + + boolean isRunning(); + + @Override + default Status getStatus() { + return isRunning() && hasSynced() && isWatching() ? Status.HEALTHY : Status.UNHEALTHY; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java new file mode 100644 index 0000000000..160821d725 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java @@ -0,0 +1,12 @@ +package io.javaoperatorsdk.operator.health; + +public enum Status { + + HEALTHY, + UNHEALTHY, + /** + * For event sources where it cannot be determined if it is healthy ot not. + */ + UNKNOWN + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 5b529b9581..7d37be3e63 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -7,6 +7,7 @@ import java.util.Optional; import java.util.Set; +import io.javaoperatorsdk.operator.health.ControllerHealthInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +68,7 @@ public class Controller

private final GroupVersionKind associatedGVK; private final EventProcessor

eventProcessor; + private final ControllerHealthInfo controllerHealthInfo; public Controller(Reconciler

reconciler, ControllerConfiguration

configuration, @@ -86,6 +88,7 @@ public Controller(Reconciler

reconciler, eventSourceManager = new EventSourceManager<>(this); eventProcessor = new EventProcessor<>(eventSourceManager); eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer(); + controllerHealthInfo = new ControllerHealthInfo(eventSourceManager); } @Override @@ -285,6 +288,11 @@ public ControllerConfiguration

getConfiguration() { return configuration; } + @Override + public ControllerHealthInfo getControllerHealthInfo() { + return controllerHealthInfo; + } + public KubernetesClient getClient() { return kubernetesClient; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index d091d442f6..e75425078a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -1,10 +1,8 @@ package io.javaoperatorsdk.operator.processing.event; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -174,6 +172,11 @@ public Set getRegisteredEventSources() { .collect(Collectors.toCollection(LinkedHashSet::new)); } + public Map allEventSources() { + return eventSources.allNamedEventSources().collect(Collectors.toMap(NamedEventSource::name, + NamedEventSource::original)); + } + public ControllerResourceEventSource

getControllerResourceEventSource() { return eventSources.controllerResourceEventSource(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java index e4fabe7ff8..95f371cbc2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java @@ -46,6 +46,13 @@ public Stream additionalNamedEventSources() { flatMappedSources()); } + public Stream allNamedEventSources() { + return Stream.concat(Stream.of(namedControllerResourceEventSource(), + new NamedEventSource(retryAndRescheduleTimerEventSource, + RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)), + flatMappedSources()); + } + Stream additionalEventSources() { return Stream.concat( Stream.of(retryEventSource()).filter(Objects::nonNull), diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java index 76c8ca164d..ec2783f797 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java @@ -1,5 +1,7 @@ package io.javaoperatorsdk.operator.processing.event.source; +import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.EventHandler; @@ -10,7 +12,7 @@ * your reconciler implement * {@link io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer}. */ -public interface EventSource extends LifecycleAware { +public interface EventSource extends LifecycleAware, EventSourceHealthIndicator { /** * Sets the {@link EventHandler} that is linked to your reconciler when this EventSource is @@ -23,4 +25,9 @@ public interface EventSource extends LifecycleAware { default EventSourceStartPriority priority() { return EventSourceStartPriority.DEFAULT; } + + @Override + default Status getStatus() { + return Status.UNKNOWN; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 2b6dc8e7f8..95fa6486f9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -1,9 +1,13 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; +import io.javaoperatorsdk.operator.health.InformerEventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; +import io.javaoperatorsdk.operator.health.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +70,8 @@ */ public class InformerEventSource extends ManagedInformerEventSource> - implements ResourceEventHandler, RecentOperationEventFilter { + implements ResourceEventHandler, RecentOperationEventFilter +{ private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index f5f52d1c0e..4b47caea33 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -7,6 +7,7 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -179,4 +180,8 @@ public String toString() { + configuration.getEffectiveNamespaces() + (selector != null ? " selector: " + selector : ""); } + + public Map informerHealthIndicators() { + return Collections.unmodifiableMap(sources); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 601cb0c10c..706bc942b4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -10,6 +10,7 @@ import java.util.function.Predicate; import java.util.stream.Stream; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -26,7 +27,7 @@ import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; class InformerWrapper - implements LifecycleAware, IndexerResourceCache { + implements LifecycleAware, IndexerResourceCache, InformerHealthIndicator { private static final Logger log = LoggerFactory.getLogger(InformerWrapper.class); @@ -145,4 +146,19 @@ public List byIndex(String indexName, String indexKey) { public String toString() { return "InformerWrapper [" + versionedFullResourceName() + "] (" + informer + ')'; } + + @Override + public boolean hasSynced() { + return informer.hasSynced(); + } + + @Override + public boolean isWatching() { + return informer.isWatching(); + } + + @Override + public boolean isRunning() { + return informer.isRunning(); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index cc9af59094..726bbe7381 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -8,6 +8,9 @@ import java.util.function.Predicate; import java.util.stream.Stream; +import io.javaoperatorsdk.operator.health.InformerEventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; +import io.javaoperatorsdk.operator.health.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -25,8 +28,7 @@ public abstract class ManagedInformerEventSource> extends AbstractResourceEventSource implements ResourceEventHandler, Cache, IndexerResourceCache, - RecentOperationCacheFiller, - NamespaceChangeable { + RecentOperationCacheFiller, NamespaceChangeable, InformerEventSourceHealthIndicator { private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class); @@ -133,4 +135,13 @@ public Stream list(Predicate predicate) { return cache.list(predicate); } + @Override + public Map informerHealthIndicators() { + return cache.informerHealthIndicators(); + } + + @Override + public Status getStatus() { + return InformerEventSourceHealthIndicator.super.getStatus(); + } } From a727960b8d65ae10347f10b3dc07bb8070cdf04f Mon Sep 17 00:00:00 2001 From: csviri Date: Mon, 7 Nov 2022 17:53:34 +0100 Subject: [PATCH 2/9] wip --- .../io/javaoperatorsdk/operator/Operator.java | 6 +- .../javaoperatorsdk/operator/RuntimeInfo.java | 61 ++++++++--------- .../operator/health/ControllerHealthInfo.java | 68 +++++++++---------- .../health/EventSourceHealthIndicator.java | 2 +- .../InformerEventSourceHealthIndicator.java | 16 ++--- .../health/InformerHealthIndicator.java | 14 ++-- .../operator/health/Status.java | 11 ++- .../operator/processing/Controller.java | 2 +- .../processing/event/EventSourceManager.java | 5 +- .../processing/event/EventSources.java | 6 +- .../source/informer/InformerEventSource.java | 7 +- .../source/informer/InformerManager.java | 2 +- .../source/informer/InformerWrapper.java | 2 +- .../informer/ManagedInformerEventSource.java | 6 +- .../PerResourcePollingEventSource.java | 22 ++++-- 15 files changed, 118 insertions(+), 112 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 57a051d6af..b1cebda283 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -208,9 +208,9 @@ public int getRegisteredControllersNumber() { return controllerManager.size(); } - public RuntimeInfo getRuntimeInfo() { - return new RuntimeInfo(new ArrayList<>(controllerManager.controllers())); - } + public RuntimeInfo getRuntimeInfo() { + return new RuntimeInfo(new ArrayList<>(controllerManager.controllers())); + } public boolean isStarted() { return started; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java index 4731c7cffd..05b5e3860f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java @@ -1,47 +1,46 @@ package io.javaoperatorsdk.operator; -import io.javaoperatorsdk.operator.RegisteredController; -import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator; -import io.javaoperatorsdk.operator.health.InformerEventSourceHealthIndicator; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; - import java.util.HashMap; import java.util.List; import java.util.Map; +import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.InformerEventSourceHealthIndicator; + @SuppressWarnings("rawtypes") public class RuntimeInfo { - private final List registeredControllers; + private final List registeredControllers; - public RuntimeInfo(List registeredControllers) { - this.registeredControllers = registeredControllers; - } + public RuntimeInfo(List registeredControllers) { + this.registeredControllers = registeredControllers; + } - public List getRegisteredControllers() { - return registeredControllers; - } + public List getRegisteredControllers() { + return registeredControllers; + } - public boolean allEventSourcesAreHealthy() { - return registeredControllers.stream() - .filter(rc->!rc.getControllerHealthInfo().unhealthyEventSources().isEmpty()) - .findFirst().isEmpty(); - } + public boolean allEventSourcesAreHealthy() { + return registeredControllers.stream() + .filter(rc -> !rc.getControllerHealthInfo().unhealthyEventSources().isEmpty()) + .findFirst().isEmpty(); + } - public Map> unhealthyEventSources() { - Map> res = new HashMap<>(); - for (var rc : registeredControllers) { - res.put(rc.getConfiguration().getName(),rc.getControllerHealthInfo().unhealthyEventSources()); - } - return res; + public Map> unhealthyEventSources() { + Map> res = new HashMap<>(); + for (var rc : registeredControllers) { + res.put(rc.getConfiguration().getName(), + rc.getControllerHealthInfo().unhealthyEventSources()); } - - public Map> unhealthyInformerEventSources() { - Map> res = new HashMap<>(); - for (var rc : registeredControllers) { - res.put(rc.getConfiguration().getName(),rc.getControllerHealthInfo() - .unhealthyInformerEventSourceHealthIndicators()); - } - return res; + return res; + } + + public Map> unhealthyInformerEventSources() { + Map> res = new HashMap<>(); + for (var rc : registeredControllers) { + res.put(rc.getConfiguration().getName(), rc.getControllerHealthInfo() + .unhealthyInformerEventSourceHealthIndicators()); } + return res; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java index 9bdc25fa87..3702df04fa 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java @@ -1,44 +1,44 @@ package io.javaoperatorsdk.operator.health; -import io.javaoperatorsdk.operator.processing.event.EventSourceManager; -import io.javaoperatorsdk.operator.processing.event.source.EventSource; - import java.util.Map; -import java.util.Set; import java.util.stream.Collectors; -import java.util.stream.Stream; + +import io.javaoperatorsdk.operator.processing.event.EventSourceManager; @SuppressWarnings("rawtypes") public class ControllerHealthInfo { - private EventSourceManager eventSourceManager; - - public ControllerHealthInfo(EventSourceManager eventSourceManager) { - this.eventSourceManager = eventSourceManager; - } - - public Map eventSourceHealthIndicators() { - return eventSourceManager.allEventSources().entrySet().stream() - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - public Map unhealthyEventSources() { - return eventSourceManager.allEventSources().entrySet().stream() - .filter(e->e.getValue().getStatus() == Status.UNHEALTHY) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } - - public Map informerEventSourceHealthIndicators() { - return eventSourceManager.allEventSources().entrySet().stream() - .filter(e-> e.getValue() instanceof InformerEventSourceHealthIndicator) - .collect(Collectors.toMap(Map.Entry::getKey, e->(InformerEventSourceHealthIndicator)e.getValue())); - - } - public Map unhealthyInformerEventSourceHealthIndicators() { - return eventSourceManager.allEventSources().entrySet().stream() - .filter(e-> e.getValue().getStatus() == Status.UNHEALTHY) - .filter(e-> e.getValue() instanceof InformerEventSourceHealthIndicator) - .collect(Collectors.toMap(Map.Entry::getKey, e->(InformerEventSourceHealthIndicator)e.getValue())); - } + private EventSourceManager eventSourceManager; + + public ControllerHealthInfo(EventSourceManager eventSourceManager) { + this.eventSourceManager = eventSourceManager; + } + + public Map eventSourceHealthIndicators() { + return eventSourceManager.allEventSources().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public Map unhealthyEventSources() { + return eventSourceManager.allEventSources().entrySet().stream() + .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public Map informerEventSourceHealthIndicators() { + return eventSourceManager.allEventSources().entrySet().stream() + .filter(e -> e.getValue() instanceof InformerEventSourceHealthIndicator) + .collect(Collectors.toMap(Map.Entry::getKey, + e -> (InformerEventSourceHealthIndicator) e.getValue())); + + } + + public Map unhealthyInformerEventSourceHealthIndicators() { + return eventSourceManager.allEventSources().entrySet().stream() + .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY) + .filter(e -> e.getValue() instanceof InformerEventSourceHealthIndicator) + .collect(Collectors.toMap(Map.Entry::getKey, + e -> (InformerEventSourceHealthIndicator) e.getValue())); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java index 2086ff3b3a..e44fcb5b72 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java @@ -2,5 +2,5 @@ public interface EventSourceHealthIndicator { - Status getStatus(); + Status getStatus(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java index 474aada404..eb03bc41ba 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java @@ -4,12 +4,12 @@ public interface InformerEventSourceHealthIndicator extends EventSourceHealthIndicator { - Map informerHealthIndicators(); - - @Override - default Status getStatus() { - var nonUp = informerHealthIndicators().values().stream() - .filter(i->i.getStatus() != Status.HEALTHY).findAny(); - return nonUp.isPresent() ? Status.UNHEALTHY : Status.HEALTHY; - } + Map informerHealthIndicators(); + + @Override + default Status getStatus() { + var nonUp = informerHealthIndicators().values().stream() + .filter(i -> i.getStatus() != Status.HEALTHY).findAny(); + return nonUp.isPresent() ? Status.UNHEALTHY : Status.HEALTHY; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java index 074a0fd40d..f658fa1d87 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java @@ -2,14 +2,14 @@ public interface InformerHealthIndicator extends EventSourceHealthIndicator { - boolean hasSynced(); + boolean hasSynced(); - boolean isWatching(); + boolean isWatching(); - boolean isRunning(); + boolean isRunning(); - @Override - default Status getStatus() { - return isRunning() && hasSynced() && isWatching() ? Status.HEALTHY : Status.UNHEALTHY; - } + @Override + default Status getStatus() { + return isRunning() && hasSynced() && isWatching() ? Status.HEALTHY : Status.UNHEALTHY; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java index 160821d725..d3a300b7d8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java @@ -2,11 +2,10 @@ public enum Status { - HEALTHY, - UNHEALTHY, - /** - * For event sources where it cannot be determined if it is healthy ot not. - */ - UNKNOWN + HEALTHY, UNHEALTHY, + /** + * For event sources where it cannot be determined if it is healthy ot not. + */ + UNKNOWN } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 7d37be3e63..f17daef182 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -7,7 +7,6 @@ import java.util.Optional; import java.util.Set; -import io.javaoperatorsdk.operator.health.ControllerHealthInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,6 +39,7 @@ import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider; import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext; +import io.javaoperatorsdk.operator.health.ControllerHealthInfo; import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow; import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowCleanupResult; import io.javaoperatorsdk.operator.processing.event.EventProcessor; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index e75425078a..279eb78381 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -2,7 +2,6 @@ import java.util.*; import java.util.stream.Collectors; -import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -172,9 +171,9 @@ public Set getRegisteredEventSources() { .collect(Collectors.toCollection(LinkedHashSet::new)); } - public Map allEventSources() { + public Map allEventSources() { return eventSources.allNamedEventSources().collect(Collectors.toMap(NamedEventSource::name, - NamedEventSource::original)); + NamedEventSource::original)); } public ControllerResourceEventSource

getControllerResourceEventSource() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java index 95f371cbc2..a4276ba8c9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java @@ -48,9 +48,9 @@ public Stream additionalNamedEventSources() { public Stream allNamedEventSources() { return Stream.concat(Stream.of(namedControllerResourceEventSource(), - new NamedEventSource(retryAndRescheduleTimerEventSource, - RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)), - flatMappedSources()); + new NamedEventSource(retryAndRescheduleTimerEventSource, + RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)), + flatMappedSources()); } Stream additionalEventSources() { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 95fa6486f9..2b6dc8e7f8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -1,13 +1,9 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; -import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.stream.Collectors; -import io.javaoperatorsdk.operator.health.InformerEventSourceHealthIndicator; -import io.javaoperatorsdk.operator.health.InformerHealthIndicator; -import io.javaoperatorsdk.operator.health.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -70,8 +66,7 @@ */ public class InformerEventSource extends ManagedInformerEventSource> - implements ResourceEventHandler, RecentOperationEventFilter -{ + implements ResourceEventHandler, RecentOperationEventFilter { private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 4b47caea33..52a18bcae1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -7,7 +7,6 @@ import java.util.stream.Collectors; import java.util.stream.Stream; -import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +21,7 @@ import io.javaoperatorsdk.operator.api.config.Cloner; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.Cache; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 706bc942b4..51a795d591 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -10,7 +10,6 @@ import java.util.function.Predicate; import java.util.stream.Stream; -import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +21,7 @@ import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 726bbe7381..d78645125c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -8,9 +8,6 @@ import java.util.function.Predicate; import java.util.stream.Stream; -import io.javaoperatorsdk.operator.health.InformerEventSourceHealthIndicator; -import io.javaoperatorsdk.operator.health.InformerHealthIndicator; -import io.javaoperatorsdk.operator.health.Status; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +19,9 @@ import io.javaoperatorsdk.operator.api.config.NamespaceChangeable; import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; +import io.javaoperatorsdk.operator.health.InformerEventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; +import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.*; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java index 44bab7a624..832188b6da 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java @@ -9,6 +9,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; @@ -39,6 +40,7 @@ public class PerResourcePollingEventSource private final Predicate

registerPredicate; private final long period; private final Set fetchedForPrimaries = ConcurrentHashMap.newKeySet(); + private volatile boolean healthy = true; public PerResourcePollingEventSource(ResourceFetcher resourceFetcher, Cache

resourceCache, long period, Class resourceClass) { @@ -64,10 +66,16 @@ public PerResourcePollingEventSource(ResourceFetcher resourceFetcher, } private Set getAndCacheResource(P primary, boolean fromGetter) { - var values = resourceFetcher.fetchResources(primary); - handleResources(ResourceID.fromResource(primary), values, !fromGetter); - fetchedForPrimaries.add(ResourceID.fromResource(primary)); - return values; + try { + var values = resourceFetcher.fetchResources(primary); + handleResources(ResourceID.fromResource(primary), values, !fromGetter); + fetchedForPrimaries.add(ResourceID.fromResource(primary)); + healthy = true; + return values; + } catch (RuntimeException e) { + healthy = false; + throw e; + } } @Override @@ -152,4 +160,10 @@ public void stop() throws OperatorException { super.stop(); timer.cancel(); } + + @Override + public Status getStatus() { + // todo unit test + return healthy ? Status.HEALTHY : Status.UNHEALTHY; + } } From b8a7274a5ec713d9b14b6c0fff94050b078237a1 Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 8 Nov 2022 10:43:12 +0100 Subject: [PATCH 3/9] unit test --- .../PerResourcePollingEventSource.java | 22 ++++----------- .../source/polling/PollingEventSource.java | 23 ++++++++++++---- .../polling/PollingEventSourceTest.java | 27 ++++++++++++++++++- 3 files changed, 49 insertions(+), 23 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java index 832188b6da..8bf5c50fd4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java @@ -9,7 +9,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.OperatorException; -import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; @@ -40,7 +39,7 @@ public class PerResourcePollingEventSource private final Predicate

registerPredicate; private final long period; private final Set fetchedForPrimaries = ConcurrentHashMap.newKeySet(); - private volatile boolean healthy = true; + public PerResourcePollingEventSource(ResourceFetcher resourceFetcher, Cache

resourceCache, long period, Class resourceClass) { @@ -66,16 +65,10 @@ public PerResourcePollingEventSource(ResourceFetcher resourceFetcher, } private Set getAndCacheResource(P primary, boolean fromGetter) { - try { - var values = resourceFetcher.fetchResources(primary); - handleResources(ResourceID.fromResource(primary), values, !fromGetter); - fetchedForPrimaries.add(ResourceID.fromResource(primary)); - healthy = true; - return values; - } catch (RuntimeException e) { - healthy = false; - throw e; - } + var values = resourceFetcher.fetchResources(primary); + handleResources(ResourceID.fromResource(primary), values, !fromGetter); + fetchedForPrimaries.add(ResourceID.fromResource(primary)); + return values; } @Override @@ -161,9 +154,4 @@ public void stop() throws OperatorException { timer.cancel(); } - @Override - public Status getStatus() { - // todo unit test - return healthy ? Status.HEALTHY : Status.UNHEALTHY; - } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java index 94efbf25aa..9ef889ecb6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java @@ -1,12 +1,14 @@ package io.javaoperatorsdk.operator.processing.event.source.polling; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource; @@ -45,6 +47,7 @@ public class PollingEventSource private final Timer timer = new Timer(); private final GenericResourceFetcher genericResourceFetcher; private final long period; + private final AtomicBoolean healthy = new AtomicBoolean(true); public PollingEventSource( GenericResourceFetcher supplier, @@ -73,11 +76,17 @@ public void start() throws OperatorException { new TimerTask() { @Override public void run() { - if (!isRunning()) { - log.debug("Event source not yet started. Will not run."); - return; + try { + if (!isRunning()) { + log.debug("Event source not yet started. Will not run."); + return; + } + getStateAndFillCache(); + healthy.set(true); + } catch (RuntimeException e) { + healthy.set(false); + log.error("Error during polling.", e); } - getStateAndFillCache(); } }, period, @@ -89,7 +98,6 @@ protected synchronized void getStateAndFillCache() { handleResources(values); } - public interface GenericResourceFetcher { Map> fetchResources(); } @@ -99,4 +107,9 @@ public void stop() throws OperatorException { super.stop(); timer.cancel(); } + + @Override + public Status getStatus() { + return healthy.get() ? Status.HEALTHY : Status.UNHEALTHY; + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java index 0a777f0cbd..605922f06b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.processing.event.source.polling; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -8,12 +9,15 @@ import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase; import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource; import static io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; import static org.mockito.Mockito.*; class PollingEventSourceTest @@ -21,11 +25,12 @@ class PollingEventSourceTest AbstractEventSourceTestBase, EventHandler> { public static final int DEFAULT_WAIT_PERIOD = 100; + public static final long POLL_PERIOD = 30L; private PollingEventSource.GenericResourceFetcher resourceFetcher = mock(PollingEventSource.GenericResourceFetcher.class); private final PollingEventSource pollingEventSource = - new PollingEventSource<>(resourceFetcher, 30L, SampleExternalResource.class, + new PollingEventSource<>(resourceFetcher, POLL_PERIOD, SampleExternalResource.class, (SampleExternalResource er) -> er.getName() + "#" + er.getValue()); @BeforeEach @@ -73,6 +78,26 @@ void propagatesEventOnNewResourceForPrimary() throws InterruptedException { verify(eventHandler, times(2)).handleEvent(any()); } + @Test + void updatesHealthIndicatorBasedOnExceptionsInFetcher() throws InterruptedException { + when(resourceFetcher.fetchResources()) + .thenReturn(testResponseWithOneValue()); + pollingEventSource.start(); + assertThat(pollingEventSource.getStatus()).isEqualTo(Status.HEALTHY); + + when(resourceFetcher.fetchResources()) + // 2x - to make sure to catch the health indicator change + .thenThrow(new RuntimeException("test exception")) + .thenThrow(new RuntimeException("test exception")) + .thenReturn(testResponseWithOneValue()); + + await().pollInterval(Duration.ofMillis(POLL_PERIOD)).untilAsserted( + () -> assertThat(pollingEventSource.getStatus()).isEqualTo(Status.UNHEALTHY)); + + await() + .untilAsserted(() -> assertThat(pollingEventSource.getStatus()).isEqualTo(Status.HEALTHY)); + } + private Map> testResponseWithTwoValueForSameId() { Map> res = new HashMap<>(); res.put(primaryID1(), Set.of(testResource1(), testResource2())); From 51440f19f3bc6e85865a30580bc1c58c60d737b7 Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 8 Nov 2022 13:56:02 +0100 Subject: [PATCH 4/9] integration test --- .../javaoperatorsdk/operator/RuntimeInfo.java | 6 +-- .../operator/health/ControllerHealthInfo.java | 12 ++--- .../health/InformerHealthIndicator.java | 2 + ...erWrappingEventSourceHealthIndicator.java} | 9 +++- .../source/informer/InformerManager.java | 20 +++++---- .../source/informer/InformerWrapper.java | 10 ++++- .../informer/ManagedInformerEventSource.java | 14 ++++-- .../operator/InformerRelatedBehaviorITS.java | 45 ++++++++++++++++++- ...InformerRelatedBehaviorTestReconciler.java | 9 +++- 9 files changed, 101 insertions(+), 26 deletions(-) rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/{InformerEventSourceHealthIndicator.java => InformerWrappingEventSourceHealthIndicator.java} (55%) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java index 05b5e3860f..959d0d85af 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java @@ -5,7 +5,7 @@ import java.util.Map; import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator; -import io.javaoperatorsdk.operator.health.InformerEventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; @SuppressWarnings("rawtypes") public class RuntimeInfo { @@ -35,8 +35,8 @@ public Map> unhealthyEventSource return res; } - public Map> unhealthyInformerEventSources() { - Map> res = new HashMap<>(); + public Map> unhealthyInformerWrappingEventSourceHealthIndicator() { + Map> res = new HashMap<>(); for (var rc : registeredControllers) { res.put(rc.getConfiguration().getName(), rc.getControllerHealthInfo() .unhealthyInformerEventSourceHealthIndicators()); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java index 3702df04fa..0f7fd65232 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java @@ -25,20 +25,20 @@ public Map unhealthyEventSources() { .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); } - public Map informerEventSourceHealthIndicators() { + public Map informerEventSourceHealthIndicators() { return eventSourceManager.allEventSources().entrySet().stream() - .filter(e -> e.getValue() instanceof InformerEventSourceHealthIndicator) + .filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator) .collect(Collectors.toMap(Map.Entry::getKey, - e -> (InformerEventSourceHealthIndicator) e.getValue())); + e -> (InformerWrappingEventSourceHealthIndicator) e.getValue())); } - public Map unhealthyInformerEventSourceHealthIndicators() { + public Map unhealthyInformerEventSourceHealthIndicators() { return eventSourceManager.allEventSources().entrySet().stream() .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY) - .filter(e -> e.getValue() instanceof InformerEventSourceHealthIndicator) + .filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator) .collect(Collectors.toMap(Map.Entry::getKey, - e -> (InformerEventSourceHealthIndicator) e.getValue())); + e -> (InformerWrappingEventSourceHealthIndicator) e.getValue())); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java index f658fa1d87..afd8b61bed 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java @@ -12,4 +12,6 @@ public interface InformerHealthIndicator extends EventSourceHealthIndicator { default Status getStatus() { return isRunning() && hasSynced() && isWatching() ? Status.HEALTHY : Status.UNHEALTHY; } + + String getTargetNamespace(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerWrappingEventSourceHealthIndicator.java similarity index 55% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerWrappingEventSourceHealthIndicator.java index eb03bc41ba..5a603ad321 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerEventSourceHealthIndicator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerWrappingEventSourceHealthIndicator.java @@ -2,7 +2,11 @@ import java.util.Map; -public interface InformerEventSourceHealthIndicator extends EventSourceHealthIndicator { +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; + +public interface InformerWrappingEventSourceHealthIndicator + extends EventSourceHealthIndicator { Map informerHealthIndicators(); @@ -10,6 +14,9 @@ public interface InformerEventSourceHealthIndicator extends EventSourceHealthInd default Status getStatus() { var nonUp = informerHealthIndicators().values().stream() .filter(i -> i.getStatus() != Status.HEALTHY).findAny(); + return nonUp.isPresent() ? Status.UNHEALTHY : Status.HEALTHY; } + + ResourceConfiguration getInformerConfiguration(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 52a18bcae1..c57d805e34 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -27,10 +27,11 @@ import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; +import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACES; + public class InformerManager> implements LifecycleAware, IndexerResourceCache { - private static final String ALL_NAMESPACES_MAP_KEY = "allNamespaces"; private static final Logger log = LoggerFactory.getLogger(InformerManager.class); private final Map> sources = new ConcurrentHashMap<>(); @@ -59,7 +60,7 @@ void initSources(MixedOperation, Resource> clien final var filteredBySelectorClient = client.inAnyNamespace().withLabelSelector(labelSelector); final var source = - createEventSource(filteredBySelectorClient, eventHandler, ALL_NAMESPACES_MAP_KEY); + createEventSource(filteredBySelectorClient, eventHandler, WATCH_ALL_NAMESPACES); log.debug("Registered {} -> {} for any namespace", this, source); } else { targetNamespaces.forEach( @@ -97,10 +98,11 @@ public void changeNamespaces(Set namespaces) { private InformerWrapper createEventSource( FilterWatchListDeletable, Resource> filteredBySelectorClient, - ResourceEventHandler eventHandler, String key) { - var source = new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0)); + ResourceEventHandler eventHandler, String namespaceIdentifier) { + var source = + new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0), namespaceIdentifier); source.addEventHandler(eventHandler); - sources.put(key, source); + sources.put(namespaceIdentifier, source); return source; } @@ -128,7 +130,7 @@ public Stream list(Predicate predicate) { @Override public Stream list(String namespace, Predicate predicate) { if (isWatchingAllNamespaces()) { - return getSource(ALL_NAMESPACES_MAP_KEY) + return getSource(WATCH_ALL_NAMESPACES) .map(source -> source.list(namespace, predicate)) .orElseGet(Stream::empty); } else { @@ -140,7 +142,7 @@ public Stream list(String namespace, Predicate predicate) { @Override public Optional get(ResourceID resourceID) { - return getSource(resourceID.getNamespace().orElse(ALL_NAMESPACES_MAP_KEY)) + return getSource(resourceID.getNamespace().orElse(WATCH_ALL_NAMESPACES)) .flatMap(source -> source.get(resourceID)) .map(cloner::clone); } @@ -151,11 +153,11 @@ public Stream keys() { } private boolean isWatchingAllNamespaces() { - return sources.containsKey(ALL_NAMESPACES_MAP_KEY); + return sources.containsKey(WATCH_ALL_NAMESPACES); } private Optional> getSource(String namespace) { - namespace = isWatchingAllNamespaces() || namespace == null ? ALL_NAMESPACES_MAP_KEY : namespace; + namespace = isWatchingAllNamespaces() || namespace == null ? WATCH_ALL_NAMESPACES : namespace; return Optional.ofNullable(sources.get(namespace)); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 51a795d591..c121a29c9c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -33,10 +33,13 @@ class InformerWrapper private final SharedIndexInformer informer; private final Cache cache; + private final String namespaceIdentifier; - public InformerWrapper(SharedIndexInformer informer) { + public InformerWrapper(SharedIndexInformer informer, String namespaceIdentifier) { this.informer = informer; + this.namespaceIdentifier = namespaceIdentifier; this.cache = (Cache) informer.getStore(); + } @Override @@ -161,4 +164,9 @@ public boolean isWatching() { public boolean isRunning() { return informer.isRunning(); } + + @Override + public String getTargetNamespace() { + return namespaceIdentifier; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index d78645125c..51460ceea6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -19,8 +19,8 @@ import io.javaoperatorsdk.operator.api.config.NamespaceChangeable; import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; -import io.javaoperatorsdk.operator.health.InformerEventSourceHealthIndicator; import io.javaoperatorsdk.operator.health.InformerHealthIndicator; +import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.*; @@ -28,17 +28,20 @@ public abstract class ManagedInformerEventSource> extends AbstractResourceEventSource implements ResourceEventHandler, Cache, IndexerResourceCache, - RecentOperationCacheFiller, NamespaceChangeable, InformerEventSourceHealthIndicator { + RecentOperationCacheFiller, NamespaceChangeable, + InformerWrappingEventSourceHealthIndicator { private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class); protected TemporaryResourceCache temporaryResourceCache = new TemporaryResourceCache<>(this); protected InformerManager cache = new InformerManager<>(); + protected C configuration; protected ManagedInformerEventSource( MixedOperation, Resource> client, C configuration) { super(configuration.getResourceClass()); manager().initSources(client, configuration, this); + this.configuration = configuration; } @Override @@ -142,6 +145,11 @@ public Map informerHealthIndicators() { @Override public Status getStatus() { - return InformerEventSourceHealthIndicator.super.getStatus(); + return InformerWrappingEventSourceHealthIndicator.super.getStatus(); + } + + @Override + public ResourceConfiguration getInformerConfiguration() { + return configuration; } } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java index f28a6b968f..41b6497c0f 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java @@ -14,9 +14,12 @@ import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestCustomResource; import io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestReconciler; +import static io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestReconciler.CONFIG_MAP_DEPENDENT_RESOURCE; +import static io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestReconciler.INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -73,21 +76,25 @@ void startsUpWhenNoPermissionToCustomResource() { adminClient.resource(testCustomResource()).createOrReplace(); setNoCustomResourceAccess(); - startOperator(false); + var operator = startOperator(false); assertNotReconciled(); + assertRuntimeInfoNoCRPermission(operator); setFullResourcesAccess(); waitForWatchReconnect(); assertReconciled(); + assertThat(operator.getRuntimeInfo().allEventSourcesAreHealthy()).isTrue(); } + @Test void startsUpWhenNoPermissionToSecondaryResource() { adminClient.resource(testCustomResource()).createOrReplace(); setNoConfigMapAccess(); - startOperator(false); + var operator = startOperator(false); assertNotReconciled(); + assertRuntimeInfoForSecondaryPermission(operator); setFullResourcesAccess(); waitForWatchReconnect(); @@ -184,6 +191,40 @@ private void assertReconciled() { }); } + + private void assertRuntimeInfoNoCRPermission(Operator operator) { + assertThat(operator.getRuntimeInfo().allEventSourcesAreHealthy()).isFalse(); + var unhealthyEventSources = + operator.getRuntimeInfo().unhealthyEventSources() + .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER); + assertThat(unhealthyEventSources).isNotEmpty(); + assertThat(unhealthyEventSources.get(ControllerResourceEventSource.class.getSimpleName())) + .isNotNull(); + var informerHealthIndicators = operator.getRuntimeInfo() + .unhealthyInformerWrappingEventSourceHealthIndicator() + .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER); + assertThat(informerHealthIndicators).isNotEmpty(); + assertThat(informerHealthIndicators.get(ControllerResourceEventSource.class.getSimpleName()) + .informerHealthIndicators()) + .hasSize(1); + } + + private void assertRuntimeInfoForSecondaryPermission(Operator operator) { + assertThat(operator.getRuntimeInfo().allEventSourcesAreHealthy()).isFalse(); + var unhealthyEventSources = + operator.getRuntimeInfo().unhealthyEventSources() + .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER); + assertThat(unhealthyEventSources).isNotEmpty(); + assertThat(unhealthyEventSources.get(CONFIG_MAP_DEPENDENT_RESOURCE)).isNotNull(); + var informerHealthIndicators = operator.getRuntimeInfo() + .unhealthyInformerWrappingEventSourceHealthIndicator() + .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER); + assertThat(informerHealthIndicators).isNotEmpty(); + assertThat( + informerHealthIndicators.get(CONFIG_MAP_DEPENDENT_RESOURCE).informerHealthIndicators()) + .hasSize(1); + } + KubernetesClient clientUsingServiceAccount() { KubernetesClient client = new KubernetesClientBuilder() .withConfig(new ConfigBuilder() diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java index baeff08478..13057d547a 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java @@ -10,10 +10,17 @@ import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent; import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider; -@ControllerConfiguration(dependents = @Dependent(type = ConfigMapDependentResource.class)) +@ControllerConfiguration( + name = InformerRelatedBehaviorTestReconciler.INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER, + dependents = @Dependent( + name = InformerRelatedBehaviorTestReconciler.CONFIG_MAP_DEPENDENT_RESOURCE, + type = ConfigMapDependentResource.class)) public class InformerRelatedBehaviorTestReconciler implements Reconciler, TestExecutionInfoProvider { + public static final String INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER = + "InformerRelatedBehaviorTestReconciler"; + public static final String CONFIG_MAP_DEPENDENT_RESOURCE = "ConfigMapDependentResource"; private final AtomicInteger numberOfExecutions = new AtomicInteger(0); private KubernetesClient client; From d5fa459754e9b6135fc9a193e3bd6fae7a5f076a Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 8 Nov 2022 15:23:18 +0100 Subject: [PATCH 5/9] sample --- .../io/javaoperatorsdk/operator/Operator.java | 4 +- sample-operators/webpage/k8s/operator.yaml | 13 ++++--- .../operator/sample/LivenessHandler.java | 29 +++++++++++++++ .../operator/sample/StartupHandler.java | 37 +++++++++++++++++++ .../operator/sample/WebPageOperator.java | 16 +++++--- 5 files changed, 85 insertions(+), 14 deletions(-) create mode 100644 sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java create mode 100644 sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index b1cebda283..afdc8fa35d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -80,12 +80,11 @@ public KubernetesClient getKubernetesClient() { * where there is no obvious entrypoint to the application which can trigger the injection process * and start the cluster monitoring processes. */ - public void start() { + public synchronized void start() { try { if (started) { return; } - started = true; controllerManager.shouldStart(); final var version = ConfigurationServiceProvider.instance().getVersion(); log.info( @@ -101,6 +100,7 @@ public void start() { // the leader election would start subsequently the processor if on controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled()); leaderElectionManager.start(); + started = true; } catch (Exception e) { log.error("Error starting operator", e); stop(); diff --git a/sample-operators/webpage/k8s/operator.yaml b/sample-operators/webpage/k8s/operator.yaml index f4b0b027ea..a8e11d509f 100644 --- a/sample-operators/webpage/k8s/operator.yaml +++ b/sample-operators/webpage/k8s/operator.yaml @@ -25,17 +25,18 @@ spec: imagePullPolicy: Never ports: - containerPort: 80 - readinessProbe: + startupProbe: httpGet: - path: /health + path: /startup port: 8080 - initialDelaySeconds: 1 - timeoutSeconds: 1 + initialDelaySeconds: 3 + timeoutSeconds: 3 + failureThreshold: 10 livenessProbe: httpGet: - path: /health + path: /healthz port: 8080 - initialDelaySeconds: 30 + initialDelaySeconds: 5 timeoutSeconds: 1 --- diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java new file mode 100644 index 0000000000..155fc13fec --- /dev/null +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java @@ -0,0 +1,29 @@ +package io.javaoperatorsdk.operator.sample; + +import java.io.IOException; + +import io.javaoperatorsdk.operator.Operator; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + +import static io.javaoperatorsdk.operator.sample.StartupHandler.sendMessage; + +public class LivenessHandler implements HttpHandler { + + private final Operator operator; + + public LivenessHandler(Operator operator) { + this.operator = operator; + } + + // custom logic can be added here based on the health of event sources + @Override + public void handle(HttpExchange httpExchange) throws IOException { + if (operator.getRuntimeInfo().allEventSourcesAreHealthy()) { + sendMessage(httpExchange, 200, "healthy"); + } else { + sendMessage(httpExchange, 400, "an event source is not healthy"); + } + } +} diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java new file mode 100644 index 0000000000..4a9f590586 --- /dev/null +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java @@ -0,0 +1,37 @@ +package io.javaoperatorsdk.operator.sample; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import io.javaoperatorsdk.operator.Operator; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + +public class StartupHandler implements HttpHandler { + + private final Operator operator; + + public StartupHandler(Operator operator) { + this.operator = operator; + } + + @Override + public void handle(HttpExchange httpExchange) throws IOException { + if (operator.isStarted()) { + sendMessage(httpExchange, 200, "started"); + } else { + sendMessage(httpExchange, 400, "not started yet"); + } + } + + public static void sendMessage(HttpExchange httpExchange, int code, String message) + throws IOException { + try (var outputStream = httpExchange.getResponseBody()) { + var bytes = message.getBytes(StandardCharsets.UTF_8); + httpExchange.sendResponseHeaders(code, bytes.length); + outputStream.write(bytes); + outputStream.flush(); + } + } +} diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java index 9b137649fc..1261bc4116 100644 --- a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java @@ -1,17 +1,16 @@ package io.javaoperatorsdk.operator.sample; import java.io.IOException; +import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.takes.facets.fork.FkRegex; -import org.takes.facets.fork.TkFork; -import org.takes.http.Exit; -import org.takes.http.FtBasic; import io.fabric8.kubernetes.client.*; import io.javaoperatorsdk.operator.Operator; +import com.sun.net.httpserver.HttpServer; + public class WebPageOperator { public static final String WEBPAGE_RECONCILER_ENV = "WEBPAGE_RECONCILER"; public static final String WEBPAGE_CLASSIC_RECONCILER_ENV_VALUE = "classic"; @@ -23,7 +22,7 @@ public static void main(String[] args) throws IOException { log.info("WebServer Operator starting!"); KubernetesClient client = new KubernetesClientBuilder().build(); - Operator operator = new Operator(client); + Operator operator = new Operator(client, o -> o.withStopOnInformerErrorDuringStartup(false)); String reconcilerEnvVar = System.getenv(WEBPAGE_RECONCILER_ENV); if (WEBPAGE_CLASSIC_RECONCILER_ENV_VALUE.equals(reconcilerEnvVar)) { operator.register(new WebPageReconciler(client)); @@ -36,6 +35,11 @@ public static void main(String[] args) throws IOException { operator.installShutdownHook(); operator.start(); - new FtBasic(new TkFork(new FkRegex("/health", "ALL GOOD!")), 8080).start(Exit.NEVER); + HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0); + server.createContext("/startup", new StartupHandler(operator)); + // we want to restart the operator if something goes wrong with (maybe just some) event sources + server.createContext("/healthz", new LivenessHandler(operator)); + server.setExecutor(null); + server.start(); } } From 3e2e7c4c834ef1c409d636156b988125744ca911 Mon Sep 17 00:00:00 2001 From: csviri Date: Tue, 8 Nov 2022 15:34:53 +0100 Subject: [PATCH 6/9] imroved probe setting --- sample-operators/webpage/k8s/operator.yaml | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/sample-operators/webpage/k8s/operator.yaml b/sample-operators/webpage/k8s/operator.yaml index a8e11d509f..d8518ab21d 100644 --- a/sample-operators/webpage/k8s/operator.yaml +++ b/sample-operators/webpage/k8s/operator.yaml @@ -29,8 +29,9 @@ spec: httpGet: path: /startup port: 8080 - initialDelaySeconds: 3 - timeoutSeconds: 3 + initialDelaySeconds: 1 + periodSeconds: 2 + timeoutSeconds: 1 failureThreshold: 10 livenessProbe: httpGet: @@ -38,6 +39,8 @@ spec: port: 8080 initialDelaySeconds: 5 timeoutSeconds: 1 + periodSeconds: 2 + failureThreshold: 3 --- apiVersion: rbac.authorization.k8s.io/v1 From b2b49dacd3f3c41c5ae841bb04bb65f5cba721ec Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Nov 2022 10:47:01 +0100 Subject: [PATCH 7/9] docs --- docs/documentation/features.md | 11 +++++ .../io/javaoperatorsdk/operator/Operator.java | 5 +- .../javaoperatorsdk/operator/RuntimeInfo.java | 49 ++++++++++++++++--- .../operator/health/ControllerHealthInfo.java | 6 +++ .../operator/sample/StartupHandler.java | 2 +- 5 files changed, 62 insertions(+), 11 deletions(-) diff --git a/docs/documentation/features.md b/docs/documentation/features.md index 195c4ec55e..aa32bcd8ef 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -699,6 +699,17 @@ leader left off should one of them become elected leader. See sample configuration in the [E2E test](https://github.com/java-operator-sdk/java-operator-sdk/blob/8865302ac0346ee31f2d7b348997ec2913d5922b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestOperator.java#L21-L23) . +## Runtime Info + +[RuntimeInfo](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java#L16-L16) +is accessed. You can access mainly health information of event sources. Based on that liveness probes can be registered +and a custom logic implemented, when the operator is considered live. Note that this can be used with combination of +[stopOnInformerErrorDuringStartup](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L168-L168) +setting. + +See also an example implementation in the +[WebPage sample](https://github.com/java-operator-sdk/java-operator-sdk/blob/3e2e7c4c834ef1c409d636156b988125744ca911/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java#L38-L43) + ## Monitoring with Micrometer ## Automatic Generation of CRDs diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index afdc8fa35d..a33edae0cc 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -1,6 +1,5 @@ package io.javaoperatorsdk.operator; -import java.util.ArrayList; import java.util.HashSet; import java.util.Optional; import java.util.Set; @@ -209,10 +208,10 @@ public int getRegisteredControllersNumber() { } public RuntimeInfo getRuntimeInfo() { - return new RuntimeInfo(new ArrayList<>(controllerManager.controllers())); + return new RuntimeInfo(this); } - public boolean isStarted() { + boolean isStarted() { return started; } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java index 959d0d85af..961e519d62 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java @@ -1,32 +1,59 @@ package io.javaoperatorsdk.operator; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator; import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; +/** + * RuntimeInfo in general is available when operator is fully started. You can use "isStarted" to + * check that. + */ @SuppressWarnings("rawtypes") public class RuntimeInfo { - private final List registeredControllers; + private static final Logger log = LoggerFactory.getLogger(RuntimeInfo.class); + + private final Set registeredControllers; + private final Operator operator; + + public RuntimeInfo(Operator operator) { + this.registeredControllers = operator.getRegisteredControllers(); + this.operator = operator; + } - public RuntimeInfo(List registeredControllers) { - this.registeredControllers = registeredControllers; + public boolean isStarted() { + return operator.isStarted(); } - public List getRegisteredControllers() { + public Set getRegisteredControllers() { + checkIfStarted(); return registeredControllers; } + private void checkIfStarted() { + if (!isStarted()) { + log.warn( + "Operator not started yet while accessing runtime info, this might lead to an unreliable behavior"); + } + } + public boolean allEventSourcesAreHealthy() { + checkIfStarted(); return registeredControllers.stream() .filter(rc -> !rc.getControllerHealthInfo().unhealthyEventSources().isEmpty()) .findFirst().isEmpty(); } + /** + * @return Aggregated Map with controller related event sources. + */ + public Map> unhealthyEventSources() { + checkIfStarted(); Map> res = new HashMap<>(); for (var rc : registeredControllers) { res.put(rc.getConfiguration().getName(), @@ -35,7 +62,15 @@ public Map> unhealthyEventSource return res; } + /** + * @return Aggregated Map with controller related event sources that wraps an informer. Thus, + * either a + * {@link io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource} + * or an + * {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}. + */ public Map> unhealthyInformerWrappingEventSourceHealthIndicator() { + checkIfStarted(); Map> res = new HashMap<>(); for (var rc : registeredControllers) { res.put(rc.getConfiguration().getName(), rc.getControllerHealthInfo() diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java index 0f7fd65232..2adb3a8508 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java @@ -33,6 +33,12 @@ public Map informerEventSour } + /** + * @return Map with event sources that wraps an informer. Thus, either a + * {@link io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource} + * or an + * {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}. + */ public Map unhealthyInformerEventSourceHealthIndicators() { return eventSourceManager.allEventSources().entrySet().stream() .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY) diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java index 4a9f590586..0cbc313273 100644 --- a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java @@ -18,7 +18,7 @@ public StartupHandler(Operator operator) { @Override public void handle(HttpExchange httpExchange) throws IOException { - if (operator.isStarted()) { + if (operator.getRuntimeInfo().isStarted()) { sendMessage(httpExchange, 200, "started"); } else { sendMessage(httpExchange, 400, "not started yet"); From 3ca4ec192625337738e76fec11c6cd1490e85f3a Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 9 Nov 2022 11:00:48 +0100 Subject: [PATCH 8/9] docs --- docs/documentation/features.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/documentation/features.md b/docs/documentation/features.md index aa32bcd8ef..52ca14dc94 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -705,7 +705,7 @@ See sample configuration in the [E2E test](https://github.com/java-operator-sdk/ is accessed. You can access mainly health information of event sources. Based on that liveness probes can be registered and a custom logic implemented, when the operator is considered live. Note that this can be used with combination of [stopOnInformerErrorDuringStartup](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L168-L168) -setting. +setting, where this flag usually needs to be set to false, in order to control the exact liveness properties. See also an example implementation in the [WebPage sample](https://github.com/java-operator-sdk/java-operator-sdk/blob/3e2e7c4c834ef1c409d636156b988125744ca911/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java#L38-L43) From a0cfe1f9f77b44185ff6899ed26046077dbbc04e Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 23 Nov 2022 09:02:57 +0100 Subject: [PATCH 9/9] docs improvements --- docs/documentation/features.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/docs/documentation/features.md b/docs/documentation/features.md index 52ca14dc94..b4cbd1f2fd 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -702,8 +702,9 @@ See sample configuration in the [E2E test](https://github.com/java-operator-sdk/ ## Runtime Info [RuntimeInfo](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java#L16-L16) -is accessed. You can access mainly health information of event sources. Based on that liveness probes can be registered -and a custom logic implemented, when the operator is considered live. Note that this can be used with combination of +is used mainly to check the actual health of event sources. Based on this information it is easy to implement custom +liveness probes. + [stopOnInformerErrorDuringStartup](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L168-L168) setting, where this flag usually needs to be set to false, in order to control the exact liveness properties.