Skip to content

Commit e3e70a4

Browse files
committed
This adds support for calling dispatch if the ValueCache takes time in get call
1 parent 682c652 commit e3e70a4

File tree

5 files changed

+213
-40
lines changed

5 files changed

+213
-40
lines changed

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 49 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,8 @@ CompletableFuture<V> load(K key, Object loadContext) {
134134
if (cachingEnabled) {
135135
return loadFromCache(key, loadContext, batchingEnabled);
136136
} else {
137-
return queueOrInvokeLoader(key, loadContext, batchingEnabled);
137+
CompletableFuture<V> future = new CompletableFuture<>();
138+
return queueOrInvokeLoader(key, loadContext, batchingEnabled, future);
138139
}
139140
}
140141
}
@@ -296,49 +297,74 @@ private CompletableFuture<V> loadFromCache(K key, Object loadContext, boolean ba
296297
We haven't been asked for this key yet. We want to do one of two things:
297298
298299
1. Check if our cache store has it. If so:
299-
a. Get the value from the cache store
300-
b. Add a recovery case so we queue the load if fetching from cache store fails
300+
a. Get the value from the cache store (this can take non-zero time)
301+
b. Add a recovery case, so we queue the load if fetching from cache store fails
301302
c. Put that future in our futureCache to hit the early return next time
302303
d. Return the resilient future
303304
2. If not in value cache:
304305
a. queue or invoke the load
305306
b. Add a success handler to store the result in the cache store
306307
c. Return the result
307308
*/
308-
final CompletableFuture<V> future = new CompletableFuture<>();
309+
final CompletableFuture<V> loadCallFuture = new CompletableFuture<>();
309310

310-
valueCache.get(cacheKey).whenComplete((cachedValue, getCallEx) -> {
311-
if (getCallEx == null) {
312-
future.complete(cachedValue);
311+
CompletableFuture<V> cacheLookupCF = valueCache.get(cacheKey);
312+
boolean cachedLookupCompletedImmediately = cacheLookupCF.isDone();
313+
314+
cacheLookupCF.whenComplete((cachedValue, cacheException) -> {
315+
if (cacheException == null) {
316+
loadCallFuture.complete(cachedValue);
313317
} else {
318+
CompletableFuture<V> loaderCF;
314319
synchronized (dataLoader) {
315-
queueOrInvokeLoader(key, loadContext, batchingEnabled)
316-
.whenComplete(setValueIntoCacheAndCompleteFuture(cacheKey, future));
320+
loaderCF = queueOrInvokeLoader(key, loadContext, batchingEnabled, loadCallFuture);
321+
loaderCF.whenComplete(setValueIntoValueCacheAndCompleteFuture(cacheKey, loadCallFuture));
322+
}
323+
//
324+
// is possible that if the cache lookup step took some time to execute
325+
// (e.g. an async network lookup to REDIS etc...) then it's possible that this
326+
// load call has already returned and a dispatch call has been made, and hence this code
327+
// is running after the dispatch was made - so we dispatch to catch up because
328+
// it's likely to hang if we do not. We might dispatch too early, but we will not
329+
// hang because of an async cache lookup
330+
//
331+
if (!cachedLookupCompletedImmediately && !loaderCF.isDone()) {
332+
if (loaderOptions.getValueCacheOptions().isDispatchOnCacheMiss()) {
333+
dispatch();
334+
}
317335
}
318336
}
319337
});
320-
321-
futureCache.set(cacheKey, future);
322-
323-
return future;
338+
futureCache.set(cacheKey, loadCallFuture);
339+
return loadCallFuture;
324340
}
325341

326-
private BiConsumer<V, Throwable> setValueIntoCacheAndCompleteFuture(Object cacheKey, CompletableFuture<V> future) {
327-
return (result, loadCallEx) -> {
328-
if (loadCallEx == null) {
329-
valueCache.set(cacheKey, result)
330-
.whenComplete((v, setCallExIgnored) -> future.complete(result));
342+
private BiConsumer<V, Throwable> setValueIntoValueCacheAndCompleteFuture(Object cacheKey, CompletableFuture<V> loadCallFuture) {
343+
return (result, loadCallException) -> {
344+
if (loadCallException == null) {
345+
//
346+
// we have completed our load call, and we should try to cache the value
347+
// however we don't wait on the caching to complete before completing the load call
348+
// this way a network cache call (say a REDIS put) does not have to be completed in order
349+
// for the calling code to get a value. There is an option that controls
350+
// which is off by default to make the code faster.
351+
//
352+
CompletableFuture<V> valueCacheSetCF = valueCache.set(cacheKey, result);
353+
if (loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet()) {
354+
valueCacheSetCF.whenComplete((v, setCallExceptionIgnored) -> loadCallFuture.complete(result));
355+
} else {
356+
loadCallFuture.complete(result);
357+
}
331358
} else {
332-
future.completeExceptionally(loadCallEx);
359+
loadCallFuture.completeExceptionally(loadCallException);
333360
}
334361
};
335362
}
336363

337-
private CompletableFuture<V> queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled) {
364+
private CompletableFuture<V> queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, CompletableFuture<V> loadCallFuture) {
338365
if (batchingEnabled) {
339-
CompletableFuture<V> future = new CompletableFuture<>();
340-
loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext));
341-
return future;
366+
loaderQueue.add(new LoaderQueueEntry<>(key, loadCallFuture, loadContext));
367+
return loadCallFuture;
342368
} else {
343369
stats.incrementBatchLoadCountBy(1);
344370
// immediate execution of batch function

src/main/java/org/dataloader/DataLoaderOptions.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.dataloader;
1818

1919
import org.dataloader.annotations.PublicApi;
20+
import org.dataloader.impl.Assertions;
2021
import org.dataloader.stats.SimpleStatisticsCollector;
2122
import org.dataloader.stats.StatisticsCollector;
2223

@@ -39,11 +40,12 @@ public class DataLoaderOptions {
3940
private boolean cachingEnabled;
4041
private boolean cachingExceptionsEnabled;
4142
private CacheKey<?> cacheKeyFunction;
42-
private CacheMap<?,?> cacheMap;
43-
private ValueCache<?,?> valueCache;
43+
private CacheMap<?, ?> cacheMap;
44+
private ValueCache<?, ?> valueCache;
4445
private int maxBatchSize;
4546
private Supplier<StatisticsCollector> statisticsCollector;
4647
private BatchLoaderContextProvider environmentProvider;
48+
private ValueCacheOptions valueCacheOptions;
4749

4850
/**
4951
* Creates a new data loader options with default settings.
@@ -55,6 +57,7 @@ public DataLoaderOptions() {
5557
maxBatchSize = -1;
5658
statisticsCollector = SimpleStatisticsCollector::new;
5759
environmentProvider = NULL_PROVIDER;
60+
valueCacheOptions = ValueCacheOptions.newOptions();
5861
}
5962

6063
/**
@@ -72,6 +75,7 @@ public DataLoaderOptions(DataLoaderOptions other) {
7275
this.maxBatchSize = other.maxBatchSize;
7376
this.statisticsCollector = other.statisticsCollector;
7477
this.environmentProvider = other.environmentProvider;
78+
this.valueCacheOptions = other.valueCacheOptions;
7579
}
7680

7781
/**
@@ -179,7 +183,7 @@ public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
179183
*
180184
* @return an optional with the cache map instance, or empty
181185
*/
182-
public Optional<CacheMap<?,?>> cacheMap() {
186+
public Optional<CacheMap<?, ?>> cacheMap() {
183187
return Optional.ofNullable(cacheMap);
184188
}
185189

@@ -190,7 +194,7 @@ public Optional<CacheMap<?,?>> cacheMap() {
190194
*
191195
* @return the data loader options for fluent coding
192196
*/
193-
public DataLoaderOptions setCacheMap(CacheMap<?,?> cacheMap) {
197+
public DataLoaderOptions setCacheMap(CacheMap<?, ?> cacheMap) {
194198
this.cacheMap = cacheMap;
195199
return this;
196200
}
@@ -265,7 +269,7 @@ public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvide
265269
*
266270
* @return an optional with the cache store instance, or empty
267271
*/
268-
public Optional<ValueCache<?,?>> valueCache() {
272+
public Optional<ValueCache<?, ?>> valueCache() {
269273
return Optional.ofNullable(valueCache);
270274
}
271275

@@ -276,8 +280,27 @@ public Optional<ValueCache<?,?>> valueCache() {
276280
*
277281
* @return the data loader options for fluent coding
278282
*/
279-
public DataLoaderOptions setValueCache(ValueCache<?,?> valueCache) {
283+
public DataLoaderOptions setValueCache(ValueCache<?, ?> valueCache) {
280284
this.valueCache = valueCache;
281285
return this;
282286
}
287+
288+
/**
289+
* @return the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
290+
*/
291+
public ValueCacheOptions getValueCacheOptions() {
292+
return valueCacheOptions;
293+
}
294+
295+
/**
296+
* Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
297+
*
298+
* @param valueCacheOptions the value cache options
299+
*
300+
* @return the data loader options for fluent coding
301+
*/
302+
public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOptions) {
303+
this.valueCacheOptions = Assertions.nonNull(valueCacheOptions);
304+
return this;
305+
}
283306
}

src/main/java/org/dataloader/ValueCache.java

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,33 +7,36 @@
77

88
/**
99
* The {@link ValueCache} is used by data loaders that use caching and want a long-lived or external cache
10-
* of values. The {@link ValueCache} is used as a place to cache values when they come back from
10+
* of values. The {@link ValueCache} is used as a place to cache values when they come back from an async
11+
* cache store.
1112
* <p>
12-
* It differs from {@link CacheMap} which is in fact a cache of promises to values aka {@link CompletableFuture}&lt;V&gt; and it rather suited
13-
* to be a wrapper of a long lived or external value cache. {@link CompletableFuture}s cant be easily placed in an external cache
14-
* outside the JVM say, hence the need for the {@link ValueCache}.
13+
* It differs from {@link CacheMap} which is in fact a cache of promised values aka {@link CompletableFuture}&lt;V&gt;'s.
14+
* <p>
15+
* {@link ValueCache is more suited to be a wrapper of a long-lived or externallly cached values. {@link CompletableFuture}s cant
16+
* be easily placed in an external cache outside the JVM say, hence the need for the {@link ValueCache}.
1517
* <p>
1618
* {@link DataLoader}s use a two stage cache strategy if caching is enabled. If the {@link CacheMap} already has the promise to a value
1719
* that is used. If not then the {@link ValueCache} is asked for a value, if it has one then that is returned (and cached as a promise in the {@link CacheMap}.
20+
* <p>
1821
* If there is no value then the key is queued and loaded via the {@link BatchLoader} calls. The returned values will then be stored in
1922
* the {@link ValueCache} and the promises to those values are also stored in the {@link CacheMap}.
2023
* <p>
2124
* The default implementation is a no-op store which replies with the key always missing and doesn't
2225
* store any actual results. This is to avoid duplicating the stored data between the {@link CacheMap}
2326
* out of the box.
2427
* <p>
25-
* The API signature uses completable futures because the backing implementation MAY be a remote external cache
26-
* and hence exceptions may happen in retrieving values.
28+
* The API signature uses {@link CompletableFuture}s because the backing implementation MAY be a remote external cache
29+
* and hence exceptions may happen in retrieving values and they may take time to complete.
2730
*
2831
* @param <K> the type of cache keys
2932
* @param <V> the type of cache values
3033
*
3134
* @author <a href="https://github.com/craig-day">Craig Day</a>
35+
* @author <a href="https://github.com/bbakerman/">Brad Baker</a>
3236
*/
3337
@PublicSpi
3438
public interface ValueCache<K, V> {
3539

36-
3740
/**
3841
* Creates a new value cache, using the default no-op implementation.
3942
*
@@ -48,9 +51,12 @@ static <K, V> ValueCache<K, V> defaultValueCache() {
4851
}
4952

5053
/**
51-
* Gets the specified key from the store. if the key si not present, then the implementation MUST return an exceptionally completed future
52-
* and not null because null is a valid cacheable value. Any exception is will cause {@link DataLoader} to load the key via batch loading
54+
* Gets the specified key from the value cache. If the key is not present, then the implementation MUST return an exceptionally completed future
55+
* and not null because null is a valid cacheable value. An exceptionally completed future will cause {@link DataLoader} to load the key via batch loading
5356
* instead.
57+
* <p>
58+
* NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure
59+
* to do this may cause the {@link DataLoader} code to not run properly.
5460
*
5561
* @param key the key to retrieve
5662
*
@@ -61,6 +67,9 @@ static <K, V> ValueCache<K, V> defaultValueCache() {
6167

6268
/**
6369
* Stores the value with the specified key, or updates it if the key already exists.
70+
* <p>
71+
* NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure
72+
* to do this may cause the {@link DataLoader} code to not run properly.
6473
*
6574
* @param key the key to store
6675
* @param value the value to store
@@ -70,7 +79,10 @@ static <K, V> ValueCache<K, V> defaultValueCache() {
7079
CompletableFuture<V> set(K key, V value);
7180

7281
/**
73-
* Deletes the entry with the specified key from the store, if it exists.
82+
* Deletes the entry with the specified key from the value cache, if it exists.
83+
* <p>
84+
* NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure
85+
* to do this may cause the {@link DataLoader} code to not run properly.
7486
*
7587
* @param key the key to delete
7688
*
@@ -79,7 +91,10 @@ static <K, V> ValueCache<K, V> defaultValueCache() {
7991
CompletableFuture<Void> delete(K key);
8092

8193
/**
82-
* Clears all entries from the store.
94+
* Clears all entries from the value cache.
95+
* <p>
96+
* NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure
97+
* to do this may cause the {@link DataLoader} code to not run properly.
8398
*
8499
* @return a void future for error handling and fluent composition
85100
*/
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
package org.dataloader;
2+
3+
/**
4+
* Options that control how the {@link ValueCache} is used by {@link DataLoader}
5+
*
6+
* @author <a href="https://github.com/bbakerman/">Brad Baker</a>
7+
*/
8+
public class ValueCacheOptions {
9+
private final boolean dispatchOnCacheMiss;
10+
private final boolean completeValueAfterCacheSet;
11+
12+
private ValueCacheOptions() {
13+
this.dispatchOnCacheMiss = true;
14+
this.completeValueAfterCacheSet = false;
15+
}
16+
17+
private ValueCacheOptions(boolean dispatchOnCacheMiss, boolean completeValueAfterCacheSet) {
18+
this.dispatchOnCacheMiss = dispatchOnCacheMiss;
19+
this.completeValueAfterCacheSet = completeValueAfterCacheSet;
20+
}
21+
22+
public static ValueCacheOptions newOptions() {
23+
return new ValueCacheOptions();
24+
}
25+
26+
/**
27+
* This controls whether the {@link DataLoader} will called {@link DataLoader#dispatch()} if a
28+
* {@link ValueCache#get(Object)} call misses. In an async world this could take non zero time
29+
* to complete and hence previous dispatch calls may have already completed.
30+
*
31+
* This is true by default.
32+
*
33+
* @return true if a {@link DataLoader#dispatch()} call will be made on an async {@link ValueCache} miss
34+
*/
35+
public boolean isDispatchOnCacheMiss() {
36+
return dispatchOnCacheMiss;
37+
}
38+
39+
/**
40+
* This controls whether the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call
41+
* to complete before it completes the returned value. By default this is false and hence
42+
* the {@link ValueCache#set(Object, Object)} call may complete some time AFTER the data loader
43+
* value has been returned.
44+
*
45+
* This is false by default, for performance reasons.
46+
*
47+
* @return true the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call to complete before
48+
* it completes the returned value.
49+
*/
50+
public boolean isCompleteValueAfterCacheSet() {
51+
return completeValueAfterCacheSet;
52+
}
53+
54+
public ValueCacheOptions setDispatchOnCacheMiss(boolean flag) {
55+
return new ValueCacheOptions(flag, this.completeValueAfterCacheSet);
56+
}
57+
58+
public ValueCacheOptions setCompleteValueAfterCacheSet(boolean flag) {
59+
return new ValueCacheOptions(this.dispatchOnCacheMiss, flag);
60+
}
61+
62+
}

0 commit comments

Comments
 (0)