feat: remove CR meters when they are deleted (after a delay) #1805

Merged 12 commits on Mar 15, 2023
45 changes: 45 additions & 0 deletions docs/documentation/features.md
@@ -757,6 +757,51 @@ with a `mycrs` plural form will result in 2 files:
> Quarkus users using the `quarkus-operator-sdk` extension do not need to add any extra dependency
> to get their CRD generated as this is handled by the extension itself.

## Metrics

JOSDK provides built-in support for reporting metrics about what your reconcilers are doing, in the form of the
`Metrics` interface. You can implement this interface to connect to your metrics provider of choice; JOSDK calls its
methods as it goes about reconciling resources. By default, a no-operation implementation is used, which provides a
sane default at no cost. A [micrometer](https://micrometer.io)-based implementation is also provided.

You can use a different implementation by overriding the default one provided by the default `ConfigurationService`, as
follows:

```java
Metrics metrics = …;
ConfigurationServiceProvider.overrideCurrent(overrider -> overrider.withMetrics(metrics));
```
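
The idea of an interface whose callbacks all default to doing nothing, so that the default costs nothing and an
implementation only overrides what it needs, can be pictured with the following self-contained sketch. It is only an
illustration: `ReconcileListener` and `CountingListener` are hypothetical names, not JOSDK types, and the real
`Metrics` interface has different methods and signatures.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class NoOpDefaultSketch {

  /** Hypothetical callback interface: every method defaults to a no-op. */
  interface ReconcileListener {
    // an implementation that overrides nothing, i.e. the no-cost default
    ReconcileListener NOOP = new ReconcileListener() {};

    default void reconciliationStarted(String resourceName) {}

    default void reconciliationFailed(String resourceName, Exception cause) {}
  }

  /** Overrides only the callback it cares about; the other one stays a no-op. */
  static class CountingListener implements ReconcileListener {
    private final Map<String, AtomicInteger> started = new ConcurrentHashMap<>();

    @Override
    public void reconciliationStarted(String resourceName) {
      started.computeIfAbsent(resourceName, n -> new AtomicInteger()).incrementAndGet();
    }

    int startedCount(String resourceName) {
      AtomicInteger count = started.get(resourceName);
      return count == null ? 0 : count.get();
    }
  }

  public static void main(String[] args) {
    ReconcileListener.NOOP.reconciliationStarted("my-cr"); // does nothing

    CountingListener listener = new CountingListener();
    listener.reconciliationStarted("my-cr");
    listener.reconciliationFailed("my-cr", new IllegalStateException("ignored by default"));
    System.out.println("started: " + listener.startedCount("my-cr"));
  }
}
```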

### Micrometer implementation

By default, the micrometer implementation records many metrics for each resource handled by the operator.
To remain efficient, the implementation removes the meters associated with a resource when that resource is deleted.
Since it can be useful to keep these metrics around for a while after the deletion, it is possible to configure a delay
before they are removed. As the removal is done asynchronously, it is also possible to configure how many threads are
devoted to these operations. Both aspects are controlled via the `MicrometerMetrics` constructor, so changing the
defaults is a matter of instantiating `MicrometerMetrics` with the desired values and telling
`ConfigurationServiceProvider` about it as shown above.
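
The delayed, asynchronous removal described above can be sketched without any dependency as follows. This is only an
illustration of the mechanism, not the `MicrometerMetrics` code: `DelayedCleanupSketch`, its methods, and the use of
plain strings for resources and meters are all hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Hypothetical, simplified stand-in for a delayed clean-up of per-resource meters. */
public class DelayedCleanupSketch {
  // resource key -> names of the meters recorded for that resource
  private final Map<String, Set<String>> metersPerResource = new ConcurrentHashMap<>();
  private final ScheduledExecutorService cleaner;
  private final long delayMillis;

  public DelayedCleanupSketch(long delayMillis, int threads) {
    this.delayMillis = delayMillis;
    this.cleaner = Executors.newScheduledThreadPool(threads);
  }

  /** Remembers that the given meter belongs to the given resource. */
  public void record(String resource, String meter) {
    metersPerResource.computeIfAbsent(resource, r -> ConcurrentHashMap.newKeySet()).add(meter);
  }

  /** Returns the meters recorded for the resource, or null if there are none. */
  public Set<String> metersFor(String resource) {
    return metersPerResource.get(resource);
  }

  /** Called when a resource is deleted: the removal runs later, on a cleaner thread. */
  public CompletableFuture<Void> resourceDeleted(String resource) {
    CompletableFuture<Void> done = new CompletableFuture<>();
    cleaner.schedule(() -> {
      metersPerResource.remove(resource);
      done.complete(null);
    }, delayMillis, TimeUnit.MILLISECONDS);
    return done;
  }

  public void shutdown() {
    cleaner.shutdown();
  }

  public static void main(String[] args) {
    DelayedCleanupSketch sketch = new DelayedCleanupSketch(200, 1);
    sketch.record("default/my-cr", "operator.sdk.events.received");
    CompletableFuture<Void> removal = sketch.resourceDeleted("default/my-cr");
    removal.join(); // block until the delayed task has run
    System.out.println("after the delay: " + sketch.metersFor("default/my-cr"));
    sketch.shutdown();
  }
}
```

Until the delay has elapsed, the meters stay visible, which is what makes it possible to scrape the last values of a
deleted resource before they disappear.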

The micrometer implementation records the following metrics:

| Meter name | Type | Tags | Description |
|-----------------------------------------------------------|----------------|------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
| operator.sdk.reconciliations.executions.<reconciler name> | gauge | group, version, kind | Number of currently running executions of the named reconciler |
| operator.sdk.reconciliations.queue.size.<reconciler name> | gauge | group, version, kind | How many resources are queued to get reconciled by named reconciler |
| operator.sdk.<map name>.size | gauge map size | | Gauge tracking the size of a specified map (currently unused but could be used to monitor caches size) |
| operator.sdk.events.received | counter | group, version, kind, name, namespace, scope, event, action | Number of received Kubernetes events |
| operator.sdk.events.delete | counter | group, version, kind, name, namespace, scope | Number of received Kubernetes delete events |
| operator.sdk.reconciliations.started | counter | group, version, kind, name, namespace, scope, reconciliations.retries.last, reconciliations.retries.number | Number of started reconciliations per resource type |
| operator.sdk.reconciliations.failed | counter | group, version, kind, name, namespace, scope, exception | Number of failed reconciliations per resource type |
| operator.sdk.reconciliations.success | counter | group, version, kind, name, namespace, scope | Number of successful reconciliations per resource type |
| operator.sdk.controllers.execution.reconcile.success | counter | controller, type | Number of successful reconciliations per controller |
| operator.sdk.controllers.execution.reconcile.failure | counter | controller, exception | Number of failed reconciliations per controller |
| operator.sdk.controllers.execution.cleanup.success | counter | controller, type | Number of successful cleanups per controller |
| operator.sdk.controllers.execution.cleanup.failure | counter | controller, exception | Number of failed cleanups per controller |

As you can see, all the recorded metrics start with the `operator.sdk` prefix.
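
As a rough illustration of how a meter name and the per-resource tags listed in the table could be assembled, here is a
small self-contained sketch. `MeterNamingSketch`, `meterName` and `tagsFor` are hypothetical helpers, and treating a
missing namespace as an empty `namespace` tag with a `cluster` scope is an assumption made for the example.

```java
import java.util.ArrayList;
import java.util.List;

public class MeterNamingSketch {
  static final String PREFIX = "operator.sdk.";

  /** Every meter name is the common prefix followed by the counter-specific suffix. */
  static String meterName(String counterName) {
    return PREFIX + counterName;
  }

  /**
   * Builds the tags as an alternating key/value list. A null namespace is treated as a
   * cluster-scoped resource (assumption made for this sketch).
   */
  static List<String> tagsFor(String name, String namespace, String group, String version,
      String kind) {
    List<String> tags = new ArrayList<>();
    tags.addAll(List.of("name", name));
    tags.addAll(List.of("namespace", namespace == null ? "" : namespace));
    tags.addAll(List.of("scope", namespace == null ? "cluster" : "namespace"));
    tags.addAll(List.of("group", group, "version", version, "kind", kind));
    return tags;
  }

  public static void main(String[] args) {
    System.out.println(meterName("events.received"));
    System.out.println(tagsFor("my-cr", "default", "example.com", "v1", "MyCR"));
  }
}
```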


## Optimizing Caches

One of the ideas around the operator pattern is that all the relevant resources are cached, thus reconciliation is
31 changes: 31 additions & 0 deletions micrometer-support/pom.xml
@@ -26,6 +26,37 @@
<groupId>io.javaoperatorsdk</groupId>
<artifactId>operator-framework-core</artifactId>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>operator-framework-junit-5</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>kubernetes-httpclient-vertx</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
@@ -1,11 +1,10 @@
package io.javaoperatorsdk.operator.monitoring.micrometer;

-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
+import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import io.fabric8.kubernetes.api.model.HasMetadata;
@@ -17,6 +16,8 @@
import io.javaoperatorsdk.operator.processing.GroupVersionKind;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Tag;
import io.micrometer.core.instrument.Timer;
@@ -31,9 +32,53 @@ public class MicrometerMetrics implements Metrics {
private static final String RECONCILIATIONS_QUEUE_SIZE = PREFIX + RECONCILIATIONS + "queue.size.";
private final MeterRegistry registry;
private final Map<String, AtomicInteger> gauges = new ConcurrentHashMap<>();
private final Map<ResourceID, Set<Meter.Id>> metersPerResource = new ConcurrentHashMap<>();
private final Cleaner cleaner;

/**
* Creates a micrometer-based Metrics implementation that removes the meters associated with a
* deleted resource without any delay.
*
* @param registry the {@link MeterRegistry} instance to use for metrics recording
*/
public MicrometerMetrics(MeterRegistry registry) {
this(registry, 0);
}

/**
* Creates a micrometer-based Metrics implementation that delays cleaning up {@link Meter}s
* associated with deleted resources by the specified number of seconds, using a single thread
* for that process.
*
* @param registry the {@link MeterRegistry} instance to use for metrics recording
* @param cleanUpDelayInSeconds the number of seconds to wait before meters are removed for
* deleted resources
*/
public MicrometerMetrics(MeterRegistry registry, int cleanUpDelayInSeconds) {
this(registry, cleanUpDelayInSeconds, 1);
}

/**
* Creates a micrometer-based Metrics implementation that delays cleaning up {@link Meter}s
* associated with deleted resources by the specified number of seconds, using at most the
* specified number of threads for that process.
*
* @param registry the {@link MeterRegistry} instance to use for metrics recording
* @param cleanUpDelayInSeconds the number of seconds to wait before meters are removed for
* deleted resources; a negative value removes them immediately, without scheduling
* @param cleaningThreadsNumber the maximum number of threads to use for the cleaning process; a
* value of zero or less falls back to the number of available processors
*/
public MicrometerMetrics(MeterRegistry registry, int cleanUpDelayInSeconds,
int cleaningThreadsNumber) {
this.registry = registry;
if (cleanUpDelayInSeconds < 0) {
cleaner = new NoDelayCleaner();
} else {
cleaningThreadsNumber =
cleaningThreadsNumber <= 0 ? Runtime.getRuntime().availableProcessors()
: cleaningThreadsNumber;
cleaner = new DelayedCleaner(cleanUpDelayInSeconds, cleaningThreadsNumber);
}
}

@Override
@@ -108,14 +153,24 @@ private static String getScope(ResourceID resourceID) {

@Override
public void receivedEvent(Event event, Map<String, Object> metadata) {
final String[] tags;
if (event instanceof ResourceEvent) {
tags = new String[] {"event", event.getClass().getSimpleName(), "action",
((ResourceEvent) event).getAction().toString()};
} else {
tags = new String[] {"event", event.getClass().getSimpleName()};
}

incrementCounter(event.getRelatedCustomResourceID(), "events.received",
metadata,
-"event", event.getClass().getSimpleName());
+tags);
}

@Override
public void cleanupDoneFor(ResourceID resourceID, Map<String, Object> metadata) {
incrementCounter(resourceID, "events.delete", metadata);

cleaner.removeMetersFor(resourceID);
}

@Override
@@ -125,11 +180,11 @@ public void reconcileCustomResource(HasMetadata resource, RetryInfo retryInfoNul
incrementCounter(ResourceID.fromResource(resource), RECONCILIATIONS + "started",
metadata,
RECONCILIATIONS + "retries.number",
-"" + retryInfo.map(RetryInfo::getAttemptCount).orElse(0),
+String.valueOf(retryInfo.map(RetryInfo::getAttemptCount).orElse(0)),
RECONCILIATIONS + "retries.last",
-"" + retryInfo.map(RetryInfo::isLastAttempt).orElse(true));
+String.valueOf(retryInfo.map(RetryInfo::isLastAttempt).orElse(true)));

-AtomicInteger controllerQueueSize =
+var controllerQueueSize =
gauges.get(RECONCILIATIONS_QUEUE_SIZE + metadata.get(CONTROLLER_NAME));
controllerQueueSize.incrementAndGet();
}
@@ -141,18 +196,18 @@ public void finishedReconciliation(HasMetadata resource, Map<String, Object> met

@Override
public void reconciliationExecutionStarted(HasMetadata resource, Map<String, Object> metadata) {
-AtomicInteger reconcilerExecutions =
+var reconcilerExecutions =
gauges.get(RECONCILIATIONS_EXECUTIONS + metadata.get(CONTROLLER_NAME));
reconcilerExecutions.incrementAndGet();
}

@Override
public void reconciliationExecutionFinished(HasMetadata resource, Map<String, Object> metadata) {
-AtomicInteger reconcilerExecutions =
+var reconcilerExecutions =
gauges.get(RECONCILIATIONS_EXECUTIONS + metadata.get(CONTROLLER_NAME));
reconcilerExecutions.decrementAndGet();

-AtomicInteger controllerQueueSize =
+var controllerQueueSize =
gauges.get(RECONCILIATIONS_QUEUE_SIZE + metadata.get(CONTROLLER_NAME));
controllerQueueSize.decrementAndGet();
}
@@ -202,6 +257,50 @@ private void incrementCounter(ResourceID id, String counterName, Map<String, Obj
"version", gvk.version,
"kind", gvk.kind));
}
-registry.counter(PREFIX + counterName, tags.toArray(new String[0])).increment();
+final var counter = registry.counter(PREFIX + counterName, tags.toArray(new String[0]));
+// use a concurrent set: several threads may record meters for the same resource at once
+metersPerResource.computeIfAbsent(id, resourceID -> ConcurrentHashMap.newKeySet())
+.add(counter.getId());
+counter.increment();
}

protected Set<Meter.Id> recordedMeterIdsFor(ResourceID resourceID) {
return metersPerResource.get(resourceID);
}

private interface Cleaner {
void removeMetersFor(ResourceID resourceID);
}

private void removeMetersFor(ResourceID resourceID) {
// atomically detach the recorded meter ids for this resource from the local associations
final var toClean = metersPerResource.remove(resourceID);
// then remove each of these meters from the registry
if (toClean != null) {
toClean.forEach(registry::remove);
}
}

private class NoDelayCleaner implements Cleaner {
@Override
public void removeMetersFor(ResourceID resourceID) {
MicrometerMetrics.this.removeMetersFor(resourceID);
}
}

private class DelayedCleaner implements Cleaner {
private final ScheduledExecutorService metersCleaner;
private final int cleanUpDelayInSeconds;

private DelayedCleaner(int cleanUpDelayInSeconds, int cleaningThreadsNumber) {
this.cleanUpDelayInSeconds = cleanUpDelayInSeconds;
this.metersCleaner = Executors.newScheduledThreadPool(cleaningThreadsNumber);
}

@Override
public void removeMetersFor(ResourceID resourceID) {
// schedule deletion of meters associated with ResourceID
metersCleaner.schedule(() -> MicrometerMetrics.this.removeMetersFor(resourceID),
cleanUpDelayInSeconds, TimeUnit.SECONDS);
}
}
}
@@ -0,0 +1,97 @@
package io.javaoperatorsdk.operator.monitoring.micrometer;

import java.time.Duration;
import java.util.HashSet;
import java.util.Set;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.reconciler.*;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.micrometer.core.instrument.Meter;
import io.micrometer.core.instrument.simple.SimpleMeterRegistry;

import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public class MetricsCleaningOnDeleteIT {
@RegisterExtension
static LocallyRunOperatorExtension operator =
LocallyRunOperatorExtension.builder().withReconciler(new MetricsCleaningTestReconciler())
.build();

private static final TestSimpleMeterRegistry registry = new TestSimpleMeterRegistry();
private static final int testDelay = 1;
private static final MicrometerMetrics metrics = new MicrometerMetrics(registry, testDelay, 2);
private static final String testResourceName = "cleaning-metrics-cr";

@BeforeAll
static void setup() {
ConfigurationServiceProvider.overrideCurrent(overrider -> overrider.withMetrics(metrics));
}

@AfterAll
static void reset() {
ConfigurationServiceProvider.reset();
}

@Test
void removesMetersAssociatedWithResourceAfterItsDeletion() throws InterruptedException {
var testResource = new ConfigMapBuilder()
.withNewMetadata()
.withName(testResourceName)
.endMetadata()
.build();
final var created = operator.create(testResource);

// make sure the resource is created
await().until(() -> !operator.get(ConfigMap.class, testResourceName)
.getMetadata().getFinalizers().isEmpty());

// check that we properly recorded meters associated with the resource
final var meters = metrics.recordedMeterIdsFor(ResourceID.fromResource(created));
assertThat(meters).isNotNull();
assertThat(meters).isNotEmpty();

// delete the resource and wait for it to be deleted
operator.delete(testResource);
await().until(() -> operator.get(ConfigMap.class, testResourceName) == null);

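// check that the meters are properly removed once the specified delay has elapsed; poll instead
// of sleeping for exactly the delay, since the removal runs asynchronously on a cleaner thread
await().atMost(Duration.ofSeconds(testDelay + 5)).untilAsserted(() -> {
assertThat(registry.removed).isEqualTo(meters);
assertThat(metrics.recordedMeterIdsFor(ResourceID.fromResource(created))).isNull();
});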
}

@ControllerConfiguration
private static class MetricsCleaningTestReconciler
implements Reconciler<ConfigMap>, Cleaner<ConfigMap> {
@Override
public UpdateControl<ConfigMap> reconcile(ConfigMap resource, Context<ConfigMap> context) {
return UpdateControl.noUpdate();
}

@Override
public DeleteControl cleanup(ConfigMap resource, Context<ConfigMap> context) {
return DeleteControl.defaultDelete();
}
}

private static class TestSimpleMeterRegistry extends SimpleMeterRegistry {
private final Set<Meter.Id> removed = new HashSet<>();

@Override
public Meter remove(Meter.Id mappedId) {
final var removedMeter = super.remove(mappedId);
// super.remove returns null when no meter is registered under the given id
if (removedMeter != null) {
this.removed.add(removedMeter.getId());
}
return removedMeter;
}
}
}