Skip to content

Commit 917ebee

Browse files
committed
working bounded cache
1 parent 2a497a9 commit 917ebee

File tree

11 files changed

+140
-168
lines changed

11 files changed

+140
-168
lines changed

caffein-bounded-cache-support/pom.xml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,18 @@
4242
<artifactId>crd-generator-apt</artifactId>
4343
<scope>test</scope>
4444
</dependency>
45+
<dependency>
46+
<groupId>org.apache.logging.log4j</groupId>
47+
<artifactId>log4j-slf4j-impl</artifactId>
48+
<scope>test</scope>
49+
</dependency>
50+
<dependency>
51+
<groupId>org.apache.logging.log4j</groupId>
52+
<artifactId>log4j-core</artifactId>
53+
<version>${log4j.version}</version>
54+
<type>test-jar</type>
55+
<scope>test</scope>
56+
</dependency>
4557
</dependencies>
4658

4759
<build>

caffein-bounded-cache-support/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeinBoundedCache.java

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,4 @@ public R put(K key, R object) {
3030
cache.put(key, object);
3131
return object;
3232
}
33-
34-
@Override
35-
public Set<K> keys() {
36-
return cache.asMap().keySet();
37-
}
38-
39-
@Override
40-
public Set<R> values() {
41-
return new HashSet<>(cache.asMap().values());
42-
}
4333
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.cache;
2+
3+
public class CaffeinBoundedItemStore {
4+
}

