Skip to content

feat: bounded cache for informers #1718

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 27 commits into from
Feb 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions caffeine-bounded-cache-support/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>java-operator-sdk</artifactId>
<groupId>io.javaoperatorsdk</groupId>
<version>4.3.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>caffeine-bounded-cache-support</artifactId>
<name>Operator SDK - Caffeine Bounded Cache Support</name>

<properties>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>

<dependencies>
<dependency>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>operator-framework-core</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>operator-framework</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.javaoperatorsdk</groupId>
<artifactId>operator-framework-junit-5</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.fabric8</groupId>
<artifactId>crd-generator-apt</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
<executions>
<!-- During compilation we need to disable annotation processors (at least the ControllerConfigurationAnnotationProcessor).
However, this is needed to compile the tests so let's disable apt just for the compile phase -->
<execution>
<id>default-compile</id>
<phase>compile</phase>
<goals>
<goal>compile</goal>
</goals>
<configuration>
<compilerArgs>
<arg>-proc:none</arg>
</compilerArgs>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.javaoperatorsdk.operator.processing.event.source.cache;

import com.github.benmanes.caffeine.cache.Cache;

/**
* Caffein cache wrapper to be used in a {@link BoundedItemStore}
*/
public class CaffeineBoundedCache<K, R> implements BoundedCache<K, R> {

private Cache<K, R> cache;

public CaffeineBoundedCache(Cache<K, R> cache) {
this.cache = cache;
}

@Override
public R get(K key) {
return cache.getIfPresent(key);
}

@Override
public R remove(K key) {
var value = cache.getIfPresent(key);
cache.invalidate(key);
return value;
}

@Override
public void put(K key, R object) {
cache.put(key, object);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package io.javaoperatorsdk.operator.processing.event.source.cache;

import java.time.Duration;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.KubernetesClient;

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

/**
* The idea about CaffeinBoundedItemStore-s is that, caffeine will cache the resources which were
* recently used, and will evict resource, which are not used for a while. This is ideal from the
* perspective that on startup controllers reconcile all resources (this is why a maxSize not ideal)
* but after a while it can happen (well depending on the controller and domain) that only some
* resources are actually active, thus related events happen. So in case large amount of custom
* resources only the active once will remain in the cache. Note that if a resource is reconciled
* all the secondary resources are usually reconciled too, in that case all those resources are
* fetched and populated to the cache, and will remain there for some time, for a subsequent
* reconciliations.
*/
public class CaffeineBoundedItemStores {

private CaffeineBoundedItemStores() {}

/**
* @param client Kubernetes Client
* @param rClass resource class
* @param accessExpireDuration the duration after resources is evicted from cache if not accessed.
* @return the ItemStore implementation
* @param <R> resource type
*/
public static <R extends HasMetadata> BoundedItemStore<R> boundedItemStore(
KubernetesClient client, Class<R> rClass,
Duration accessExpireDuration) {
Cache<String, R> cache = Caffeine.newBuilder()
.expireAfterAccess(accessExpireDuration)
.build();
return boundedItemStore(client, rClass, cache);
}

public static <R extends HasMetadata> BoundedItemStore<R> boundedItemStore(
KubernetesClient client, Class<R> rClass, Cache<String, R> cache) {
return new BoundedItemStore<>(new CaffeineBoundedCache<>(cache), rClass, client);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package io.javaoperatorsdk.operator.processing.event.source.cache;

import java.time.Duration;
import java.util.stream.IntStream;

import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec;
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestStatus;

import static io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler.DATA_KEY;
import static org.assertj.core.api.Assertions.assertThat;
import static org.awaitility.Awaitility.await;

public abstract class BoundedCacheTestBase<P extends CustomResource<BoundedCacheTestSpec, BoundedCacheTestStatus>> {

private static final Logger log = LoggerFactory.getLogger(BoundedCacheTestBase.class);

public static final int NUMBER_OF_RESOURCE_TO_TEST = 3;
public static final String RESOURCE_NAME_PREFIX = "test-";
public static final String INITIAL_DATA_PREFIX = "data-";
public static final String UPDATED_PREFIX = "updatedPrefix";

@Test
void reconciliationWorksWithLimitedCache() {
createTestResources();

assertConfigMapData(INITIAL_DATA_PREFIX);

updateTestResources();

assertConfigMapData(UPDATED_PREFIX);

deleteTestResources();

assertConfigMapsDeleted();
}

private void assertConfigMapsDeleted() {
await().atMost(Duration.ofSeconds(30))
.untilAsserted(() -> IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> {
var cm = extension().get(ConfigMap.class, RESOURCE_NAME_PREFIX + i);
assertThat(cm).isNull();
}));
}

private void deleteTestResources() {
IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> {
var cm = extension().get(customResourceClass(), RESOURCE_NAME_PREFIX + i);
var deleted = extension().delete(cm);
if (!deleted) {
log.warn("Custom resource might not be deleted: {}", cm);
}
});
}

private void updateTestResources() {
IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> {
var cm = extension().get(ConfigMap.class, RESOURCE_NAME_PREFIX + i);
cm.getData().put(DATA_KEY, UPDATED_PREFIX + i);
extension().replace(cm);
});
}

void assertConfigMapData(String dataPrefix) {
await().untilAsserted(() -> IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST)
.forEach(i -> assertConfigMap(i, dataPrefix)));
}

private void assertConfigMap(int i, String prefix) {
var cm = extension().get(ConfigMap.class, RESOURCE_NAME_PREFIX + i);
assertThat(cm).isNotNull();
assertThat(cm.getData().get(DATA_KEY)).isEqualTo(prefix + i);
}

private void createTestResources() {
IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> {
extension().create(createTestResource(i));
});
}

