Skip to content

feat: feat controller queue size, execution thread count #1649

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,28 +5,56 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.GroupVersionKind;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.CONTROLLER_NAME;

public class MicrometerMetrics implements Metrics {

private static final String PREFIX = "operator.sdk.";
private static final String RECONCILIATIONS = "reconciliations.";
private static final String RECONCILIATIONS_EXECUTIONS = PREFIX + RECONCILIATIONS + "executions.";
private static final String RECONCILIATIONS_QUEUE_SIZE = PREFIX + RECONCILIATIONS + "queue.size.";
private final MeterRegistry registry;
private final Map<String, AtomicInteger> gauges = new ConcurrentHashMap<>();

public MicrometerMetrics(MeterRegistry registry) {
this.registry = registry;
}

@Override
public void controllerRegistered(Controller<?> controller) {
String executingThreadsName =
RECONCILIATIONS_EXECUTIONS + controller.getConfiguration().getName();
AtomicInteger executingThreads =
registry.gauge(executingThreadsName,
gvkTags(controller.getConfiguration().getResourceClass()),
new AtomicInteger(0));
gauges.put(executingThreadsName, executingThreads);

String controllerQueueName =
RECONCILIATIONS_QUEUE_SIZE + controller.getConfiguration().getName();
AtomicInteger controllerQueueSize =
registry.gauge(controllerQueueName,
gvkTags(controller.getConfiguration().getResourceClass()),
new AtomicInteger(0));
gauges.put(controllerQueueName, controllerQueueSize);
}

public <T> T timeControllerExecution(ControllerExecution<T> execution) {
final var name = execution.controllerName();
final var execName = PREFIX + "controllers.execution." + execution.name();
Expand Down Expand Up @@ -94,13 +122,35 @@ public void reconcileCustomResource(HasMetadata resource, RetryInfo retryInfoNul
"" + retryInfo.map(RetryInfo::getAttemptCount).orElse(0),
RECONCILIATIONS + "retries.last",
"" + retryInfo.map(RetryInfo::isLastAttempt).orElse(true));

AtomicInteger controllerQueueSize =
gauges.get(RECONCILIATIONS_QUEUE_SIZE + metadata.get(CONTROLLER_NAME));
controllerQueueSize.incrementAndGet();
}

@Override
public void finishedReconciliation(HasMetadata resource, Map<String, Object> metadata) {
incrementCounter(ResourceID.fromResource(resource), RECONCILIATIONS + "success", metadata);
}

@Override
public void reconciliationExecutionStarted(HasMetadata resource, Map<String, Object> metadata) {
AtomicInteger reconcilerExecutions =
gauges.get(RECONCILIATIONS_EXECUTIONS + metadata.get(CONTROLLER_NAME));
reconcilerExecutions.incrementAndGet();
}

@Override
public void reconciliationExecutionFinished(HasMetadata resource, Map<String, Object> metadata) {
AtomicInteger reconcilerExecutions =
gauges.get(RECONCILIATIONS_EXECUTIONS + metadata.get(CONTROLLER_NAME));
reconcilerExecutions.decrementAndGet();

AtomicInteger controllerQueueSize =
gauges.get(RECONCILIATIONS_QUEUE_SIZE + metadata.get(CONTROLLER_NAME));
controllerQueueSize.decrementAndGet();
}

public void failedReconciliation(HasMetadata resource, Exception exception,
Map<String, Object> metadata) {
var cause = exception.getCause();
Expand All @@ -118,6 +168,12 @@ public void failedReconciliation(HasMetadata resource, Exception exception,
return registry.gaugeMapSize(PREFIX + name + ".size", Collections.emptyList(), map);
}

public static List<Tag> gvkTags(Class<? extends HasMetadata> resourceClass) {
final var gvk = GroupVersionKind.gvkFor(resourceClass);
return List.of(Tag.of("group", gvk.group), Tag.of("version", gvk.version),
Tag.of("kind", gvk.kind));
}

private void incrementCounter(ResourceID id, String counterName, Map<String, Object> metadata,
String... additionalTags) {
final var additionalTagsNb =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

Expand All @@ -20,6 +21,11 @@ public interface Metrics {
*/
Metrics NOOP = new Metrics() {};

/**
* Do initialization if necessary;
*/
default void controllerRegistered(Controller<?> controller) {}

/**
* Called when an event has been accepted by the SDK from an event source, which would result in
* potentially triggering the associated Reconciler.
Expand Down Expand Up @@ -63,6 +69,12 @@ default void failedReconciliation(HasMetadata resource, Exception exception,
failedReconciliation(ResourceID.fromResource(resource), exception, metadata);
}


default void reconciliationExecutionStarted(HasMetadata resource, Map<String, Object> metadata) {}

default void reconciliationExecutionFinished(HasMetadata resource,
Map<String, Object> metadata) {}

/**
*
* @deprecated Use (and implement) {@link #cleanupDoneFor(ResourceID, Map)} instead
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public final class Constants {
public static final String SAME_AS_CONTROLLER = "JOSDK_SAME_AS_CONTROLLER";

public static final String RESOURCE_GVK_KEY = "josdk.resource.gvk";
public static final String CONTROLLER_NAME = "controller.name";

private Constants() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,10 @@ public Controller(Reconciler<P> reconciler,
eventProcessor = new EventProcessor<>(eventSourceManager);
eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer();
controllerHealthInfo = new ControllerHealthInfo(eventSourceManager);

final var context = new EventSourceContext<>(
eventSourceManager.getControllerResourceEventSource(), configuration, kubernetesClient);
initAndRegisterEventSources(context);
ConfigurationServiceProvider.instance().getMetrics().controllerRegistered(this);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.javaoperatorsdk.operator.processing.event;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
Expand All @@ -18,7 +18,6 @@
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.event.rate.RateLimiter;
Expand Down Expand Up @@ -98,9 +97,10 @@ private EventProcessor(
this.rateLimiter = controllerConfiguration.getRateLimiter();

metricsMetadata = Optional.ofNullable(eventSourceManager.getController())
.map(Controller::getAssociatedGroupVersionKind)
.map(gvk -> Map.of(Constants.RESOURCE_GVK_KEY, (Object) gvk))
.orElse(Collections.emptyMap());
.map(c -> Map.of(
Constants.RESOURCE_GVK_KEY, c.getAssociatedGroupVersionKind(),
Constants.CONTROLLER_NAME, controllerConfiguration.getName()))
.orElse(new HashMap<>());
}

@Override
Expand Down Expand Up @@ -400,11 +400,13 @@ public void run() {
final var name = thread.getName();
try {
MDCUtils.addResourceInfo(executionScope.getResource());
metrics.reconciliationExecutionStarted(executionScope.getResource(), metricsMetadata);
thread.setName("ReconcilerExecutor-" + controllerName() + "-" + thread.getId());
PostExecutionControl<P> postExecutionControl =
reconciliationDispatcher.handleExecution(executionScope);
eventProcessingFinished(executionScope, postExecutionControl);
} finally {
metrics.reconciliationExecutionFinished(executionScope.getResource(), metricsMetadata);
// restore original name
thread.setName(name);
MDCUtils.removeResourceInfo();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public static <R extends HasMetadata> ControllerConfiguration<R> forResource(
when(configuration.getResourceClass()).thenReturn(resourceType);
when(configuration.getNamespaces()).thenReturn(DEFAULT_NAMESPACES_SET);
when(configuration.getEffectiveNamespaces()).thenCallRealMethod();
when(configuration.getName()).thenReturn(resourceType.getSimpleName());
return configuration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ public ControllerConfig(String finalizer, boolean generationAware,
ResourceEventFilter<T> eventFilter, Class<T> customResourceClass) {
super(
null,
null,
"testController",
null,
finalizer,
generationAware,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public TestConfiguration(boolean generationAware, OnAddFilter<TestCustomResource
GenericFilter<TestCustomResource> genericFilter) {
super(
null,
null,
"testController",
null,
FINALIZER,
generationAware,
Expand Down