caffein-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/CaffeinBoundedCacheIT.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ class CaffeinBoundedCacheIT {
3030
@RegisterExtension
3131
LocallyRunOperatorExtension extension =
3232
LocallyRunOperatorExtension.builder().withReconciler(new BoundedCacheTestReconciler(), o -> {
33-
Cache<String, ConfigMap> cache = Caffeine.newBuilder()
33+
Cache<String, BoundedCacheTestCustomResource> cache = Caffeine.newBuilder()
3434
.expireAfterAccess(1, TimeUnit.MINUTES)
3535
.maximumSize(1)
3636
.build();
37-
BoundedItemStore<ConfigMap> boundedItemStore =
38-
new BoundedItemStore<>(new KubernetesClientBuilder().build(), ConfigMap.class,
39-
new CaffeinBoundedCache<>(cache));
37+
BoundedItemStore<BoundedCacheTestCustomResource> boundedItemStore =
38+
new BoundedItemStore<>(new KubernetesClientBuilder().build(),
39+
new CaffeinBoundedCache<>(cache), BoundedCacheTestCustomResource.class);
4040
o.withItemStore(boundedItemStore);
4141
})
4242
.build();

caffein-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/BoundedCacheTestCustomResource.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package io.javaoperatorsdk.operator.processing.event.source.cache.sample;
22

3+
import io.fabric8.kubernetes.api.model.Namespaced;
34
import io.fabric8.kubernetes.client.CustomResource;
45
import io.fabric8.kubernetes.model.annotation.Group;
56
import io.fabric8.kubernetes.model.annotation.ShortNames;
@@ -9,5 +10,5 @@
910
@Version("v1")
1011
@ShortNames("bct")
1112
public class BoundedCacheTestCustomResource
12-
extends CustomResource<BoundedCacheTestSpec, BoundedCacheTestStatus> {
13+
extends CustomResource<BoundedCacheTestSpec, BoundedCacheTestStatus> implements Namespaced {
1314
}

caffein-bounded-cache-support/src/test/java/io/javaoperatorsdk/operator/processing/event/source/cache/sample/BoundedCacheTestReconciler.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33
import java.util.Map;
44
import java.util.concurrent.TimeUnit;
55

6+
import org.slf4j.Logger;
7+
import org.slf4j.LoggerFactory;
8+
69
import io.fabric8.kubernetes.api.model.ConfigMap;
710
import io.fabric8.kubernetes.api.model.ConfigMapBuilder;
811
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
@@ -22,6 +25,8 @@
2225
public class BoundedCacheTestReconciler implements Reconciler<BoundedCacheTestCustomResource>,
2326
EventSourceInitializer<BoundedCacheTestCustomResource>, KubernetesClientAware {
2427

28+
private static final Logger log = LoggerFactory.getLogger(BoundedCacheTestReconciler.class);
29+
2530
public static final String DATA_KEY = "dataKey";
2631
private KubernetesClient client;
2732

@@ -34,6 +39,7 @@ public UpdateControl<BoundedCacheTestCustomResource> reconcile(
3439
cm -> updateConfigMapIfNeeded(cm, resource),
3540
() -> createConfigMap(resource));
3641
ensureStatus(resource);
42+
log.info("Reconciled: {}", resource.getMetadata().getName());
3743
return UpdateControl.patchStatus(resource);
3844
}
3945

@@ -65,8 +71,8 @@ public Map<String, EventSource> prepareEventSources(
6571
.maximumSize(1)
6672
.build();
6773

68-
BoundedItemStore<ConfigMap> boundedItemStore = new BoundedItemStore<>(client, ConfigMap.class,
69-
new CaffeinBoundedCache<>(cache));
74+
BoundedItemStore<ConfigMap> boundedItemStore = new BoundedItemStore<>(client,
75+
new CaffeinBoundedCache<>(cache), ConfigMap.class);
7076

7177
var es = new InformerEventSource<>(InformerConfiguration.from(ConfigMap.class, context)
7278
.withItemStore(boundedItemStore)
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<Configuration status="WARN">
3+
<Appenders>
4+
<Console name="Console" target="SYSTEM_OUT">
5+
<PatternLayout pattern="%d %threadId %-30c{1.} [%-5level] %msg%n%throwable"/>
6+
</Console>
7+
</Appenders>
8+
<Loggers>
9+
<Root level="debug">
10+
<AppenderRef ref="Console"/>
11+
</Root>
12+
</Loggers>
13+
</Configuration>
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
package io.javaoperatorsdk.operator.processing.event.source.cache;
22

3-
import java.util.Set;
4-
53
// todo: rename to cache? (replace the old one)
64
public interface BoundedCache<K, R> {
75

@@ -11,8 +9,4 @@ public interface BoundedCache<K, R> {
119

1210
R put(K key, R object);
1311

14-
Set<K> keys();
15-
16-
Set<R> values();
17-
1812
}
Original file line numberDiff line numberDiff line change
@@ -1,60 +1,137 @@
11
package io.javaoperatorsdk.operator.processing.event.source.cache;
22

3-
import java.util.HashSet;
3+
import java.lang.reflect.InvocationTargetException;
4+
import java.util.Map;
5+
import java.util.concurrent.ConcurrentHashMap;
46
import java.util.function.Function;
57
import java.util.stream.Stream;
68

9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
712
import io.fabric8.kubernetes.api.model.HasMetadata;
13+
import io.fabric8.kubernetes.api.model.ObjectMetaBuilder;
814
import io.fabric8.kubernetes.client.KubernetesClient;
915
import io.fabric8.kubernetes.client.informers.cache.Cache;
1016
import io.fabric8.kubernetes.client.informers.cache.ItemStore;
1117

12-
public class BoundedItemStore<R extends HasMetadata> extends BoundedStore<String, R>
18+
public class BoundedItemStore<R extends HasMetadata>
1319
implements ItemStore<R> {
1420

21+
private Logger log = LoggerFactory.getLogger(BoundedItemStore.class);
22+
23+
private final ResourceFetcher<String, R> resourceFetcher;
24+
private final BoundedCache<String, R> cache;
1525
private final Function<R, String> keyFunction;
26+
private final Map<String, R> existingMinimalResources = new ConcurrentHashMap<>();
27+
private final Class<R> resourceClass;
1628

17-
public BoundedItemStore(KubernetesClient client, Class<R> resourceClass,
18-
BoundedCache<String, R> cache) {
19-
this(new KubernetesResourceFetcher<>(resourceClass, client), cache, namespaceKeyFunc());
29+
public BoundedItemStore(KubernetesClient client,
30+
BoundedCache<String, R> cache, Class<R> resourceClass) {
31+
this(client, cache, resourceClass, namespaceKeyFunc());
2032
}
2133

22-
public BoundedItemStore(KubernetesResourceFetcher<R> resourceFetcher,
23-
BoundedCache<String, R> cache, Function<R, String> keyFunction) {
24-
super(resourceFetcher, cache);
34+
public BoundedItemStore(KubernetesClient client,
35+
BoundedCache<String, R> cache,
36+
Class<R> resourceClass,
37+
Function<R, String> keyFunction) {
38+
this.resourceFetcher = new KubernetesResourceFetcher<>(resourceClass, client);
39+
this.cache = cache;
2540
this.keyFunction = keyFunction;
41+
this.resourceClass = resourceClass;
2642
}
2743

2844
@Override
2945
public String getKey(R obj) {
3046
return keyFunction.apply(obj);
3147
}
3248

49+
@Override
50+
public synchronized R put(String key, R obj) {
51+
var result = existingMinimalResources.get(key);
52+
cache.put(key, obj);
53+
existingMinimalResources.put(key, createMinimalResource(obj));
54+
return result;
55+
}
56+
57+
private R createMinimalResource(R obj) {
58+
try {
59+
R minimal = resourceClass.getConstructor().newInstance();
60+
minimal.setMetadata(new ObjectMetaBuilder().build());
61+
minimal.getMetadata().setName(obj.getMetadata().getName());
62+
minimal.getMetadata().setNamespace(obj.getMetadata().getNamespace());
63+
minimal.getMetadata().setResourceVersion(obj.getMetadata().getResourceVersion());
64+
return minimal;
65+
} catch (InstantiationException | IllegalAccessException | InvocationTargetException
66+
| NoSuchMethodException e) {
67+
throw new IllegalStateException(e);
68+
}
69+
}
70+
71+
@Override
72+
public synchronized R remove(String key) {
73+
var fullValue = cache.remove(key);
74+
var minimalValue = existingMinimalResources.remove(key);
75+
return fullValue != null ? fullValue : minimalValue;
76+
}
77+
3378
@Override
3479
public Stream<String> keySet() {
35-
return super.keys();
80+
return existingMinimalResources.keySet().stream();
3681
}
3782

38-
/**
39-
* This is very inefficient but should not be called by the Informer or just before it's started
40-
*/
4183
@Override
4284
public Stream<R> values() {
43-
var keys = cache.keys();
44-
var values = cache.values();
45-
var notPresentValueKeys = new HashSet<>(existingResources);
46-
notPresentValueKeys.retainAll(keys);
47-
var fetchedValues =
48-
notPresentValueKeys.stream().map(k -> fetchAndCacheResourceIfStillNonPresent(k));
49-
return Stream.concat(values.stream(), fetchedValues);
85+
return existingMinimalResources.values().stream();
5086
}
5187

5288
@Override
5389
public int size() {
54-
return existingResources.size();
90+
return existingMinimalResources.size();
91+
}
92+
93+
@Override
94+
public R get(String key) {
95+
var res = cache.get(key);
96+
if (res != null) {
97+
return res;
98+
}
99+
if (!existingMinimalResources.containsKey(key)) {
100+
return null;
101+
} else {
102+
return refreshMissingStateFromServer(key);
103+
}
104+
}
105+
106+
@Override
107+
public boolean isFullState() {
108+
return false;
55109
}
56110

57111
public static <R extends HasMetadata> Function<R, String> namespaceKeyFunc() {
58112
return r -> Cache.namespaceKeyFunc(r.getMetadata().getNamespace(), r.getMetadata().getName());
59113
}
114+
115+
protected R refreshMissingStateFromServer(String key) {
116+
log.debug("Fetching resource from server");
117+
var newRes = resourceFetcher.fetchResource(key);
118+
synchronized (this) {
119+
log.debug("Fetched resource: {}", newRes);
120+
if (newRes == null) {
121+
existingMinimalResources.remove(key);
122+
return null;
123+
}
124+
// Just want to put the fetched resource if there is still no resource published from
125+
// different source. In case of informers actually multiple events might arrive, therefore non
126+
// fetched resource should take always precedence.
127+
var actual = cache.get(key);
128+
if (actual == null) {
129+
cache.put(key, newRes);
130+
existingMinimalResources.put(key, createMinimalResource(newRes));
131+
return newRes;
132+
} else {
133+
return actual;
134+
}
135+
}
136+
}
60137
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/cache/BoundedStore.java

Lines changed: 0 additions & 64 deletions
This file was deleted.

0 commit comments

Comments
 (0)