Skip to content

Commit 60af884

Browse files
committed
wip
1 parent 600f672 commit 60af884

File tree

10 files changed

+351
-8
lines changed

10 files changed

+351
-8
lines changed

caffein-bounded-cache-support/pom.xml

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<project xmlns="http://maven.apache.org/POM/4.0.0"
3+
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
4+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
5+
<parent>
6+
<artifactId>java-operator-sdk</artifactId>
7+
<groupId>io.javaoperatorsdk</groupId>
8+
<version>4.3.0-SNAPSHOT</version>
9+
</parent>
10+
<modelVersion>4.0.0</modelVersion>
11+
12+
<artifactId>caffein-bounded-cache-support</artifactId>
13+
<name>Operator SDK - Caffein Bounded Cache Support</name>
14+
15+
<properties>
16+
<maven.compiler.source>11</maven.compiler.source>
17+
<maven.compiler.target>11</maven.compiler.target>
18+
</properties>
19+
20+
<dependencies>
21+
<dependency>
22+
<groupId>io.javaoperatorsdk</groupId>
23+
<artifactId>operator-framework-core</artifactId>
24+
</dependency>
25+
<dependency>
26+
<groupId>com.github.ben-manes.caffeine</groupId>
27+
<artifactId>caffeine</artifactId>
28+
</dependency>
29+
<dependency>
30+
<groupId>io.javaoperatorsdk</groupId>
31+
<artifactId>operator-framework</artifactId>
32+
<scope>test</scope>
33+
</dependency>
34+
<dependency>
35+
<groupId>io.javaoperatorsdk</groupId>
36+
<artifactId>operator-framework-junit-5</artifactId>
37+
<version>${project.version}</version>
38+
<scope>test</scope>
39+
</dependency>
40+
<dependency>
41+
<groupId>io.fabric8</groupId>
42+
<artifactId>crd-generator-apt</artifactId>
43+
<scope>test</scope>
44+
</dependency>
45+
</dependencies>
46+
47+
<build>
48+
<plugins>
49+
<plugin>
50+
<artifactId>maven-compiler-plugin</artifactId>
51+
<version>${maven-compiler-plugin.version}</version>
52+
<executions>
53+
<!-- During compilation we need to disable annotation processors (at least the ControllerConfigurationAnnotationProcessor).
54+
However, this is needed to compile the tests so let's disable apt just for the compile phase -->
55+
<execution>
56+
<id>default-compile</id>
57+
<phase>compile</phase>
58+
<goals>
59+
<goal>compile</goal>
60+
</goals>
61+
<configuration>
62+
<compilerArgs>
63+
<arg>-proc:none</arg>
64+
</compilerArgs>
65+
</configuration>
66+
</execution>
67+
</executions>
68+
</plugin>
69+
</plugins>
70+
</build>
71+
72+
</project>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache;
2+
3+
import java.util.HashSet;
4+
import java.util.Set;
5+
6+
import com.github.benmanes.caffeine.cache.Cache;
7+
8+
public class CaffeinBoundedCache<K, R> implements BoundedCache<K, R> {
9+
10+
private Cache<K, R> cache;
11+
12+
public CaffeinBoundedCache(Cache<K, R> cache) {
13+
this.cache = cache;
14+
}
15+
16+
@Override
17+
public R get(K key) {
18+
return cache.getIfPresent(key);
19+
}
20+
21+
@Override
22+
public R remove(K key) {
23+
var value = cache.getIfPresent(key);
24+
cache.invalidate(key);
25+
return value;
26+
}
27+
28+
@Override
29+
public R put(K key, R object) {
30+
cache.put(key, object);
31+
return object;
32+
}
33+
34+
@Override
35+
public Set<K> keys() {
36+
return cache.asMap().keySet();
37+
}
38+
39+
@Override
40+
public Set<R> values() {
41+
return new HashSet<>(cache.asMap().values());
42+
}
43+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache;
2+
3+
import java.util.concurrent.TimeUnit;
4+
import java.util.stream.IntStream;
5+
6+
import org.junit.jupiter.api.Test;
7+
import org.junit.jupiter.api.extension.RegisterExtension;
8+
9+
import io.fabric8.kubernetes.api.model.ConfigMap;
10+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
11+
import io.fabric8.kubernetes.client.KubernetesClientBuilder;
12+
import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension;
13+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.BoundedCacheTestCustomResource;
14+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.BoundedCacheTestReconciler;
15+
import io.javaoperatorsdk.operator.processing.event.source.cache.sample.BoundedCacheTestSpec;
16+
17+
import com.github.benmanes.caffeine.cache.Cache;
18+
import com.github.benmanes.caffeine.cache.Caffeine;
19+
20+
import static io.javaoperatorsdk.operator.processing.event.source.cache.sample.BoundedCacheTestReconciler.DATA_KEY;
21+
import static org.assertj.core.api.Assertions.assertThat;
22+
import static org.awaitility.Awaitility.await;
23+
24+
class CaffeinBoundedCacheIT {
25+
26+
public static final int NUMBER_OF_RESOURCE_TO_TEST = 3;
27+
public static final String RESOURCE_NAME_PREFIX = "test-";
28+
public static final String INITIAL_DATA_PREFIX = "data-";
29+
30+
@RegisterExtension
31+
LocallyRunOperatorExtension extension =
32+
LocallyRunOperatorExtension.builder().withReconciler(new BoundedCacheTestReconciler(), o -> {
33+
Cache<String, ConfigMap> cache = Caffeine.newBuilder()
34+
.expireAfterAccess(1, TimeUnit.MINUTES)
35+
.maximumSize(1)
36+
.build();
37+
BoundedItemStore<ConfigMap> boundedItemStore =
38+
new BoundedItemStore<>(new KubernetesClientBuilder().build(), ConfigMap.class,
39+
new CaffeinBoundedCache<>(cache));
40+
o.withItemStore(boundedItemStore);
41+
})
42+
.build();
43+
44+
@Test
45+
void reconciliationWorksWithLimitedCache() {
46+
createTestResources();
47+
48+
await().untilAsserted(() -> {
49+
assertConfigMapData(INITIAL_DATA_PREFIX);
50+
});
51+
}
52+
53+
void assertConfigMapData(String dataPrefix) {
54+
IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> {
55+
assertConfigMap(i, dataPrefix);
56+
});
57+
}
58+
59+
private void assertConfigMap(int i, String prefix) {
60+
var cm = extension.get(ConfigMap.class, RESOURCE_NAME_PREFIX + i);
61+
assertThat(cm).isNotNull();
62+
assertThat(cm.getData().get(DATA_KEY)).isEqualTo(prefix + i);
63+
}
64+
65+
private void createTestResources() {
66+
IntStream.range(0, NUMBER_OF_RESOURCE_TO_TEST).forEach(i -> {
67+
extension.create(createTestResource(i));
68+
});
69+
}
70+
71+
BoundedCacheTestCustomResource createTestResource(int index) {
72+
var res = new BoundedCacheTestCustomResource();
73+
res.setMetadata(new ObjectMetaBuilder()
74+
.withName(RESOURCE_NAME_PREFIX + index)
75+
.build());
76+
res.setSpec(new BoundedCacheTestSpec());
77+
res.getSpec().setData(INITIAL_DATA_PREFIX + index);
78+
return res;
79+
}
80+
81+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache.sample;
2+
3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import io.fabric8.kubernetes.model.annotation.Group;
5+
import io.fabric8.kubernetes.model.annotation.ShortNames;
6+
import io.fabric8.kubernetes.model.annotation.Version;
7+
8+
@Group("sample.javaoperatorsdk")
9+
@Version("v1")
10+
@ShortNames("bct")
11+
public class BoundedCacheTestCustomResource
12+
extends CustomResource<BoundedCacheTestSpec, BoundedCacheTestStatus> {
13+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache.sample;
2+
3+
import java.util.Map;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import io.fabric8.kubernetes.api.model.ConfigMap;
7+
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
8+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
9+
import io.fabric8.kubernetes.client.KubernetesClient;
10+
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
11+
import io.javaoperatorsdk.operator.api.reconciler.*;
12+
import io.javaoperatorsdk.operator.junit.KubernetesClientAware;
13+
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
14+
import io.javaoperatorsdk.operator.processing.event.source.cache.BoundedItemStore;
15+
import io.javaoperatorsdk.operator.processing.event.source.cache.CaffeinBoundedCache;
16+
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
17+
18+
import com.github.benmanes.caffeine.cache.Cache;
19+
import com.github.benmanes.caffeine.cache.Caffeine;
20+
21+
@ControllerConfiguration
22+
public class BoundedCacheTestReconciler implements Reconciler<BoundedCacheTestCustomResource>,
23+
EventSourceInitializer<BoundedCacheTestCustomResource>, KubernetesClientAware {
24+
25+
public static final String DATA_KEY = "dataKey";
26+
private KubernetesClient client;
27+
28+
@Override
29+
public UpdateControl<BoundedCacheTestCustomResource> reconcile(
30+
BoundedCacheTestCustomResource resource,
31+
Context<BoundedCacheTestCustomResource> context) {
32+
var maybeConfigMap = context.getSecondaryResource(ConfigMap.class);
33+
maybeConfigMap.ifPresentOrElse(
34+
cm -> updateConfigMapIfNeeded(cm, resource),
35+
() -> createConfigMap(resource));
36+
ensureStatus(resource);
37+
return UpdateControl.patchStatus(resource);
38+
}
39+
40+
private void updateConfigMapIfNeeded(ConfigMap cm, BoundedCacheTestCustomResource resource) {
41+
var data = cm.getData().get(DATA_KEY);
42+
if (data == null || data.equals(resource.getSpec().getData())) {
43+
cm.setData(Map.of(DATA_KEY, resource.getSpec().getData()));
44+
client.configMaps().resource(cm).replace();
45+
}
46+
}
47+
48+
private void createConfigMap(BoundedCacheTestCustomResource resource) {
49+
var cm = new ConfigMapBuilder()
50+
.withMetadata(new ObjectMetaBuilder()
51+
.withName(resource.getMetadata().getName())
52+
.withNamespace(resource.getMetadata().getNamespace())
53+
.build())
54+
.withData(Map.of(DATA_KEY, resource.getSpec().getData()))
55+
.build();
56+
cm.addOwnerReference(resource);
57+
client.configMaps().resource(cm).create();
58+
}
59+
60+
@Override
61+
public Map<String, EventSource> prepareEventSources(
62+
EventSourceContext<BoundedCacheTestCustomResource> context) {
63+
Cache<String, ConfigMap> cache = Caffeine.newBuilder()
64+
.expireAfterAccess(1, TimeUnit.MINUTES)
65+
.maximumSize(1)
66+
.build();
67+
68+
BoundedItemStore<ConfigMap> boundedItemStore = new BoundedItemStore<>(client, ConfigMap.class,
69+
new CaffeinBoundedCache<>(cache));
70+
71+
var es = new InformerEventSource<>(InformerConfiguration.from(ConfigMap.class, context)
72+
.withItemStore(boundedItemStore)
73+
.build(), context);
74+
75+
return EventSourceInitializer.nameEventSources(es);
76+
}
77+
78+
private void ensureStatus(BoundedCacheTestCustomResource resource) {
79+
if (resource.getStatus() == null) {
80+
resource.setStatus(new BoundedCacheTestStatus());
81+
}
82+
}
83+
84+
@Override
85+
public KubernetesClient getKubernetesClient() {
86+
return client;
87+
}
88+
89+
@Override
90+
public void setKubernetesClient(KubernetesClient kubernetesClient) {
91+
this.client = kubernetesClient;
92+
}
93+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache.sample;
2+
3+
public class BoundedCacheTestSpec {
4+
5+
private String data;
6+
7+
public String getData() {
8+
return data;
9+
}
10+
11+
public BoundedCacheTestSpec setData(String data) {
12+
this.data = data;
13+
return this;
14+
}
15+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache.sample;
2+
3+
import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus;
4+
5+
public class BoundedCacheTestStatus extends ObservedGenerationAwareStatus {
6+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedItemStore.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,8 @@ public Stream<R> values() {
4444
var values = cache.values();
4545
var notPresentValueKeys = new HashSet<>(existingResources);
4646
notPresentValueKeys.retainAll(keys);
47-
var fetchedValues = notPresentValueKeys.stream().map(k -> fetchAndCacheResource(k));
47+
var fetchedValues =
48+
notPresentValueKeys.stream().map(k -> fetchAndCacheResourceIfStillNonPresent(k));
4849
return Stream.concat(values.stream(), fetchedValues);
4950
}
5051

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedStore.java

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,16 @@ public R get(K key) {
2424
if (!existingResources.contains(key)) {
2525
return null;
2626
} else {
27-
return fetchAndCacheResource(key);
27+
return fetchAndCacheResourceIfStillNonPresent(key);
2828
}
2929
}
3030

31-
public R remove(K key) {
31+
public synchronized R remove(K key) {
3232
existingResources.remove(key);
3333
return cache.remove(key);
3434
}
3535

36-
public R put(K key, R object) {
36+
public synchronized R put(K key, R object) {
3737
var res = cache.put(key, object);
3838
existingResources.add(key);
3939
return res;
@@ -43,10 +43,22 @@ public Stream<K> keys() {
4343
return existingResources.stream();
4444
}
4545

46-
protected R fetchAndCacheResource(K key) {
46+
protected R fetchAndCacheResourceIfStillNonPresent(K key) {
4747
var newRes = resourceFetcher.fetchResource(key);
48-
cache.put(key, newRes);
49-
return newRes;
48+
// todo unit test
49+
// Just want to put the fetched resource if there is still no resource published from different
50+
// source.
51+
// In case of informers actually multiple events might arrive, therefore non fetched resources
52+
// should
53+
// take always precedence.
54+
synchronized (this) {
55+
var actual = cache.get(key);
56+
if (actual == null) {
57+
cache.put(key, newRes);
58+
return newRes;
59+
} else {
60+
return actual;
61+
}
62+
}
5063
}
51-
5264
}

0 commit comments

Comments
 (0)