diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 8f8f08f..15c7c4c 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -134,7 +134,8 @@ CompletableFuture load(K key, Object loadContext) { if (cachingEnabled) { return loadFromCache(key, loadContext, batchingEnabled); } else { - return queueOrInvokeLoader(key, loadContext, batchingEnabled); + CompletableFuture future = new CompletableFuture<>(); + return queueOrInvokeLoader(key, loadContext, batchingEnabled, future); } } } @@ -296,8 +297,8 @@ private CompletableFuture loadFromCache(K key, Object loadContext, boolean ba We haven't been asked for this key yet. We want to do one of two things: 1. Check if our cache store has it. If so: - a. Get the value from the cache store - b. Add a recovery case so we queue the load if fetching from cache store fails + a. Get the value from the cache store (this can take non-zero time) + b. Add a recovery case, so we queue the load if fetching from cache store fails c. Put that future in our futureCache to hit the early return next time d. Return the resilient future 2. If not in value cache: @@ -305,40 +306,65 @@ private CompletableFuture loadFromCache(K key, Object loadContext, boolean ba b. Add a success handler to store the result in the cache store c. 
Return the result */ - final CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture loadCallFuture = new CompletableFuture<>(); - valueCache.get(cacheKey).whenComplete((cachedValue, getCallEx) -> { - if (getCallEx == null) { - future.complete(cachedValue); + CompletableFuture cacheLookupCF = valueCache.get(cacheKey); + boolean cachedLookupCompletedImmediately = cacheLookupCF.isDone(); + + cacheLookupCF.whenComplete((cachedValue, cacheException) -> { + if (cacheException == null) { + loadCallFuture.complete(cachedValue); } else { + CompletableFuture loaderCF; synchronized (dataLoader) { - queueOrInvokeLoader(key, loadContext, batchingEnabled) - .whenComplete(setValueIntoCacheAndCompleteFuture(cacheKey, future)); + loaderCF = queueOrInvokeLoader(key, loadContext, batchingEnabled, loadCallFuture); + loaderCF.whenComplete(setValueIntoValueCacheAndCompleteFuture(cacheKey, loadCallFuture)); + } + // + // is possible that if the cache lookup step took some time to execute + // (e.g. an async network lookup to REDIS etc...) then it's possible that this + // load call has already returned and a dispatch call has been made, and hence this code + // is running after the dispatch was made - so we dispatch to catch up because + // it's likely to hang if we do not. 
We might dispatch too early, but we will not + // hang because of an async cache lookup + // + if (!cachedLookupCompletedImmediately && !loaderCF.isDone()) { + if (loaderOptions.getValueCacheOptions().isDispatchOnCacheMiss()) { + dispatch(); + } } } }); - - futureCache.set(cacheKey, future); - - return future; + futureCache.set(cacheKey, loadCallFuture); + return loadCallFuture; } - private BiConsumer setValueIntoCacheAndCompleteFuture(Object cacheKey, CompletableFuture future) { - return (result, loadCallEx) -> { - if (loadCallEx == null) { - valueCache.set(cacheKey, result) - .whenComplete((v, setCallExIgnored) -> future.complete(result)); + private BiConsumer setValueIntoValueCacheAndCompleteFuture(Object cacheKey, CompletableFuture loadCallFuture) { + return (result, loadCallException) -> { + if (loadCallException == null) { + // + // we have completed our load call, and we should try to cache the value + // however we don't wait on the caching to complete before completing the load call + // this way a network cache call (say a REDIS put) does not have to be completed in order + // for the calling code to get a value. There is an option that controls + // which is off by default to make the code faster. 
+ // + CompletableFuture valueCacheSetCF = valueCache.set(cacheKey, result); + if (loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet()) { + valueCacheSetCF.whenComplete((v, setCallExceptionIgnored) -> loadCallFuture.complete(result)); + } else { + loadCallFuture.complete(result); + } } else { - future.completeExceptionally(loadCallEx); + loadCallFuture.completeExceptionally(loadCallException); } }; } - private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled) { + private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, CompletableFuture loadCallFuture) { if (batchingEnabled) { - CompletableFuture future = new CompletableFuture<>(); - loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext)); - return future; + loaderQueue.add(new LoaderQueueEntry<>(key, loadCallFuture, loadContext)); + return loadCallFuture; } else { stats.incrementBatchLoadCountBy(1); // immediate execution of batch function diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index 89530e1..8cd35ba 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -17,6 +17,7 @@ package org.dataloader; import org.dataloader.annotations.PublicApi; +import org.dataloader.impl.Assertions; import org.dataloader.stats.SimpleStatisticsCollector; import org.dataloader.stats.StatisticsCollector; @@ -39,11 +40,12 @@ public class DataLoaderOptions { private boolean cachingEnabled; private boolean cachingExceptionsEnabled; private CacheKey cacheKeyFunction; - private CacheMap cacheMap; - private ValueCache valueCache; + private CacheMap cacheMap; + private ValueCache valueCache; private int maxBatchSize; private Supplier statisticsCollector; private BatchLoaderContextProvider environmentProvider; + private ValueCacheOptions valueCacheOptions; /** * Creates a new data loader options 
with default settings. @@ -55,6 +57,7 @@ public DataLoaderOptions() { maxBatchSize = -1; statisticsCollector = SimpleStatisticsCollector::new; environmentProvider = NULL_PROVIDER; + valueCacheOptions = ValueCacheOptions.newOptions(); } /** @@ -72,6 +75,7 @@ public DataLoaderOptions(DataLoaderOptions other) { this.maxBatchSize = other.maxBatchSize; this.statisticsCollector = other.statisticsCollector; this.environmentProvider = other.environmentProvider; + this.valueCacheOptions = other.valueCacheOptions; } /** @@ -179,7 +183,7 @@ public DataLoaderOptions setCacheKeyFunction(CacheKey cacheKeyFunction) { * * @return an optional with the cache map instance, or empty */ - public Optional> cacheMap() { + public Optional> cacheMap() { return Optional.ofNullable(cacheMap); } @@ -190,7 +194,7 @@ public Optional> cacheMap() { * * @return the data loader options for fluent coding */ - public DataLoaderOptions setCacheMap(CacheMap cacheMap) { + public DataLoaderOptions setCacheMap(CacheMap cacheMap) { this.cacheMap = cacheMap; return this; } @@ -265,7 +269,7 @@ public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvide * * @return an optional with the cache store instance, or empty */ - public Optional> valueCache() { + public Optional> valueCache() { return Optional.ofNullable(valueCache); } @@ -276,8 +280,27 @@ public Optional> valueCache() { * * @return the data loader options for fluent coding */ - public DataLoaderOptions setValueCache(ValueCache valueCache) { + public DataLoaderOptions setValueCache(ValueCache valueCache) { this.valueCache = valueCache; return this; } + + /** + * @return the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used + */ + public ValueCacheOptions getValueCacheOptions() { + return valueCacheOptions; + } + + /** + * Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used + * + * @param valueCacheOptions the value cache options + * + * @return the data loader 
options for fluent coding + */ + public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOptions) { + this.valueCacheOptions = Assertions.nonNull(valueCacheOptions); + return this; + } } diff --git a/src/main/java/org/dataloader/ValueCache.java b/src/main/java/org/dataloader/ValueCache.java index 31042c6..c2aab46 100644 --- a/src/main/java/org/dataloader/ValueCache.java +++ b/src/main/java/org/dataloader/ValueCache.java @@ -7,14 +7,17 @@ /** * The {@link ValueCache} is used by data loaders that use caching and want a long-lived or external cache - * of values. The {@link ValueCache} is used as a place to cache values when they come back from + * of values. The {@link ValueCache} is used as a place to cache values when they come back from an async + * cache store. *

- * It differs from {@link CacheMap} which is in fact a cache of promises to values aka {@link CompletableFuture}<V> and it rather suited - * to be a wrapper of a long lived or external value cache. {@link CompletableFuture}s cant be easily placed in an external cache - * outside the JVM say, hence the need for the {@link ValueCache}. + * It differs from {@link CacheMap} which is in fact a cache of promised values aka {@link CompletableFuture}<V>'s. + *

+ * {@link ValueCache} is more suited to be a wrapper of long-lived or externally cached values. {@link CompletableFuture}s cannot + * be easily placed in an external cache outside the JVM say, hence the need for the {@link ValueCache}. *

* {@link DataLoader}s use a two stage cache strategy if caching is enabled. If the {@link CacheMap} already has the promise to a value * that is used. If not then the {@link ValueCache} is asked for a value, if it has one then that is returned (and cached as a promise in the {@link CacheMap}. + *

* If there is no value then the key is queued and loaded via the {@link BatchLoader} calls. The returned values will then be stored in * the {@link ValueCache} and the promises to those values are also stored in the {@link CacheMap}. *

@@ -22,18 +25,18 @@ * store any actual results. This is to avoid duplicating the stored data between the {@link CacheMap} * out of the box. *

- * The API signature uses completable futures because the backing implementation MAY be a remote external cache - * and hence exceptions may happen in retrieving values. + * The API signature uses {@link CompletableFuture}s because the backing implementation MAY be a remote external cache + * and hence exceptions may happen in retrieving values and they may take time to complete. * * @param the type of cache keys * @param the type of cache values * * @author Craig Day + * @author Brad Baker */ @PublicSpi public interface ValueCache { - /** * Creates a new value cache, using the default no-op implementation. * @@ -48,9 +51,12 @@ static ValueCache defaultValueCache() { } /** - * Gets the specified key from the store. if the key si not present, then the implementation MUST return an exceptionally completed future - * and not null because null is a valid cacheable value. Any exception is will cause {@link DataLoader} to load the key via batch loading + * Gets the specified key from the value cache. If the key is not present, then the implementation MUST return an exceptionally completed future + * and not null because null is a valid cacheable value. An exceptionally completed future will cause {@link DataLoader} to load the key via batch loading * instead. + *

+ * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure + * to do this may cause the {@link DataLoader} code to not run properly. * * @param key the key to retrieve * @@ -61,6 +67,9 @@ static ValueCache defaultValueCache() { /** * Stores the value with the specified key, or updates it if the key already exists. + *

+ * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure + * to do this may cause the {@link DataLoader} code to not run properly. * * @param key the key to store * @param value the value to store @@ -70,7 +79,10 @@ static ValueCache defaultValueCache() { CompletableFuture set(K key, V value); /** - * Deletes the entry with the specified key from the store, if it exists. + * Deletes the entry with the specified key from the value cache, if it exists. + *

+ * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure + * to do this may cause the {@link DataLoader} code to not run properly. * * @param key the key to delete * @@ -79,7 +91,10 @@ static ValueCache defaultValueCache() { CompletableFuture delete(K key); /** - * Clears all entries from the store. + * Clears all entries from the value cache. + *

+ * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure + * to do this may cause the {@link DataLoader} code to not run properly. * * @return a void future for error handling and fluent composition */ diff --git a/src/main/java/org/dataloader/ValueCacheOptions.java new file mode 100644 index 0000000..0928b97 --- /dev/null +++ b/src/main/java/org/dataloader/ValueCacheOptions.java @@ -0,0 +1,62 @@ +package org.dataloader; + +/** + * Options that control how the {@link ValueCache} is used by {@link DataLoader} + * + * @author Brad Baker + */ +public class ValueCacheOptions { + private final boolean dispatchOnCacheMiss; + private final boolean completeValueAfterCacheSet; + + private ValueCacheOptions() { + this.dispatchOnCacheMiss = true; + this.completeValueAfterCacheSet = false; + } + + private ValueCacheOptions(boolean dispatchOnCacheMiss, boolean completeValueAfterCacheSet) { + this.dispatchOnCacheMiss = dispatchOnCacheMiss; + this.completeValueAfterCacheSet = completeValueAfterCacheSet; + } + + public static ValueCacheOptions newOptions() { + return new ValueCacheOptions(); + } + + /** + * This controls whether the {@link DataLoader} will call {@link DataLoader#dispatch()} if a + * {@link ValueCache#get(Object)} call misses. In an async world this could take non-zero time + * to complete and hence previous dispatch calls may have already completed. + * + * This is true by default. + * + * @return true if a {@link DataLoader#dispatch()} call will be made on an async {@link ValueCache} miss + */ + public boolean isDispatchOnCacheMiss() { + return dispatchOnCacheMiss; + } + + /** + * This controls whether the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call + * to complete before it completes the returned value. 
By default this is false and hence + * the {@link ValueCache#set(Object, Object)} call may complete some time AFTER the data loader + * value has been returned. + * + * This is false by default, for performance reasons. + * + * @return true if the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call to complete before + * it completes the returned value. + */ + public boolean isCompleteValueAfterCacheSet() { + return completeValueAfterCacheSet; + } + + public ValueCacheOptions setDispatchOnCacheMiss(boolean flag) { + return new ValueCacheOptions(flag, this.completeValueAfterCacheSet); + } + + public ValueCacheOptions setCompleteValueAfterCacheSet(boolean flag) { + return new ValueCacheOptions(this.dispatchOnCacheMiss, flag); + } + +} diff --git a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java index 1c54e91..622fbdf 100644 --- a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java +++ b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java @@ -4,6 +4,8 @@ import com.github.benmanes.caffeine.cache.Caffeine; import org.dataloader.fixtures.CaffeineValueCache; import org.dataloader.fixtures.CustomValueCache; +import org.dataloader.fixtures.TestKit; +import org.dataloader.impl.CompletableFutureKit; import org.junit.Test; import java.util.ArrayList; @@ -201,4 +203,49 @@ public CompletableFuture set(String key, Object value) { assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); assertArrayEquals(customStore.store.keySet().toArray(), singletonList("b").toArray()); } + + + @Test + public void dispatch_will_be_called_if_a_cache_value_miss_takes_some_time() { + // + // without the extra dispatch() internally, fA would never + // have completed. 
In the spirit of TDD, this test was written first + // and would hang before the support code was put in place + // + CustomValueCache customStore = new CustomValueCache() { + + @Override + public CompletableFuture get(String key) { + if (key.equals("a")) { + return CompletableFuture.supplyAsync(() -> { + TestKit.snooze(1000); + throw new IllegalStateException("no a in cache"); + }); + } + return super.get(key); + } + + }; + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + + CompletableFuture> dispatchedCall = identityLoader.dispatch(); + + CompletableFuture> bothCalls = CompletableFutureKit.allOf(asList(fA, fB)); + await().atMost(5, TimeUnit.SECONDS).until(bothCalls::isDone); + + assertTrue(dispatchedCall.isDone()); + assertTrue(fA.isDone()); + assertTrue(fB.isDone()); + assertThat(fA.join(), equalTo("a")); + assertThat(fB.join(), equalTo("b")); + + // 'b' will complete in real time while 'a' was a cache miss after some time + assertThat(loadCalls, equalTo(asList(singletonList("b"), singletonList("a")))); + } }