abstract P createTestResource(int index);

abstract Class<P> customResourceClass();

abstract LocallyRunOperatorExtension extension();



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.javaoperatorsdk.operator.processing.event.source.cache;

import java.time.Duration;

import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.clusterscope.BoundedCacheClusterScopeTestCustomResource;
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.clusterscope.BoundedCacheClusterScopeTestReconciler;
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec;

import static io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler.boundedItemStore;

public class CaffeineBoundedCacheClusterScopeIT
extends BoundedCacheTestBase<BoundedCacheClusterScopeTestCustomResource> {

@RegisterExtension
LocallyRunOperatorExtension extension =
LocallyRunOperatorExtension.builder()
.withReconciler(new BoundedCacheClusterScopeTestReconciler(), o -> {
o.withItemStore(boundedItemStore(
new KubernetesClientBuilder().build(),
BoundedCacheClusterScopeTestCustomResource.class,
Duration.ofMinutes(1),
1));
})
.build();

@Override
BoundedCacheClusterScopeTestCustomResource createTestResource(int index) {
var res = new BoundedCacheClusterScopeTestCustomResource();
res.setMetadata(new ObjectMetaBuilder()
.withName(RESOURCE_NAME_PREFIX + index)
.build());
res.setSpec(new BoundedCacheTestSpec());
res.getSpec().setData(INITIAL_DATA_PREFIX + index);
res.getSpec().setTargetNamespace(extension.getNamespace());
return res;
}

@Override
Class<BoundedCacheClusterScopeTestCustomResource> customResourceClass() {
return BoundedCacheClusterScopeTestCustomResource.class;
}

@Override
LocallyRunOperatorExtension extension() {
return extension;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.javaoperatorsdk.operator.processing.event.source.cache;

import java.time.Duration;

import org.junit.jupiter.api.extension.RegisterExtension;

import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestCustomResource;
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestReconciler;
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.namespacescope.BoundedCacheTestSpec;

import static io.javaoperatorsdk.operator.processing.event.source.cache.sample.AbstractTestReconciler.boundedItemStore;

class CaffeineBoundedCacheNamespacedIT
extends BoundedCacheTestBase<BoundedCacheTestCustomResource> {

@RegisterExtension
LocallyRunOperatorExtension extension =
LocallyRunOperatorExtension.builder().withReconciler(new BoundedCacheTestReconciler(), o -> {
o.withItemStore(boundedItemStore(
new KubernetesClientBuilder().build(), BoundedCacheTestCustomResource.class,
Duration.ofMinutes(1),
1));
})
.build();

BoundedCacheTestCustomResource createTestResource(int index) {
var res = new BoundedCacheTestCustomResource();
res.setMetadata(new ObjectMetaBuilder()
.withName(RESOURCE_NAME_PREFIX + index)
.build());
res.setSpec(new BoundedCacheTestSpec());
res.getSpec().setData(INITIAL_DATA_PREFIX + index);
res.getSpec().setTargetNamespace(extension.getNamespace());
return res;
}

@Override
Class<BoundedCacheTestCustomResource> customResourceClass() {
return BoundedCacheTestCustomResource.class;
}

@Override
LocallyRunOperatorExtension extension() {
return extension;
}

}
Loading