From e3e70a4fba1b9516fb0f2d15f2846141d683bcb8 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sat, 31 Jul 2021 18:01:37 +1000 Subject: [PATCH 1/9] This adds support for calling dispatch if the ValueCache takes time in get call --- .../java/org/dataloader/DataLoaderHelper.java | 72 +++++++++++++------ .../org/dataloader/DataLoaderOptions.java | 35 +++++++-- src/main/java/org/dataloader/ValueCache.java | 37 +++++++--- .../org/dataloader/ValueCacheOptions.java | 62 ++++++++++++++++ .../dataloader/DataLoaderValueCacheTest.java | 47 ++++++++++++ 5 files changed, 213 insertions(+), 40 deletions(-) create mode 100644 src/main/java/org/dataloader/ValueCacheOptions.java diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 8f8f08f..15c7c4c 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -134,7 +134,8 @@ CompletableFuture load(K key, Object loadContext) { if (cachingEnabled) { return loadFromCache(key, loadContext, batchingEnabled); } else { - return queueOrInvokeLoader(key, loadContext, batchingEnabled); + CompletableFuture future = new CompletableFuture<>(); + return queueOrInvokeLoader(key, loadContext, batchingEnabled, future); } } } @@ -296,8 +297,8 @@ private CompletableFuture loadFromCache(K key, Object loadContext, boolean ba We haven't been asked for this key yet. We want to do one of two things: 1. Check if our cache store has it. If so: - a. Get the value from the cache store - b. Add a recovery case so we queue the load if fetching from cache store fails + a. Get the value from the cache store (this can take non-zero time) + b. Add a recovery case, so we queue the load if fetching from cache store fails c. Put that future in our futureCache to hit the early return next time d. Return the resilient future 2. 
If not in value cache: @@ -305,40 +306,65 @@ private CompletableFuture loadFromCache(K key, Object loadContext, boolean ba b. Add a success handler to store the result in the cache store c. Return the result */ - final CompletableFuture future = new CompletableFuture<>(); + final CompletableFuture loadCallFuture = new CompletableFuture<>(); - valueCache.get(cacheKey).whenComplete((cachedValue, getCallEx) -> { - if (getCallEx == null) { - future.complete(cachedValue); + CompletableFuture cacheLookupCF = valueCache.get(cacheKey); + boolean cachedLookupCompletedImmediately = cacheLookupCF.isDone(); + + cacheLookupCF.whenComplete((cachedValue, cacheException) -> { + if (cacheException == null) { + loadCallFuture.complete(cachedValue); } else { + CompletableFuture loaderCF; synchronized (dataLoader) { - queueOrInvokeLoader(key, loadContext, batchingEnabled) - .whenComplete(setValueIntoCacheAndCompleteFuture(cacheKey, future)); + loaderCF = queueOrInvokeLoader(key, loadContext, batchingEnabled, loadCallFuture); + loaderCF.whenComplete(setValueIntoValueCacheAndCompleteFuture(cacheKey, loadCallFuture)); + } + // + // is possible that if the cache lookup step took some time to execute + // (e.g. an async network lookup to REDIS etc...) then it's possible that this + // load call has already returned and a dispatch call has been made, and hence this code + // is running after the dispatch was made - so we dispatch to catch up because + // it's likely to hang if we do not. 
We might dispatch too early, but we will not + // hang because of an async cache lookup + // + if (!cachedLookupCompletedImmediately && !loaderCF.isDone()) { + if (loaderOptions.getValueCacheOptions().isDispatchOnCacheMiss()) { + dispatch(); + } } } }); - - futureCache.set(cacheKey, future); - - return future; + futureCache.set(cacheKey, loadCallFuture); + return loadCallFuture; } - private BiConsumer setValueIntoCacheAndCompleteFuture(Object cacheKey, CompletableFuture future) { - return (result, loadCallEx) -> { - if (loadCallEx == null) { - valueCache.set(cacheKey, result) - .whenComplete((v, setCallExIgnored) -> future.complete(result)); + private BiConsumer setValueIntoValueCacheAndCompleteFuture(Object cacheKey, CompletableFuture loadCallFuture) { + return (result, loadCallException) -> { + if (loadCallException == null) { + // + // we have completed our load call, and we should try to cache the value + // however we don't wait on the caching to complete before completing the load call + // this way a network cache call (say a REDIS put) does not have to be completed in order + // for the calling code to get a value. There is an option that controls + // which is off by default to make the code faster. 
+ // + CompletableFuture valueCacheSetCF = valueCache.set(cacheKey, result); + if (loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet()) { + valueCacheSetCF.whenComplete((v, setCallExceptionIgnored) -> loadCallFuture.complete(result)); + } else { + loadCallFuture.complete(result); + } } else { - future.completeExceptionally(loadCallEx); + loadCallFuture.completeExceptionally(loadCallException); } }; } - private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled) { + private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, CompletableFuture loadCallFuture) { if (batchingEnabled) { - CompletableFuture future = new CompletableFuture<>(); - loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext)); - return future; + loaderQueue.add(new LoaderQueueEntry<>(key, loadCallFuture, loadContext)); + return loadCallFuture; } else { stats.incrementBatchLoadCountBy(1); // immediate execution of batch function diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index 89530e1..8cd35ba 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -17,6 +17,7 @@ package org.dataloader; import org.dataloader.annotations.PublicApi; +import org.dataloader.impl.Assertions; import org.dataloader.stats.SimpleStatisticsCollector; import org.dataloader.stats.StatisticsCollector; @@ -39,11 +40,12 @@ public class DataLoaderOptions { private boolean cachingEnabled; private boolean cachingExceptionsEnabled; private CacheKey cacheKeyFunction; - private CacheMap cacheMap; - private ValueCache valueCache; + private CacheMap cacheMap; + private ValueCache valueCache; private int maxBatchSize; private Supplier statisticsCollector; private BatchLoaderContextProvider environmentProvider; + private ValueCacheOptions valueCacheOptions; /** * Creates a new data loader options 
with default settings. @@ -55,6 +57,7 @@ public DataLoaderOptions() { maxBatchSize = -1; statisticsCollector = SimpleStatisticsCollector::new; environmentProvider = NULL_PROVIDER; + valueCacheOptions = ValueCacheOptions.newOptions(); } /** @@ -72,6 +75,7 @@ public DataLoaderOptions(DataLoaderOptions other) { this.maxBatchSize = other.maxBatchSize; this.statisticsCollector = other.statisticsCollector; this.environmentProvider = other.environmentProvider; + this.valueCacheOptions = other.valueCacheOptions; } /** @@ -179,7 +183,7 @@ public DataLoaderOptions setCacheKeyFunction(CacheKey cacheKeyFunction) { * * @return an optional with the cache map instance, or empty */ - public Optional> cacheMap() { + public Optional> cacheMap() { return Optional.ofNullable(cacheMap); } @@ -190,7 +194,7 @@ public Optional> cacheMap() { * * @return the data loader options for fluent coding */ - public DataLoaderOptions setCacheMap(CacheMap cacheMap) { + public DataLoaderOptions setCacheMap(CacheMap cacheMap) { this.cacheMap = cacheMap; return this; } @@ -265,7 +269,7 @@ public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvide * * @return an optional with the cache store instance, or empty */ - public Optional> valueCache() { + public Optional> valueCache() { return Optional.ofNullable(valueCache); } @@ -276,8 +280,27 @@ public Optional> valueCache() { * * @return the data loader options for fluent coding */ - public DataLoaderOptions setValueCache(ValueCache valueCache) { + public DataLoaderOptions setValueCache(ValueCache valueCache) { this.valueCache = valueCache; return this; } + + /** + * @return the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used + */ + public ValueCacheOptions getValueCacheOptions() { + return valueCacheOptions; + } + + /** + * Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used + * + * @param valueCacheOptions the value cache options + * + * @return the data loader 
options for fluent coding + */ + public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOptions) { + this.valueCacheOptions = Assertions.nonNull(valueCacheOptions); + return this; + } } diff --git a/src/main/java/org/dataloader/ValueCache.java b/src/main/java/org/dataloader/ValueCache.java index 31042c6..036ae4b 100644 --- a/src/main/java/org/dataloader/ValueCache.java +++ b/src/main/java/org/dataloader/ValueCache.java @@ -7,14 +7,17 @@ /** * The {@link ValueCache} is used by data loaders that use caching and want a long-lived or external cache - * of values. The {@link ValueCache} is used as a place to cache values when they come back from + * of values. The {@link ValueCache} is used as a place to cache values when they come back from an async + * cache store. *

- * It differs from {@link CacheMap} which is in fact a cache of promises to values aka {@link CompletableFuture}<V> and it rather suited - * to be a wrapper of a long lived or external value cache. {@link CompletableFuture}s cant be easily placed in an external cache - * outside the JVM say, hence the need for the {@link ValueCache}. + * It differs from {@link CacheMap} which is in fact a cache of promised values aka {@link CompletableFuture}<V>'s. + *

+ * {@link ValueCache is more suited to be a wrapper of a long-lived or externallly cached values. {@link CompletableFuture}s cant + * be easily placed in an external cache outside the JVM say, hence the need for the {@link ValueCache}. *

* {@link DataLoader}s use a two stage cache strategy if caching is enabled. If the {@link CacheMap} already has the promise to a value * that is used. If not then the {@link ValueCache} is asked for a value, if it has one then that is returned (and cached as a promise in the {@link CacheMap}. + *

* If there is no value then the key is queued and loaded via the {@link BatchLoader} calls. The returned values will then be stored in * the {@link ValueCache} and the promises to those values are also stored in the {@link CacheMap}. *

@@ -22,18 +25,18 @@ * store any actual results. This is to avoid duplicating the stored data between the {@link CacheMap} * out of the box. *

- * The API signature uses completable futures because the backing implementation MAY be a remote external cache - * and hence exceptions may happen in retrieving values. + * The API signature uses {@link CompletableFuture}s because the backing implementation MAY be a remote external cache + * and hence exceptions may happen in retrieving values and they may take time to complete. * * @param the type of cache keys * @param the type of cache values * * @author Craig Day + * @author Brad Baker */ @PublicSpi public interface ValueCache { - /** * Creates a new value cache, using the default no-op implementation. * @@ -48,9 +51,12 @@ static ValueCache defaultValueCache() { } /** - * Gets the specified key from the store. if the key si not present, then the implementation MUST return an exceptionally completed future - * and not null because null is a valid cacheable value. Any exception is will cause {@link DataLoader} to load the key via batch loading + * Gets the specified key from the value cache. If the key is not present, then the implementation MUST return an exceptionally completed future + * and not null because null is a valid cacheable value. An exceptionally completed future will cause {@link DataLoader} to load the key via batch loading * instead. + *

+ * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure + * to do this may cause the {@link DataLoader} code to not run properly. * * @param key the key to retrieve * @@ -61,6 +67,9 @@ static ValueCache defaultValueCache() { /** * Stores the value with the specified key, or updates it if the key already exists. + *

+ * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure + * to do this may cause the {@link DataLoader} code to not run properly. * * @param key the key to store * @param value the value to store @@ -70,7 +79,10 @@ static ValueCache defaultValueCache() { CompletableFuture set(K key, V value); /** - * Deletes the entry with the specified key from the store, if it exists. + * Deletes the entry with the specified key from the value cache, if it exists. + *

+ * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure + * to do this may cause the {@link DataLoader} code to not run properly. * * @param key the key to delete * @@ -79,7 +91,10 @@ static ValueCache defaultValueCache() { CompletableFuture delete(K key); /** - * Clears all entries from the store. + * Clears all entries from the value cache. + *

+ * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure + * to do this may cause the {@link DataLoader} code to not run properly. * * @return a void future for error handling and fluent composition */ diff --git a/src/main/java/org/dataloader/ValueCacheOptions.java b/src/main/java/org/dataloader/ValueCacheOptions.java new file mode 100644 index 0000000..0928b97 --- /dev/null +++ b/src/main/java/org/dataloader/ValueCacheOptions.java @@ -0,0 +1,62 @@ +package org.dataloader; + +/** + * Options that control how the {@link ValueCache} is used by {@link DataLoader} + * + * @author Brad Baker + */ +public class ValueCacheOptions { + private final boolean dispatchOnCacheMiss; + private final boolean completeValueAfterCacheSet; + + private ValueCacheOptions() { + this.dispatchOnCacheMiss = true; + this.completeValueAfterCacheSet = false; + } + + private ValueCacheOptions(boolean dispatchOnCacheMiss, boolean completeValueAfterCacheSet) { + this.dispatchOnCacheMiss = dispatchOnCacheMiss; + this.completeValueAfterCacheSet = completeValueAfterCacheSet; + } + + public static ValueCacheOptions newOptions() { + return new ValueCacheOptions(); + } + + /** + * This controls whether the {@link DataLoader} will called {@link DataLoader#dispatch()} if a + * {@link ValueCache#get(Object)} call misses. In an async world this could take non zero time + * to complete and hence previous dispatch calls may have already completed. + * + * This is true by default. + * + * @return true if a {@link DataLoader#dispatch()} call will be made on an async {@link ValueCache} miss + */ + public boolean isDispatchOnCacheMiss() { + return dispatchOnCacheMiss; + } + + /** + * This controls whether the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call + * to complete before it completes the returned value. 
By default this is false and hence + * the {@link ValueCache#set(Object, Object)} call may complete some time AFTER the data loader + * value has been returned. + * + * This is false by default, for performance reasons. + * + * @return true the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call to complete before + * it completes the returned value. + */ + public boolean isCompleteValueAfterCacheSet() { + return completeValueAfterCacheSet; + } + + public ValueCacheOptions setDispatchOnCacheMiss(boolean flag) { + return new ValueCacheOptions(flag, this.completeValueAfterCacheSet); + } + + public ValueCacheOptions setCompleteValueAfterCacheSet(boolean flag) { + return new ValueCacheOptions(this.dispatchOnCacheMiss, flag); + } + +} diff --git a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java index 1c54e91..622fbdf 100644 --- a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java +++ b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java @@ -4,6 +4,8 @@ import com.github.benmanes.caffeine.cache.Caffeine; import org.dataloader.fixtures.CaffeineValueCache; import org.dataloader.fixtures.CustomValueCache; +import org.dataloader.fixtures.TestKit; +import org.dataloader.impl.CompletableFutureKit; import org.junit.Test; import java.util.ArrayList; @@ -201,4 +203,49 @@ public CompletableFuture set(String key, Object value) { assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); assertArrayEquals(customStore.store.keySet().toArray(), singletonList("b").toArray()); } + + + @Test + public void dispatch_will_be_called_if_a_cache_value_miss_takes_some_time() { + // + // without the extra dispatch() internally, fA would never + // have completed. 
In the spirit of RRD, this test was written first + // and would hang before the support code was put in place + // + CustomValueCache customStore = new CustomValueCache() { + + @Override + public CompletableFuture get(String key) { + if (key.equals("a")) { + return CompletableFuture.supplyAsync(() -> { + TestKit.snooze(1000); + throw new IllegalStateException("no a in cache"); + }); + } + return super.get(key); + } + + }; + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + + CompletableFuture> dispatchedCall = identityLoader.dispatch(); + + CompletableFuture> bothCalls = CompletableFutureKit.allOf(asList(fA, fB)); + await().atMost(5, TimeUnit.SECONDS).until(bothCalls::isDone); + + assertTrue(dispatchedCall.isDone()); + assertTrue(fA.isDone()); + assertTrue(fB.isDone()); + assertThat(fA.join(), equalTo("a")); + assertThat(fB.join(), equalTo("b")); + + // 'b' will complete in real time while 'a' was a cache miss after some time + assertThat(loadCalls, equalTo(asList(singletonList("b"), singletonList("a")))); + } } From ae0a29e85c4458e277110d169d1066d68d4364ba Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sat, 31 Jul 2021 18:09:01 +1000 Subject: [PATCH 2/9] javadoc problem --- src/main/java/org/dataloader/ValueCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/dataloader/ValueCache.java b/src/main/java/org/dataloader/ValueCache.java index 036ae4b..c2aab46 100644 --- a/src/main/java/org/dataloader/ValueCache.java +++ b/src/main/java/org/dataloader/ValueCache.java @@ -12,7 +12,7 @@ *

* It differs from {@link CacheMap} which is in fact a cache of promised values aka {@link CompletableFuture}<V>'s. *

- * {@link ValueCache is more suited to be a wrapper of a long-lived or externallly cached values. {@link CompletableFuture}s cant + * {@link ValueCache} is more suited to be a wrapper of a long-lived or externallly cached values. {@link CompletableFuture}s cant * be easily placed in an external cache outside the JVM say, hence the need for the {@link ValueCache}. *

* {@link DataLoader}s use a two stage cache strategy if caching is enabled. If the {@link CacheMap} already has the promise to a value From a74031ad7ff10883f71524b164890da45b50706f Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sat, 7 Aug 2021 21:29:33 +1000 Subject: [PATCH 3/9] This attacks the problem of async ValueCache lookups in the batch load function and not the load method --- src/main/java/org/dataloader/DataLoader.java | 6 +- .../java/org/dataloader/DataLoaderHelper.java | 204 ++++++++++-------- src/main/java/org/dataloader/Try.java | 45 +++- src/main/java/org/dataloader/ValueCache.java | 63 +++++- .../org/dataloader/ValueCacheOptions.java | 24 +-- .../java/org/dataloader/impl/Assertions.java | 20 +- .../impl/DataLoaderAssertionException.java | 7 + .../dataloader/impl/PromisedValuesImpl.java | 4 +- .../dataloader/DataLoaderValueCacheTest.java | 186 +++++++++++++--- src/test/java/org/dataloader/TryTest.java | 10 + .../dataloader/fixtures/CustomValueCache.java | 4 + 11 files changed, 401 insertions(+), 172 deletions(-) create mode 100644 src/main/java/org/dataloader/impl/DataLoaderAssertionException.java diff --git a/src/main/java/org/dataloader/DataLoader.java b/src/main/java/org/dataloader/DataLoader.java index 0aea1c7..fe9c59a 100644 --- a/src/main/java/org/dataloader/DataLoader.java +++ b/src/main/java/org/dataloader/DataLoader.java @@ -67,7 +67,7 @@ public class DataLoader { private final DataLoaderHelper helper; private final StatisticsCollector stats; private final CacheMap futureCache; - private final ValueCache valueCache; + private final ValueCache valueCache; /** * Creates new DataLoader with the specified batch loader function and default options @@ -430,8 +430,8 @@ private CacheMap determineFutureCache(DataLoaderOptions loaderOptions } @SuppressWarnings("unchecked") - private ValueCache determineValueCache(DataLoaderOptions loaderOptions) { - return (ValueCache) loaderOptions.valueCache().orElseGet(ValueCache::defaultValueCache); + private 
ValueCache determineValueCache(DataLoaderOptions loaderOptions) { + return (ValueCache) loaderOptions.valueCache().orElseGet(ValueCache::defaultValueCache); } /** diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index 15c7c4c..bad8579 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -8,19 +8,22 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; import java.util.concurrent.CompletionStage; import java.util.concurrent.atomic.AtomicReference; -import java.util.function.BiConsumer; -import java.util.stream.Collectors; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; +import static java.util.concurrent.CompletableFuture.allOf; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.stream.Collectors.toList; import static org.dataloader.impl.Assertions.assertState; import static org.dataloader.impl.Assertions.nonNull; @@ -63,7 +66,7 @@ Object getCallContext() { private final Object batchLoadFunction; private final DataLoaderOptions loaderOptions; private final CacheMap futureCache; - private final ValueCache valueCache; + private final ValueCache valueCache; private final List>> loaderQueue; private final StatisticsCollector stats; private final Clock clock; @@ -73,7 +76,7 @@ Object getCallContext() { Object batchLoadFunction, DataLoaderOptions loaderOptions, CacheMap futureCache, - ValueCache valueCache, + ValueCache valueCache, StatisticsCollector stats, Clock clock) { this.dataLoader = dataLoader; @@ -134,8 +137,7 @@ CompletableFuture load(K key, Object 
loadContext) { if (cachingEnabled) { return loadFromCache(key, loadContext, batchingEnabled); } else { - CompletableFuture future = new CompletableFuture<>(); - return queueOrInvokeLoader(key, loadContext, batchingEnabled, future); + return queueOrInvokeLoader(key, loadContext, batchingEnabled, false); } } } @@ -169,7 +171,7 @@ DispatchResult dispatch() { lastDispatchTime.set(now()); } if (!batchingEnabled || keys.isEmpty()) { - return new DispatchResult<>(CompletableFuture.completedFuture(emptyList()), 0); + return new DispatchResult<>(completedFuture(emptyList()), 0); } final int totalEntriesHandled = keys.size(); // @@ -212,17 +214,17 @@ private CompletableFuture> sliceIntoBatchesOfBatches(List keys, List< } // // now reassemble all the futures into one that is the complete set of results - return CompletableFuture.allOf(allBatches.toArray(new CompletableFuture[0])) + return allOf(allBatches.toArray(new CompletableFuture[0])) .thenApply(v -> allBatches.stream() .map(CompletableFuture::join) .flatMap(Collection::stream) - .collect(Collectors.toList())); + .collect(toList())); } @SuppressWarnings("unchecked") private CompletableFuture> dispatchQueueBatch(List keys, List callContexts, List> queuedFutures) { stats.incrementBatchLoadCountBy(keys.size()); - CompletionStage> batchLoad = invokeLoader(keys, callContexts); + CompletionStage> batchLoad = invokeLoader(keys, callContexts, loaderOptions.cachingEnabled()); return batchLoad .toCompletableFuture() .thenApply(values -> { @@ -255,6 +257,9 @@ private CompletableFuture> dispatchQueueBatch(List keys, List return values; }).exceptionally(ex -> { stats.incrementBatchLoadExceptionCount(); + if (ex instanceof CompletionException) { + ex = ex.getCause(); + } for (int idx = 0; idx < queuedFutures.size(); idx++) { K key = keys.get(idx); CompletableFuture future = queuedFutures.get(idx); @@ -268,7 +273,7 @@ private CompletableFuture> dispatchQueueBatch(List keys, List private void assertResultSize(List keys, List values) { - 
assertState(keys.size() == values.size(), "The size of the promised values MUST be the same size as the key list"); + assertState(keys.size() == values.size(), () -> "The size of the promised values MUST be the same size as the key list"); } private void possiblyClearCacheEntriesOnExceptions(List keys) { @@ -293,103 +298,91 @@ private CompletableFuture loadFromCache(K key, Object loadContext, boolean ba return futureCache.get(cacheKey); } - /* - We haven't been asked for this key yet. We want to do one of two things: - - 1. Check if our cache store has it. If so: - a. Get the value from the cache store (this can take non-zero time) - b. Add a recovery case, so we queue the load if fetching from cache store fails - c. Put that future in our futureCache to hit the early return next time - d. Return the resilient future - 2. If not in value cache: - a. queue or invoke the load - b. Add a success handler to store the result in the cache store - c. Return the result - */ - final CompletableFuture loadCallFuture = new CompletableFuture<>(); - - CompletableFuture cacheLookupCF = valueCache.get(cacheKey); - boolean cachedLookupCompletedImmediately = cacheLookupCF.isDone(); - - cacheLookupCF.whenComplete((cachedValue, cacheException) -> { - if (cacheException == null) { - loadCallFuture.complete(cachedValue); - } else { - CompletableFuture loaderCF; - synchronized (dataLoader) { - loaderCF = queueOrInvokeLoader(key, loadContext, batchingEnabled, loadCallFuture); - loaderCF.whenComplete(setValueIntoValueCacheAndCompleteFuture(cacheKey, loadCallFuture)); - } - // - // is possible that if the cache lookup step took some time to execute - // (e.g. an async network lookup to REDIS etc...) then it's possible that this - // load call has already returned and a dispatch call has been made, and hence this code - // is running after the dispatch was made - so we dispatch to catch up because - // it's likely to hang if we do not. 
We might dispatch too early, but we will not - // hang because of an async cache lookup - // - if (!cachedLookupCompletedImmediately && !loaderCF.isDone()) { - if (loaderOptions.getValueCacheOptions().isDispatchOnCacheMiss()) { - dispatch(); - } - } - } - }); + CompletableFuture loadCallFuture; + synchronized (dataLoader) { + loadCallFuture = queueOrInvokeLoader(key, loadContext, batchingEnabled, true); + } + futureCache.set(cacheKey, loadCallFuture); return loadCallFuture; } - private BiConsumer setValueIntoValueCacheAndCompleteFuture(Object cacheKey, CompletableFuture loadCallFuture) { - return (result, loadCallException) -> { - if (loadCallException == null) { - // - // we have completed our load call, and we should try to cache the value - // however we don't wait on the caching to complete before completing the load call - // this way a network cache call (say a REDIS put) does not have to be completed in order - // for the calling code to get a value. There is an option that controls - // which is off by default to make the code faster. 
- // - CompletableFuture valueCacheSetCF = valueCache.set(cacheKey, result); - if (loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet()) { - valueCacheSetCF.whenComplete((v, setCallExceptionIgnored) -> loadCallFuture.complete(result)); - } else { - loadCallFuture.complete(result); - } - } else { - loadCallFuture.completeExceptionally(loadCallException); - } - }; - } - - private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, CompletableFuture loadCallFuture) { + private CompletableFuture queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, boolean cachingEnabled) { if (batchingEnabled) { + CompletableFuture loadCallFuture = new CompletableFuture<>(); loaderQueue.add(new LoaderQueueEntry<>(key, loadCallFuture, loadContext)); return loadCallFuture; } else { stats.incrementBatchLoadCountBy(1); // immediate execution of batch function - return invokeLoaderImmediately(key, loadContext); + return invokeLoaderImmediately(key, loadContext, cachingEnabled); } } - CompletableFuture invokeLoaderImmediately(K key, Object keyContext) { + CompletableFuture invokeLoaderImmediately(K key, Object keyContext, boolean cachingEnabled) { List keys = singletonList(key); - CompletionStage singleLoadCall; - try { - Object context = loaderOptions.getBatchLoaderContextProvider().getContext(); - BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment() - .context(context).keyContexts(keys, singletonList(keyContext)).build(); - if (isMapLoader()) { - singleLoadCall = invokeMapBatchLoader(keys, environment).thenApply(list -> list.get(0)); + List keyContexts = singletonList(keyContext); + return invokeLoader(keys, keyContexts, cachingEnabled) + .thenApply(list -> list.get(0)) + .toCompletableFuture(); + } + + CompletionStage> invokeLoader(List keys, List keyContexts, boolean cachingEnabled) { + if (!cachingEnabled) { + return invokeLoader(keys, keyContexts); + } + CompletableFuture>> 
cacheCallCF = getFromValueCache(keys); + return cacheCallCF.thenCompose(cachedValues -> { + + assertState(keys.size() == cachedValues.size(), () -> "The size of the cached values MUST be the same size as the key list"); + + LinkedHashMap valuesInKeyOrder = new LinkedHashMap<>(); + List cacheMissedKeys = new ArrayList<>(); + List cacheMissedContexts = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + K key = keys.get(i); + Object keyContext = keyContexts.get(i); + Try cacheGet = cachedValues.get(i); + if (cacheGet.isSuccess()) { + valuesInKeyOrder.put(key, cacheGet.get()); + } else { + valuesInKeyOrder.put(key, null); // an entry to be replaced later + cacheMissedKeys.add(key); + cacheMissedContexts.add(keyContext); + } + } + if (cacheMissedKeys.isEmpty()) { + // + // everything was cached + // + return completedFuture(new ArrayList<>(valuesInKeyOrder.values())); } else { - singleLoadCall = invokeListBatchLoader(keys, environment).thenApply(list -> list.get(0)); + // + // we missed some of the keys from cache, so send them to the batch loader + // and then fill in their values + // + CompletionStage> batchLoad = invokeLoader(cacheMissedKeys, cacheMissedContexts); + CompletionStage> assembledValues = batchLoad.thenApply(batchedValues -> { + assertResultSize(cacheMissedKeys, batchedValues); + + for (int i = 0; i < batchedValues.size(); i++) { + K missedKey = cacheMissedKeys.get(i); + V v = batchedValues.get(i); + valuesInKeyOrder.put(missedKey, v); + } + return new ArrayList<>(valuesInKeyOrder.values()); + }); + // + // fire off a call to the ValueCache to allow it to set values into the + // cache now that we have them + assembledValues = setToValueCache(keys, assembledValues); + + return assembledValues; } - return singleLoadCall.toCompletableFuture(); - } catch (Exception e) { - return CompletableFutureKit.failedFuture(e); - } + }); } + CompletionStage> invokeLoader(List keys, List keyContexts) { CompletionStage> batchLoad; try { @@ -415,7 +408,7 @@ 
private CompletionStage> invokeListBatchLoader(List keys, BatchLoader } else { loadResult = ((BatchLoader) batchLoadFunction).load(keys); } - return nonNull(loadResult, "Your batch loader function MUST return a non null CompletionStage promise"); + return nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage promise"); } @@ -432,7 +425,7 @@ private CompletionStage> invokeMapBatchLoader(List keys, BatchLoaderE } else { loadResult = ((MappedBatchLoader) batchLoadFunction).load(setOfKeys); } - CompletionStage> mapBatchLoad = nonNull(loadResult, "Your batch loader function MUST return a non null CompletionStage promise"); + CompletionStage> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage promise"); return mapBatchLoad.thenApply(map -> { List values = new ArrayList<>(); for (K key : keys) { @@ -452,4 +445,31 @@ int dispatchDepth() { return loaderQueue.size(); } } + + private CompletableFuture>> getFromValueCache(List keys) { + try { + return nonNull(valueCache.getValues(keys), () -> "Your ValueCache.getValues function MUST return a non null promise"); + } catch (RuntimeException e) { + return CompletableFutureKit.failedFuture(e); + } + } + + private CompletionStage> setToValueCache(List keys, CompletionStage> assembledValues) { + boolean completeValueAfterCacheSet = loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet(); + if (completeValueAfterCacheSet) { + return assembledValues.thenCompose(values -> nonNull(valueCache + .setValues(keys, values), () -> "Your ValueCache.setValues function MUST return a non null promise") + // we dont trust the set cache to give us the values back - we have them - lets use them + // if the cache set fails - then they wont be in cache and maybe next time they will + .handle((ignored, setExIgnored) -> values)); + } else { + return assembledValues.thenApply(values -> { + // no one is waiting for the set to happen here so if its 
truly async + // it will happen eventually but no result will be dependent on it + valueCache.setValues(keys, values); + return values; + }); + } + } + } diff --git a/src/main/java/org/dataloader/Try.java b/src/main/java/org/dataloader/Try.java index e273155..3f9a129 100644 --- a/src/main/java/org/dataloader/Try.java +++ b/src/main/java/org/dataloader/Try.java @@ -4,6 +4,7 @@ import java.util.Optional; import java.util.concurrent.Callable; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import java.util.function.Consumer; import java.util.function.Function; @@ -26,13 +27,22 @@ */ @PublicApi public class Try { - private static Throwable NIL = new Throwable() { + private final static Object NIL = new Object() { + }; + + private final static Throwable NIL_THROWABLE = new RuntimeException() { + @Override + public String getMessage() { + return "failure"; + } + @Override public synchronized Throwable fillInStackTrace() { return this; } }; + private final Throwable throwable; private final V value; @@ -48,6 +58,12 @@ private Try(V value) { this.throwable = null; } + + @Override + public String toString() { + return isSuccess() ? "success" : "failure"; + } + /** * Creates a Try that has succeeded with the provided value * @@ -72,6 +88,18 @@ public static Try failed(Throwable throwable) { return new Try<>(throwable); } + /** + * This returns a Try that has always failed with a consistent exception. Use this when + * you don't care about the exception but only that the Try failed. 
+ * + * @param the type of value + * + * @return a Try that has failed + */ + public static Try alwaysFailed() { + return Try.failed(NIL_THROWABLE); + } + /** * Calls the callable and if it returns a value, the Try is successful with that value or if throws * and exception the Try captures that @@ -96,7 +124,7 @@ public static Try tryCall(Callable callable) { * @param completionStage the completion stage that will complete * @param the value type * - * @return a Try which is the result of the call + * @return a CompletionStage Try which is the result of the call */ public static CompletionStage> tryStage(CompletionStage completionStage) { return completionStage.handle((value, throwable) -> { @@ -107,6 +135,19 @@ public static CompletionStage> tryStage(CompletionStage completion }); } + /** + * Creates a CompletableFuture that, when it completes, will capture into a Try whether the given completionStage + * was successful or not + * + * @param completionStage the completion stage that will complete + * @param the value type + * + * @return a CompletableFuture Try which is the result of the call + */ + public static CompletableFuture> tryFuture(CompletionStage completionStage) { + return tryStage(completionStage).toCompletableFuture(); + } + /** * @return the successful value of this try * diff --git a/src/main/java/org/dataloader/ValueCache.java b/src/main/java/org/dataloader/ValueCache.java index c2aab46..0218cde 100644 --- a/src/main/java/org/dataloader/ValueCache.java +++ b/src/main/java/org/dataloader/ValueCache.java @@ -1,8 +1,11 @@ package org.dataloader; import org.dataloader.annotations.PublicSpi; +import org.dataloader.impl.CompletableFutureKit; import org.dataloader.impl.NoOpValueCache; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CompletableFuture; /** @@ -55,8 +58,6 @@ static ValueCache defaultValueCache() { * and not null because null is a valid cacheable value. 
An exceptionally completed future will cause {@link DataLoader} to load the key via batch loading * instead. *

- * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure - * to do this may cause the {@link DataLoader} code to not run properly. * * @param key the key to retrieve * @@ -66,10 +67,41 @@ static ValueCache defaultValueCache() { CompletableFuture get(K key); /** - * Stores the value with the specified key, or updates it if the key already exists. + * Gets the specified key from the value cache. If the key is not present, then the returned {@link Try} will be a failed one + * otherwise it has the cached value. This is preferred over the {@link #get(Object)} method. *

- * NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure - * to do this may cause the {@link DataLoader} code to not run properly. + * + * @param key the key to retrieve + * + * @return a future containing the {@link Try} cached value (which maybe null) or a failed {@link Try} if the key does + * not exist in the cache. + */ + default CompletableFuture> getValue(K key) { + return Try.tryFuture(get(key)); + } + + /** + * Gets the specified keys from the value cache, in a batch call. If your underlying cache cant do batch caching retrieval + * then do not implement this method and it will delegate back to {@link #getValue(Object)} for you + *

+ * You MUST return a List that is the same size as the keys passed in. The code will assert if you do not. + * + * @param keys the list of keys to get cached values for. + * + * @return a future containing a list of {@link Try} cached values (which may be {@link Try#succeeded(Object)} or a failed {@link Try} + * per key if they do not exist in the cache). + */ + default CompletableFuture>> getValues(List keys) { + List>> cacheLookups = new ArrayList<>(); + for (K key : keys) { + CompletableFuture> cacheTry = getValue(key); + cacheLookups.add(cacheTry); + } + return CompletableFutureKit.allOf(cacheLookups); + } + + /** + * Stores the value with the specified key, or updates it if the key already exists. * * @param key the key to store * @param value the value to store @@ -78,6 +110,27 @@ static ValueCache defaultValueCache() { */ CompletableFuture set(K key, V value); + /** + * Stores the values with the specified keys, or updates them if the keys already exist. If your underlying cache can't do batch caching setting + * then do not implement this method and it will delegate back to {@link #set(Object, Object)} for you + * + * @param keys the keys to store + * @param values the values to store + * + * @return a future containing the stored values for fluent composition + */ + default CompletableFuture> setValues(List keys, List values) { + List> cacheSets = new ArrayList<>(); + for (int i = 0; i < keys.size(); i++) { + K k = keys.get(i); + V v = values.get(i); + CompletableFuture setCall = set(k, v); + CompletableFuture set = Try.tryFuture(setCall).thenApply(ignored -> v); + cacheSets.add(set); + } + return CompletableFutureKit.allOf(cacheSets); + } + /** * Deletes the entry with the specified key from the value cache, if it exists. *

diff --git a/src/main/java/org/dataloader/ValueCacheOptions.java b/src/main/java/org/dataloader/ValueCacheOptions.java index 0928b97..1a0c1a1 100644 --- a/src/main/java/org/dataloader/ValueCacheOptions.java +++ b/src/main/java/org/dataloader/ValueCacheOptions.java @@ -6,16 +6,13 @@ * @author Brad Baker */ public class ValueCacheOptions { - private final boolean dispatchOnCacheMiss; private final boolean completeValueAfterCacheSet; private ValueCacheOptions() { - this.dispatchOnCacheMiss = true; this.completeValueAfterCacheSet = false; } - private ValueCacheOptions(boolean dispatchOnCacheMiss, boolean completeValueAfterCacheSet) { - this.dispatchOnCacheMiss = dispatchOnCacheMiss; + private ValueCacheOptions(boolean completeValueAfterCacheSet) { this.completeValueAfterCacheSet = completeValueAfterCacheSet; } @@ -23,19 +20,6 @@ public static ValueCacheOptions newOptions() { return new ValueCacheOptions(); } - /** - * This controls whether the {@link DataLoader} will called {@link DataLoader#dispatch()} if a - * {@link ValueCache#get(Object)} call misses. In an async world this could take non zero time - * to complete and hence previous dispatch calls may have already completed. - * - * This is true by default. - * - * @return true if a {@link DataLoader#dispatch()} call will be made on an async {@link ValueCache} miss - */ - public boolean isDispatchOnCacheMiss() { - return dispatchOnCacheMiss; - } - /** * This controls whether the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call * to complete before it completes the returned value. 
By default this is false and hence @@ -51,12 +35,8 @@ public boolean isCompleteValueAfterCacheSet() { return completeValueAfterCacheSet; } - public ValueCacheOptions setDispatchOnCacheMiss(boolean flag) { - return new ValueCacheOptions(flag, this.completeValueAfterCacheSet); - } - public ValueCacheOptions setCompleteValueAfterCacheSet(boolean flag) { - return new ValueCacheOptions(this.dispatchOnCacheMiss, flag); + return new ValueCacheOptions(flag); } } diff --git a/src/main/java/org/dataloader/impl/Assertions.java b/src/main/java/org/dataloader/impl/Assertions.java index 60b605b..e3eac4d 100644 --- a/src/main/java/org/dataloader/impl/Assertions.java +++ b/src/main/java/org/dataloader/impl/Assertions.java @@ -2,28 +2,26 @@ import org.dataloader.annotations.Internal; -import java.util.Objects; +import java.util.function.Supplier; @Internal public class Assertions { - public static void assertState(boolean state, String message) { + public static void assertState(boolean state, Supplier message) { if (!state) { - throw new AssertionException(message); + throw new DataLoaderAssertionException(message.get()); } } public static T nonNull(T t) { - return Objects.requireNonNull(t, "nonNull object required"); + return nonNull(t, () -> "nonNull object required"); } - public static T nonNull(T t, String message) { - return Objects.requireNonNull(t, message); - } - - private static class AssertionException extends IllegalStateException { - public AssertionException(String message) { - super(message); + public static T nonNull(T t, Supplier message) { + if (t == null) { + throw new NullPointerException(message.get()); } + return t; } + } diff --git a/src/main/java/org/dataloader/impl/DataLoaderAssertionException.java b/src/main/java/org/dataloader/impl/DataLoaderAssertionException.java new file mode 100644 index 0000000..4631387 --- /dev/null +++ b/src/main/java/org/dataloader/impl/DataLoaderAssertionException.java @@ -0,0 +1,7 @@ +package org.dataloader.impl; + +public class 
DataLoaderAssertionException extends IllegalStateException { + public DataLoaderAssertionException(String message) { + super(message); + } +} diff --git a/src/main/java/org/dataloader/impl/PromisedValuesImpl.java b/src/main/java/org/dataloader/impl/PromisedValuesImpl.java index 4cf3ea8..2ba592b 100644 --- a/src/main/java/org/dataloader/impl/PromisedValuesImpl.java +++ b/src/main/java/org/dataloader/impl/PromisedValuesImpl.java @@ -104,7 +104,7 @@ public Throwable cause(int index) { @Override public T get(int index) { - assertState(isDone(), "The PromisedValues MUST be complete before calling the get() method"); + assertState(isDone(), () -> "The PromisedValues MUST be complete before calling the get() method"); try { CompletionStage future = futures.get(index); return future.toCompletableFuture().get(); @@ -115,7 +115,7 @@ public T get(int index) { @Override public List toList() { - assertState(isDone(), "The PromisedValues MUST be complete before calling the toList() method"); + assertState(isDone(), () -> "The PromisedValues MUST be complete before calling the toList() method"); int size = size(); List list = new ArrayList<>(size); for (int index = 0; index < size; index++) { diff --git a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java index 622fbdf..a0a67f7 100644 --- a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java +++ b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java @@ -5,13 +5,14 @@ import org.dataloader.fixtures.CaffeineValueCache; import org.dataloader.fixtures.CustomValueCache; import org.dataloader.fixtures.TestKit; -import org.dataloader.impl.CompletableFutureKit; +import org.dataloader.impl.DataLoaderAssertionException; import org.junit.Test; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static 
java.util.Arrays.asList; import static java.util.Collections.emptyList; @@ -64,9 +65,9 @@ public void test_by_default_we_have_no_value_caching() { @Test public void should_accept_a_remote_value_store_for_caching() { - CustomValueCache customStore = new CustomValueCache(); + CustomValueCache customValueCache = new CustomValueCache(); List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); DataLoader identityLoader = idLoader(options, loadCalls); // Fetches as expected @@ -79,7 +80,7 @@ public void should_accept_a_remote_value_store_for_caching() { assertThat(fB.join(), equalTo("b")); assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); - assertArrayEquals(customStore.store.keySet().toArray(), asList("a", "b").toArray()); + assertArrayEquals(customValueCache.store.keySet().toArray(), asList("a", "b").toArray()); CompletableFuture future3 = identityLoader.load("c"); CompletableFuture future2a = identityLoader.load("b"); @@ -89,21 +90,21 @@ public void should_accept_a_remote_value_store_for_caching() { assertThat(future2a.join(), equalTo("b")); assertThat(loadCalls, equalTo(asList(asList("a", "b"), singletonList("c")))); - assertArrayEquals(customStore.store.keySet().toArray(), asList("a", "b", "c").toArray()); + assertArrayEquals(customValueCache.store.keySet().toArray(), asList("a", "b", "c").toArray()); // Supports clear CompletableFuture fC = new CompletableFuture<>(); identityLoader.clear("b", (v, e) -> fC.complete(v)); await().until(fC::isDone); - assertArrayEquals(customStore.store.keySet().toArray(), asList("a", "c").toArray()); + assertArrayEquals(customValueCache.store.keySet().toArray(), asList("a", "c").toArray()); // Supports clear all CompletableFuture fCa = new CompletableFuture<>(); identityLoader.clearAll((v, e) -> fCa.complete(v)); await().until(fCa::isDone); - 
assertArrayEquals(customStore.store.keySet().toArray(), emptyList().toArray()); + assertArrayEquals(customValueCache.store.keySet().toArray(), emptyList().toArray()); } @Test @@ -117,10 +118,10 @@ public void can_use_caffeine_for_caching() { .maximumSize(100) .build(); - ValueCache customStore = new CaffeineValueCache(caffeineCache); + ValueCache caffeineValueCache = new CaffeineValueCache(caffeineCache); List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoaderOptions options = newOptions().setValueCache(caffeineValueCache); DataLoader identityLoader = idLoader(options, loadCalls); // Fetches as expected @@ -148,7 +149,7 @@ public void can_use_caffeine_for_caching() { @Test public void will_invoke_loader_if_CACHE_GET_call_throws_exception() { - CustomValueCache customStore = new CustomValueCache() { + CustomValueCache customValueCache = new CustomValueCache() { @Override public CompletableFuture get(String key) { @@ -158,11 +159,11 @@ public CompletableFuture get(String key) { return super.get(key); } }; - customStore.set("a", "Not From Cache"); - customStore.set("b", "From Cache"); + customValueCache.set("a", "Not From Cache"); + customValueCache.set("b", "From Cache"); List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); DataLoader identityLoader = idLoader(options, loadCalls); CompletableFuture fA = identityLoader.load("a"); @@ -178,7 +179,7 @@ public CompletableFuture get(String key) { @Test public void will_still_work_if_CACHE_SET_call_throws_exception() { - CustomValueCache customStore = new CustomValueCache() { + CustomValueCache customValueCache = new CustomValueCache() { @Override public CompletableFuture set(String key, Object value) { if (key.equals("a")) { @@ -189,7 +190,7 @@ public CompletableFuture set(String key, Object value) { }; List> loadCalls = new 
ArrayList<>(); - DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); DataLoader identityLoader = idLoader(options, loadCalls); CompletableFuture fA = identityLoader.load("a"); @@ -201,51 +202,166 @@ public CompletableFuture set(String key, Object value) { // a was not in cache (according to get) and hence needed to be loaded assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); - assertArrayEquals(customStore.store.keySet().toArray(), singletonList("b").toArray()); + assertArrayEquals(customValueCache.store.keySet().toArray(), singletonList("b").toArray()); } - @Test - public void dispatch_will_be_called_if_a_cache_value_miss_takes_some_time() { - // - // without the extra dispatch() internally, fA would never - // have completed. In the spirit of RRD, this test was written first - // and would hang before the support code was put in place - // - CustomValueCache customStore = new CustomValueCache() { + public void caching_can_take_some_time_complete() { + CustomValueCache customValueCache = new CustomValueCache() { @Override public CompletableFuture get(String key) { - if (key.equals("a")) { + if (key.startsWith("miss")) { return CompletableFuture.supplyAsync(() -> { TestKit.snooze(1000); throw new IllegalStateException("no a in cache"); }); + } else { + return CompletableFuture.supplyAsync(() -> { + TestKit.snooze(1000); + return key; + }); } - return super.get(key); } }; + List> loadCalls = new ArrayList<>(); - DataLoaderOptions options = newOptions().setValueCache(customStore); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); DataLoader identityLoader = idLoader(options, loadCalls); CompletableFuture fA = identityLoader.load("a"); CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("missC"); + CompletableFuture fD = identityLoader.load("missD"); - CompletableFuture> dispatchedCall = 
identityLoader.dispatch(); + await().until(identityLoader.dispatch()::isDone); - CompletableFuture> bothCalls = CompletableFutureKit.allOf(asList(fA, fB)); - await().atMost(5, TimeUnit.SECONDS).until(bothCalls::isDone); + assertThat(fA.join(), equalTo("a")); + assertThat(fB.join(), equalTo("b")); + assertThat(fC.join(), equalTo("missC")); + assertThat(fD.join(), equalTo("missD")); + + assertThat(loadCalls, equalTo(singletonList(asList("missC", "missD")))); + } + + @Test + public void batch_caching_works_as_expected() { + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture>> getValues(List keys) { + List> cacheCalls = new ArrayList<>(); + for (String key : keys) { + if (key.startsWith("miss")) { + cacheCalls.add(Try.alwaysFailed()); + } else { + cacheCalls.add(Try.succeeded(key)); + } + } + return CompletableFuture.supplyAsync(() -> { + TestKit.snooze(1000); + return cacheCalls; + }); + } + }; + + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("missC"); + CompletableFuture fD = identityLoader.load("missD"); + + await().until(identityLoader.dispatch()::isDone); + + assertThat(fA.join(), equalTo("a")); + assertThat(fB.join(), equalTo("b")); + assertThat(fC.join(), equalTo("missC")); + assertThat(fD.join(), equalTo("missD")); + + assertThat(loadCalls, equalTo(singletonList(asList("missC", "missD")))); + + List values = new ArrayList<>(customValueCache.asMap().values()); + assertThat(values, equalTo(asList("a", "b", "missC", "missD"))); + } + + @Test + public void assertions_will_be_thrown_if_the_cache_does_not_follow_contract() { + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture>> getValues(List 
keys) { + List> cacheCalls = new ArrayList<>(); + for (String key : keys) { + if (key.startsWith("miss")) { + cacheCalls.add(Try.alwaysFailed()); + } else { + cacheCalls.add(Try.succeeded(key)); + } + } + List> renegOnContract = cacheCalls.subList(1, cacheCalls.size() - 1); + return CompletableFuture.completedFuture(renegOnContract); + } + }; + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("missC"); + CompletableFuture fD = identityLoader.load("missD"); + + await().until(identityLoader.dispatch()::isDone); + + assertTrue(isAssertionException(fA)); + assertTrue(isAssertionException(fB)); + assertTrue(isAssertionException(fC)); + assertTrue(isAssertionException(fD)); + } + + private boolean isAssertionException(CompletableFuture fA) { + Throwable throwable = Try.tryFuture(fA).join().getThrowable(); + return throwable instanceof DataLoaderAssertionException; + } + + + @Test + public void if_caching_is_off_its_never_hit() { + AtomicInteger getCalls = new AtomicInteger(); + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture get(String key) { + getCalls.incrementAndGet(); + return super.get(key); + } + }; + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache).setCachingEnabled(false); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("missC"); + CompletableFuture fD = identityLoader.load("missD"); + + await().until(identityLoader.dispatch()::isDone); - assertTrue(dispatchedCall.isDone()); - assertTrue(fA.isDone()); - 
assertTrue(fB.isDone()); assertThat(fA.join(), equalTo("a")); assertThat(fB.join(), equalTo("b")); + assertThat(fC.join(), equalTo("missC")); + assertThat(fD.join(), equalTo("missD")); - // 'b' will complete in real time while 'a' was a cache miss after some time - assertThat(loadCalls, equalTo(asList(singletonList("b"), singletonList("a")))); + assertThat(loadCalls, equalTo(singletonList(asList("a", "b", "missC", "missD")))); + assertThat(getCalls.get(), equalTo(0)); + assertTrue(customValueCache.asMap().isEmpty()); } } diff --git a/src/test/java/org/dataloader/TryTest.java b/src/test/java/org/dataloader/TryTest.java index 46514ad..1fdd286 100644 --- a/src/test/java/org/dataloader/TryTest.java +++ b/src/test/java/org/dataloader/TryTest.java @@ -11,7 +11,9 @@ import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; @SuppressWarnings("ConstantConditions") public class TryTest { @@ -193,4 +195,12 @@ public void recover() throws Exception { assertSuccess(sTry, "Hello Again"); } + + @Test + public void canAlwaysFail() { + Try failedTry = Try.alwaysFailed(); + + assertTrue(failedTry.isFailure()); + assertFalse(failedTry.isSuccess()); + } } \ No newline at end of file diff --git a/src/test/java/org/dataloader/fixtures/CustomValueCache.java b/src/test/java/org/dataloader/fixtures/CustomValueCache.java index d707175..316016e 100644 --- a/src/test/java/org/dataloader/fixtures/CustomValueCache.java +++ b/src/test/java/org/dataloader/fixtures/CustomValueCache.java @@ -37,4 +37,8 @@ public CompletableFuture clear() { store.clear(); return CompletableFuture.completedFuture(null); } + + public Map asMap() { + return store; + } } \ No newline at end of file From 38b375526bedc54f90cd43d4f17d5d07bf0e4e62 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sun, 8 Aug 2021 10:21:01 +1000 Subject: [PATCH 4/9] Doco updated 
and removed get returning Try --- README.md | 39 ++++++++++++-------- src/main/java/org/dataloader/ValueCache.java | 24 +++--------- 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/README.md b/README.md index 77d3fb6..c5687af 100644 --- a/README.md +++ b/README.md @@ -10,14 +10,14 @@ It can serve as integral part of your application's data layer to provide a consistent API over various back-ends and reduce message communication overhead through batching and caching. An important use case for `java-dataloader` is improving the efficiency of GraphQL query execution. Graphql fields -are resolved in a independent manner and with a true graph of objects, you may be fetching the same object many times. +are resolved independently and, with a true graph of objects, you may be fetching the same object many times. A naive implementation of graphql data fetchers can easily lead to the dreaded "n+1" fetch problem. Most of the code is ported directly from Facebook's reference implementation, with one IMPORTANT adaptation to make it work for Java 8. ([more on this below](#manual-dispatching)). -But before reading on, be sure to take a short dive into the +Before reading on, be sure to take a short dive into the [original documentation](https://github.com/facebook/dataloader/blob/master/README.md) provided by Lee Byron (@leebyron) and Nicholas Schrock (@schrockn) from [Facebook](https://www.facebook.com/), the creators of the original data loader. 
@@ -51,7 +51,8 @@ and Nicholas Schrock (@schrockn) from [Facebook](https://www.facebook.com/), the - Results are ordered according to insertion order of load requests - Deals with partial errors when a batch future fails - Can disable batching and/or caching in configuration -- Can supply your own [`CacheMap`](https://github.com/graphql-java/java-dataloader/blob/master/src/main/java/io/engagingspaces/vertx/dataloader/CacheMap.java) implementations +- Can supply your own `CacheMap` implementations +- Can supply your own `ValueCache` implementations - Has very high test coverage ## Examples @@ -110,7 +111,7 @@ In this version of data loader, this does not happen automatically. More on thi As stated on the original Facebook project : ->A naive application may have issued four round-trips to a backend for the required information, +> A naive application may have issued four round-trips to a backend for the required information, but with DataLoader this application will make at most two. > DataLoader allows you to decouple unrelated parts of your application without sacrificing the @@ -270,9 +271,9 @@ This is not quite as loose in a Java implementation as Java is a type safe langu A batch loader function is defined as `BatchLoader` meaning for a key of type `K` it returns a value of type `V`. -It cant just return some `Exception` as an object of type `V`. Type safety matters. +It can't just return some `Exception` as an object of type `V`. Type safety matters. -However you can use the `Try` data type which can encapsulate a computation that succeeded or returned an exception. +However, you can use the `Try` data type which can encapsulate a computation that succeeded or returned an exception. 
```java Try tryS = Try.tryCall(() -> { @@ -291,7 +292,7 @@ However you can use the `Try` data type which can encapsulate a computation that } ``` -DataLoader supports this type and you can use this form to create a batch loader that returns a list of `Try` objects, some of which may have succeeded +DataLoader supports this type, and you can use this form to create a batch loader that returns a list of `Try` objects, some of which may have succeeded, and some of which may have failed. From that data loader can infer the right behavior in terms of the `load(x)` promise. ```java @@ -331,7 +332,7 @@ The value cache uses an async API pattern to encapsulate the idea that the value The default future cache behind `DataLoader` is an in memory `HashMap`. There is no expiry on this, and it lives for as long as the data loader lives. -However, you can create your own custom cache and supply it to the data loader on construction via the `org.dataloader.CacheMap` interface. +However, you can create your own custom future cache and supply it to the data loader on construction via the `org.dataloader.CacheMap` interface. ```java MyCustomCache customCache = new MyCustomCache(); @@ -342,21 +343,27 @@ However, you can create your own custom cache and supply it to the data loader o You could choose to use one of the fancy cache implementations from Guava or Caffeine and wrap it in a `CacheMap` wrapper ready for data loader. They can do fancy things like time eviction and efficient LRU caching. -As stated above, a custom `org.dataloader.CacheMap` is a local cache of futures with values, not values per se. +As stated above, a custom `org.dataloader.CacheMap` is a local cache of `CompletableFuture`s to values, not values per se. + +If you want to externally cache values then you need to use the `org.dataloader.ValueCache` interface. ## Custom value caches -You will need to create your own implementations of the `org.dataloader.ValueCache` if your want to use an external cache.
+The `org.dataloader.ValueCache` allows you to use an external cache. + +The API of `ValueCache` has been designed to be asynchronous because it is expected that the value cache could be outside +your JVM. It uses `CompletableFuture`s to get and set values into cache, which may involve a network call and hence exceptional failures to get +or set values. + +The `ValueCache` API is batch oriented, if you have a backing cache that can do batch cache fetches (such as REDIS) then you can use the `ValueCache.getValues()` +call directly. However, if you don't have such a backing cache, then the default implementation will break apart the batch of cache values into individual requests +to `ValueCache.get()` for you. This library does not ship with any implementations of `ValueCache` because it does not want to have production dependencies on external cache libraries, but you can easily write your own. The tests have an example based on [Caffeine](https://github.com/ben-manes/caffeine). -The API of `ValueCache` has been designed to be asynchronous because it is expected that the value cache could be outside -your JVM. It uses `CompleteableFuture`s to get and set values into cache, which may involve a network call and hence exceptional failures to get -or set values. - ## Disabling caching @@ -369,7 +376,7 @@ In certain uncommon cases, a DataLoader which does not cache may be desirable. Calling the above will ensure that every call to `.load()` will produce a new promise, and requested keys will not be saved in memory. However, when the memoization cache is disabled, your batch function will receive an array of keys which may contain duplicates! Each key will -be associated with each call to `.load()`. Your batch loader should provide a value for each instance of the requested key as per the contract +be associated with each call to `.load()`.
Your batch loader MUST provide a value for each instance of the requested key as per the contract ```java userDataLoader.load("A"); @@ -445,7 +452,7 @@ then you will not want to cache data meant for user A to then later give it user The scope of your `DataLoader` instances is important. You will want to create them per web request to ensure data is only cached within that web request and no more. -If your data can be shared across web requests then use a custom cache to keep values in a common place. +If your data can be shared across web requests then use a custom `org.dataloader.ValueCache` to keep values in a common place. Data loaders are stateful components that contain promises (with context) that are likely share the same affinity as the request. diff --git a/src/main/java/org/dataloader/ValueCache.java b/src/main/java/org/dataloader/ValueCache.java index 0218cde..44071bc 100644 --- a/src/main/java/org/dataloader/ValueCache.java +++ b/src/main/java/org/dataloader/ValueCache.java @@ -66,35 +66,23 @@ static ValueCache defaultValueCache() { */ CompletableFuture get(K key); - /** - * Gets the specified key from the value cache. If the key is not present, then the returned {@link Try} will be a failed one - * other wise it has the cached value. This is preferred over the {@link #get(Object)} method. - *

- * - * @param key the key to retrieve - * - * @return a future containing the {@link Try} cached value (which maybe null) or a failed {@link Try} if the key does - * not exist in the cache. - */ - default CompletableFuture> getValue(K key) { - return Try.tryFuture(get(key)); - } - /** * Gets the specified keys from the value cache, in a batch call. If your underlying cache cant do batch caching retrieval - * then do not implement this method and it will delegate back to {@link #getValue(Object)} for you + * then do not implement this method and it will delegate back to {@link #get(Object)} for you + *

+ * Each item in the returned list of values is a {@link Try}. If the key could not be found then a failed Try will be returned, otherwise + * a successful Try containing the cached value is returned. *

* You MUST return a List that is the same size as the keys passed in. The code will assert if you do not. * * @param keys the list of keys to get cached values for. * - * @return a future containing a list of {@link Try} cached values (which maybe {@link Try#succeeded(Object)} or a failed {@link Try} - * per key if they do not exist in the cache. + * @return a future containing a list of {@link Try} cached values for each key passed in. */ default CompletableFuture>> getValues(List keys) { List>> cacheLookups = new ArrayList<>(); for (K key : keys) { - CompletableFuture> cacheTry = getValue(key); + CompletableFuture> cacheTry = Try.tryFuture(get(key)); cacheLookups.add(cacheTry); } return CompletableFutureKit.allOf(cacheLookups); From d8078e0a2ebbef0d9b3de0eadb1654a44773963b Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sun, 8 Aug 2021 11:22:48 +1000 Subject: [PATCH 5/9] more tests and more resilient setCache calls --- .../java/org/dataloader/DataLoaderHelper.java | 34 +++--- .../dataloader/DataLoaderValueCacheTest.java | 100 ++++++++++++++++-- .../java/org/dataloader/fixtures/TestKit.java | 9 +- 3 files changed, 121 insertions(+), 22 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index bad8579..c69d585 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -455,20 +455,26 @@ private CompletableFuture>> getFromValueCache(List keys) { } private CompletionStage> setToValueCache(List keys, CompletionStage> assembledValues) { - boolean completeValueAfterCacheSet = loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet(); - if (completeValueAfterCacheSet) { - return assembledValues.thenCompose(values -> nonNull(valueCache - .setValues(keys, values), () -> "Your ValueCache.setValues function MUST return a non null promise") - // we dont trust the set cache to give us the values back - we have them - lets use 
them - // if the cache set fails - then they wont be in cache and maybe next time they will - .handle((ignored, setExIgnored) -> values)); - } else { - return assembledValues.thenApply(values -> { - // no one is waiting for the set to happen here so if its truly async - // it will happen eventually but no result will be dependant on it - valueCache.setValues(keys, values); - return values; - }); + try { + boolean completeValueAfterCacheSet = loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet(); + if (completeValueAfterCacheSet) { + return assembledValues.thenCompose(values -> nonNull(valueCache + .setValues(keys, values), () -> "Your ValueCache.setValues function MUST return a non null promise") + // we dont trust the set cache to give us the values back - we have them - lets use them + // if the cache set fails - then they wont be in cache and maybe next time they will + .handle((ignored, setExIgnored) -> values)); + } else { + return assembledValues.thenApply(values -> { + // no one is waiting for the set to happen here so if its truly async + // it will happen eventually but no result will be dependant on it + valueCache.setValues(keys, values); + return values; + }); + } + } catch (RuntimeException ignored) { + // if we cant set values back into the cache - so be it - this must be a faulty + // ValueCache implementation + return assembledValues; } } diff --git a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java index a0a67f7..9394ae3 100644 --- a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java +++ b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java @@ -4,7 +4,6 @@ import com.github.benmanes.caffeine.cache.Caffeine; import org.dataloader.fixtures.CaffeineValueCache; import org.dataloader.fixtures.CustomValueCache; -import org.dataloader.fixtures.TestKit; import org.dataloader.impl.DataLoaderAssertionException; import org.junit.Test; @@ -20,6 +19,8 @@ import 
static org.awaitility.Awaitility.await; import static org.dataloader.DataLoaderOptions.newOptions; import static org.dataloader.fixtures.TestKit.idLoader; +import static org.dataloader.fixtures.TestKit.snooze; +import static org.dataloader.fixtures.TestKit.sort; import static org.dataloader.impl.CompletableFutureKit.failedFuture; import static org.hamcrest.Matchers.equalTo; import static org.junit.Assert.assertArrayEquals; @@ -48,6 +49,7 @@ public void test_by_default_we_have_no_value_caching() { assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); // futures are still cached but not values + loadCalls.clear(); fA = identityLoader.load("a"); fB = identityLoader.load("b"); @@ -59,8 +61,7 @@ public void test_by_default_we_have_no_value_caching() { assertThat(fA.join(), equalTo("a")); assertThat(fB.join(), equalTo("b")); - assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); - + assertThat(loadCalls, equalTo(emptyList())); } @Test @@ -213,12 +214,12 @@ public void caching_can_take_some_time_complete() { public CompletableFuture get(String key) { if (key.startsWith("miss")) { return CompletableFuture.supplyAsync(() -> { - TestKit.snooze(1000); + snooze(1000); throw new IllegalStateException("no a in cache"); }); } else { return CompletableFuture.supplyAsync(() -> { - TestKit.snooze(1000); + snooze(1000); return key; }); } @@ -261,7 +262,7 @@ public CompletableFuture>> getValues(List keys) { } } return CompletableFuture.supplyAsync(() -> { - TestKit.snooze(1000); + snooze(1000); return cacheCalls; }); } @@ -364,4 +365,91 @@ public CompletableFuture get(String key) { assertThat(getCalls.get(), equalTo(0)); assertTrue(customValueCache.asMap().isEmpty()); } + + @Test + public void if_everything_is_cached_no_batching_happens() { + AtomicInteger getCalls = new AtomicInteger(); + AtomicInteger setCalls = new AtomicInteger(); + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture get(String key) { + 
getCalls.incrementAndGet(); + return super.get(key); + } + + @Override + public CompletableFuture> setValues(List keys, List values) { + setCalls.incrementAndGet(); + return super.setValues(keys, values); + } + }; + customValueCache.asMap().put("a", "cachedA"); + customValueCache.asMap().put("b", "cachedB"); + customValueCache.asMap().put("c", "cachedC"); + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache).setCachingEnabled(true); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("c"); + + await().until(identityLoader.dispatch()::isDone); + + assertThat(fA.join(), equalTo("cachedA")); + assertThat(fB.join(), equalTo("cachedB")); + assertThat(fC.join(), equalTo("cachedC")); + + assertThat(loadCalls, equalTo(emptyList())); + assertThat(getCalls.get(), equalTo(3)); + assertThat(setCalls.get(), equalTo(0)); + } + + + @Test + public void if_batching_is_off_it_still_can_cache() { + AtomicInteger getCalls = new AtomicInteger(); + AtomicInteger setCalls = new AtomicInteger(); + CustomValueCache customValueCache = new CustomValueCache() { + + @Override + public CompletableFuture get(String key) { + getCalls.incrementAndGet(); + return super.get(key); + } + + @Override + public CompletableFuture> setValues(List keys, List values) { + setCalls.incrementAndGet(); + return super.setValues(keys, values); + } + }; + customValueCache.asMap().put("a", "cachedA"); + + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setValueCache(customValueCache).setCachingEnabled(true).setBatchingEnabled(false); + DataLoader identityLoader = idLoader(options, loadCalls); + + CompletableFuture fA = identityLoader.load("a"); + CompletableFuture fB = identityLoader.load("b"); + CompletableFuture fC = identityLoader.load("c"); + + 
assertTrue(fA.isDone()); // with batching off they are dispatched immediately + assertTrue(fB.isDone()); + assertTrue(fC.isDone()); + + await().until(identityLoader.dispatch()::isDone); + + assertThat(fA.join(), equalTo("cachedA")); + assertThat(fB.join(), equalTo("b")); + assertThat(fC.join(), equalTo("c")); + + assertThat(loadCalls, equalTo(asList(singletonList("b"), singletonList("c")))); + assertThat(getCalls.get(), equalTo(3)); + assertThat(setCalls.get(), equalTo(2)); + + assertThat(sort(customValueCache.asMap().values()), equalTo(sort(asList("b", "c", "cachedA")))); + } } diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 2ea23a8..5c87148 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -9,8 +9,8 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; +import static java.util.stream.Collectors.toList; import static org.dataloader.impl.CompletableFutureKit.failedFuture; public class TestKit { @@ -26,7 +26,7 @@ public static BatchLoader keysAsValues(List> loadCalls) { @SuppressWarnings("unchecked") List values = keys.stream() .map(k -> (V) k) - .collect(Collectors.toList()); + .collect(toList()); return CompletableFuture.completedFuture(values); }; } @@ -62,4 +62,9 @@ public static void snooze(int millis) { throw new RuntimeException(e); } } + + + public static List sort(Collection collection) { + return collection.stream().sorted().collect(toList()); + } } From d691f88281eb14995a5c8b21bf94a97091022043 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sun, 8 Aug 2021 19:52:41 +1000 Subject: [PATCH 6/9] found a buf where the cache set is done on entries that are already in cache - fix it --- build.gradle | 13 +++++++ .../java/org/dataloader/DataLoaderHelper.java | 37 ++++++++++++------- .../dataloader/DataLoaderValueCacheTest.java | 4 +- 
3 files changed, 40 insertions(+), 14 deletions(-) diff --git a/build.gradle b/build.gradle index cdbca84..934c51e 100644 --- a/build.gradle +++ b/build.gradle @@ -40,6 +40,19 @@ version = releaseVersion ? releaseVersion : getDevelopmentVersion() group = 'com.graphql-java' description = 'A pure Java 8 port of Facebook Dataloader' +gradle.buildFinished { buildResult -> + println "*******************************" + println "*" + if (buildResult.failure != null) { + println "* FAILURE - ${buildResult.failure}" + } else { + println "* SUCCESS" + } + println "* Version: $version" + println "*" + println "*******************************" +} + repositories { mavenCentral() mavenLocal() diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index c69d585..e7ca18f 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -362,7 +362,7 @@ CompletionStage> invokeLoader(List keys, List keyContexts, bo // and then fill in their values // CompletionStage> batchLoad = invokeLoader(cacheMissedKeys, cacheMissedContexts); - CompletionStage> assembledValues = batchLoad.thenApply(batchedValues -> { + CompletionStage> assembledValues = batchLoad.thenApply(batchedValues -> { assertResultSize(cacheMissedKeys, batchedValues); for (int i = 0; i < batchedValues.size(); i++) { @@ -370,14 +370,13 @@ CompletionStage> invokeLoader(List keys, List keyContexts, bo V v = batchedValues.get(i); valuesInKeyOrder.put(missedKey, v); } - return new ArrayList<>(valuesInKeyOrder.values()); + + return new CacheMissedData<>(cacheMissedKeys, batchedValues, new ArrayList<>(valuesInKeyOrder.values())); }); // // fire off a call to the ValueCache to allow it to set values into the // cache now that we have them - assembledValues = setToValueCache(keys, assembledValues); - - return assembledValues; + return setToValueCache(assembledValues); } }); } @@ -454,27 +453,39 @@ private 
CompletableFuture>> getFromValueCache(List keys) { } } - private CompletionStage> setToValueCache(List keys, CompletionStage> assembledValues) { + static class CacheMissedData { + final List missedKeys; + final List missedValues; + final List assembledValues; + + CacheMissedData(List missedKeys, List missedValues, List assembledValues) { + this.missedKeys = missedKeys; + this.missedValues = missedValues; + this.assembledValues = assembledValues; + } + } + + private CompletionStage> setToValueCache(CompletionStage> assembledValues) { try { boolean completeValueAfterCacheSet = loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet(); if (completeValueAfterCacheSet) { - return assembledValues.thenCompose(values -> nonNull(valueCache - .setValues(keys, values), () -> "Your ValueCache.setValues function MUST return a non null promise") + return assembledValues.thenCompose(cacheMissedData -> nonNull(valueCache + .setValues(cacheMissedData.missedKeys, cacheMissedData.missedValues), () -> "Your ValueCache.setValues function MUST return a non null promise") // we dont trust the set cache to give us the values back - we have them - lets use them // if the cache set fails - then they wont be in cache and maybe next time they will - .handle((ignored, setExIgnored) -> values)); + .handle((ignored, setExIgnored) -> cacheMissedData.assembledValues)); } else { - return assembledValues.thenApply(values -> { + return assembledValues.thenApply(cacheMissedData -> { // no one is waiting for the set to happen here so if its truly async // it will happen eventually but no result will be dependant on it - valueCache.setValues(keys, values); - return values; + valueCache.setValues(cacheMissedData.missedKeys, cacheMissedData.missedValues); + return cacheMissedData.assembledValues; }); } } catch (RuntimeException ignored) { // if we cant set values back into the cache - so be it - this must be a faulty // ValueCache implementation - return assembledValues; + return 
assembledValues.thenApply(cacheMissedData -> cacheMissedData.assembledValues); } } diff --git a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java index 9394ae3..38cfe77 100644 --- a/src/test/java/org/dataloader/DataLoaderValueCacheTest.java +++ b/src/test/java/org/dataloader/DataLoaderValueCacheTest.java @@ -288,7 +288,9 @@ public CompletableFuture>> getValues(List keys) { assertThat(loadCalls, equalTo(singletonList(asList("missC", "missD")))); List values = new ArrayList<>(customValueCache.asMap().values()); - assertThat(values, equalTo(asList("a", "b", "missC", "missD"))); + // it will only set back in values that are missed - it wont set in values that successfully + // came out of the cache + assertThat(values, equalTo(asList("missC", "missD"))); } @Test From efdb9a7ad1dbaf7d884dcab07e6db8116e790cc6 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 9 Aug 2021 09:21:06 +1000 Subject: [PATCH 7/9] improved the set cache code - thenCompose was missing --- .../java/org/dataloader/DataLoaderHelper.java | 51 +++++++------------ 1 file changed, 17 insertions(+), 34 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index e7ca18f..b8bf1f9 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -362,21 +362,20 @@ CompletionStage> invokeLoader(List keys, List keyContexts, bo // and then fill in their values // CompletionStage> batchLoad = invokeLoader(cacheMissedKeys, cacheMissedContexts); - CompletionStage> assembledValues = batchLoad.thenApply(batchedValues -> { - assertResultSize(cacheMissedKeys, batchedValues); + return batchLoad.thenCompose(missedValues -> { + assertResultSize(cacheMissedKeys, missedValues); - for (int i = 0; i < batchedValues.size(); i++) { + for (int i = 0; i < missedValues.size(); i++) { K missedKey = cacheMissedKeys.get(i); - V v 
= batchedValues.get(i); + V v = missedValues.get(i); valuesInKeyOrder.put(missedKey, v); } - - return new CacheMissedData<>(cacheMissedKeys, batchedValues, new ArrayList<>(valuesInKeyOrder.values())); + List assembledValues = new ArrayList<>(valuesInKeyOrder.values()); + // + // fire off a call to the ValueCache to allow it to set values into the + // cache now that we have them + return setToValueCache(assembledValues, cacheMissedKeys, missedValues); }); - // - // fire off a call to the ValueCache to allow it to set values into the - // cache now that we have them - return setToValueCache(assembledValues); } }); } @@ -453,40 +452,24 @@ private CompletableFuture>> getFromValueCache(List keys) { } } - static class CacheMissedData { - final List missedKeys; - final List missedValues; - final List assembledValues; - - CacheMissedData(List missedKeys, List missedValues, List assembledValues) { - this.missedKeys = missedKeys; - this.missedValues = missedValues; - this.assembledValues = assembledValues; - } - } - - private CompletionStage> setToValueCache(CompletionStage> assembledValues) { + private CompletionStage> setToValueCache(List assembledValues, List missedKeys, List missedValues) { try { boolean completeValueAfterCacheSet = loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet(); if (completeValueAfterCacheSet) { - return assembledValues.thenCompose(cacheMissedData -> nonNull(valueCache - .setValues(cacheMissedData.missedKeys, cacheMissedData.missedValues), () -> "Your ValueCache.setValues function MUST return a non null promise") + return nonNull(valueCache + .setValues(missedKeys, missedValues), () -> "Your ValueCache.setValues function MUST return a non null promise") // we dont trust the set cache to give us the values back - we have them - lets use them // if the cache set fails - then they wont be in cache and maybe next time they will - .handle((ignored, setExIgnored) -> cacheMissedData.assembledValues)); + .handle((ignored, setExIgnored) -> 
assembledValues); } else { - return assembledValues.thenApply(cacheMissedData -> { - // no one is waiting for the set to happen here so if its truly async - // it will happen eventually but no result will be dependant on it - valueCache.setValues(cacheMissedData.missedKeys, cacheMissedData.missedValues); - return cacheMissedData.assembledValues; - }); + // no one is waiting for the set to happen here so if its truly async + // it will happen eventually but no result will be dependant on it + valueCache.setValues(missedKeys, missedValues); } } catch (RuntimeException ignored) { // if we cant set values back into the cache - so be it - this must be a faulty // ValueCache implementation - return assembledValues.thenApply(cacheMissedData -> cacheMissedData.assembledValues); } + return CompletableFuture.completedFuture(assembledValues); } - } From 4ecb78675896841e032dc44ffb2c012e00800b8d Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 9 Aug 2021 15:14:27 +1000 Subject: [PATCH 8/9] Moved to CompletableFuture inside code --- .../java/org/dataloader/DataLoaderHelper.java | 21 +++++++++---------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index b8bf1f9..a788889 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -224,9 +224,8 @@ private CompletableFuture> sliceIntoBatchesOfBatches(List keys, List< @SuppressWarnings("unchecked") private CompletableFuture> dispatchQueueBatch(List keys, List callContexts, List> queuedFutures) { stats.incrementBatchLoadCountBy(keys.size()); - CompletionStage> batchLoad = invokeLoader(keys, callContexts, loaderOptions.cachingEnabled()); + CompletableFuture> batchLoad = invokeLoader(keys, callContexts, loaderOptions.cachingEnabled()); return batchLoad - .toCompletableFuture() .thenApply(values -> { assertResultSize(keys, values); @@ -327,7 +326,7 @@ 
CompletableFuture invokeLoaderImmediately(K key, Object keyContext, boolean c .toCompletableFuture(); } - CompletionStage> invokeLoader(List keys, List keyContexts, boolean cachingEnabled) { + CompletableFuture> invokeLoader(List keys, List keyContexts, boolean cachingEnabled) { if (!cachingEnabled) { return invokeLoader(keys, keyContexts); } @@ -361,7 +360,7 @@ CompletionStage> invokeLoader(List keys, List keyContexts, bo // we missed some of the keys from cache, so send them to the batch loader // and then fill in their values // - CompletionStage> batchLoad = invokeLoader(cacheMissedKeys, cacheMissedContexts); + CompletableFuture> batchLoad = invokeLoader(cacheMissedKeys, cacheMissedContexts); return batchLoad.thenCompose(missedValues -> { assertResultSize(cacheMissedKeys, missedValues); @@ -381,8 +380,8 @@ CompletionStage> invokeLoader(List keys, List keyContexts, bo } - CompletionStage> invokeLoader(List keys, List keyContexts) { - CompletionStage> batchLoad; + CompletableFuture> invokeLoader(List keys, List keyContexts) { + CompletableFuture> batchLoad; try { Object context = loaderOptions.getBatchLoaderContextProvider().getContext(); BatchLoaderEnvironment environment = BatchLoaderEnvironment.newBatchLoaderEnvironment() @@ -399,14 +398,14 @@ CompletionStage> invokeLoader(List keys, List keyContexts) { } @SuppressWarnings("unchecked") - private CompletionStage> invokeListBatchLoader(List keys, BatchLoaderEnvironment environment) { + private CompletableFuture> invokeListBatchLoader(List keys, BatchLoaderEnvironment environment) { CompletionStage> loadResult; if (batchLoadFunction instanceof BatchLoaderWithContext) { loadResult = ((BatchLoaderWithContext) batchLoadFunction).load(keys, environment); } else { loadResult = ((BatchLoader) batchLoadFunction).load(keys); } - return nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage promise"); + return nonNull(loadResult, () -> "Your batch loader function MUST return a non 
null CompletionStage promise").toCompletableFuture(); } @@ -415,7 +414,7 @@ private CompletionStage> invokeListBatchLoader(List keys, BatchLoader * to missing elements. */ @SuppressWarnings("unchecked") - private CompletionStage> invokeMapBatchLoader(List keys, BatchLoaderEnvironment environment) { + private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoaderEnvironment environment) { CompletionStage> loadResult; Set setOfKeys = new LinkedHashSet<>(keys); if (batchLoadFunction instanceof MappedBatchLoaderWithContext) { @@ -423,7 +422,7 @@ private CompletionStage> invokeMapBatchLoader(List keys, BatchLoaderE } else { loadResult = ((MappedBatchLoader) batchLoadFunction).load(setOfKeys); } - CompletionStage> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage promise"); + CompletableFuture> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage promise").toCompletableFuture(); return mapBatchLoad.thenApply(map -> { List values = new ArrayList<>(); for (K key : keys) { @@ -452,7 +451,7 @@ private CompletableFuture>> getFromValueCache(List keys) { } } - private CompletionStage> setToValueCache(List assembledValues, List missedKeys, List missedValues) { + private CompletableFuture> setToValueCache(List assembledValues, List missedKeys, List missedValues) { try { boolean completeValueAfterCacheSet = loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet(); if (completeValueAfterCacheSet) { From af28c040d695139a00b2c6c5de259d33d777566f Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Mon, 9 Aug 2021 16:12:08 +1000 Subject: [PATCH 9/9] better asserts --- .../java/org/dataloader/DataLoaderHelper.java | 48 +++++++++---------- 1 file changed, 24 insertions(+), 24 deletions(-) diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index a788889..dbd9383 100644 --- 
a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -8,7 +8,6 @@ import java.time.Instant; import java.util.ArrayList; import java.util.Collection; -import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; import java.util.Map; @@ -335,45 +334,46 @@ CompletableFuture> invokeLoader(List keys, List keyContexts, assertState(keys.size() == cachedValues.size(), () -> "The size of the cached values MUST be the same size as the key list"); - LinkedHashMap valuesInKeyOrder = new LinkedHashMap<>(); - List cacheMissedKeys = new ArrayList<>(); - List cacheMissedContexts = new ArrayList<>(); + // the following is NOT a Map because keys in data loader can repeat (by design) + // and hence "a","b","c","b" is a valid set of keys + List> valuesInKeyOrder = new ArrayList<>(); + List missedKeyIndexes = new ArrayList<>(); + List missedKeys = new ArrayList<>(); + List missedKeyContexts = new ArrayList<>(); for (int i = 0; i < keys.size(); i++) { - K key = keys.get(i); - Object keyContext = keyContexts.get(i); Try cacheGet = cachedValues.get(i); - if (cacheGet.isSuccess()) { - valuesInKeyOrder.put(key, cacheGet.get()); - } else { - valuesInKeyOrder.put(key, null); // an entry to be replaced later - cacheMissedKeys.add(key); - cacheMissedContexts.add(keyContext); + valuesInKeyOrder.add(cacheGet); + if (cacheGet.isFailure()) { + missedKeyIndexes.add(i); + missedKeys.add(keys.get(i)); + missedKeyContexts.add(keyContexts.get(i)); } } - if (cacheMissedKeys.isEmpty()) { + if (missedKeys.isEmpty()) { // // everything was cached // - return completedFuture(new ArrayList<>(valuesInKeyOrder.values())); + List assembledValues = valuesInKeyOrder.stream().map(Try::get).collect(toList()); + return completedFuture(assembledValues); } else { // // we missed some of the keys from cache, so send them to the batch loader // and then fill in their values // - CompletableFuture> batchLoad = 
invokeLoader(cacheMissedKeys, cacheMissedContexts); + CompletableFuture> batchLoad = invokeLoader(missedKeys, missedKeyContexts); return batchLoad.thenCompose(missedValues -> { - assertResultSize(cacheMissedKeys, missedValues); + assertResultSize(missedKeys, missedValues); for (int i = 0; i < missedValues.size(); i++) { - K missedKey = cacheMissedKeys.get(i); V v = missedValues.get(i); - valuesInKeyOrder.put(missedKey, v); + Integer listIndex = missedKeyIndexes.get(i); + valuesInKeyOrder.set(listIndex, Try.succeeded(v)); } - List assembledValues = new ArrayList<>(valuesInKeyOrder.values()); + List assembledValues = valuesInKeyOrder.stream().map(Try::get).collect(toList()); // // fire off a call to the ValueCache to allow it to set values into the // cache now that we have them - return setToValueCache(assembledValues, cacheMissedKeys, missedValues); + return setToValueCache(assembledValues, missedKeys, missedValues); }); } }); @@ -405,7 +405,7 @@ private CompletableFuture> invokeListBatchLoader(List keys, BatchLoad } else { loadResult = ((BatchLoader) batchLoadFunction).load(keys); } - return nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage promise").toCompletableFuture(); + return nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture(); } @@ -422,7 +422,7 @@ private CompletableFuture> invokeMapBatchLoader(List keys, BatchLoade } else { loadResult = ((MappedBatchLoader) batchLoadFunction).load(setOfKeys); } - CompletableFuture> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage promise").toCompletableFuture(); + CompletableFuture> mapBatchLoad = nonNull(loadResult, () -> "Your batch loader function MUST return a non null CompletionStage").toCompletableFuture(); return mapBatchLoad.thenApply(map -> { List values = new ArrayList<>(); for (K key : keys) { @@ -445,7 +445,7 @@ int dispatchDepth() { private 
CompletableFuture>> getFromValueCache(List keys) { try { - return nonNull(valueCache.getValues(keys), () -> "Your ValueCache.getValues function MUST return a non null promise"); + return nonNull(valueCache.getValues(keys), () -> "Your ValueCache.getValues function MUST return a non null CompletableFuture"); } catch (RuntimeException e) { return CompletableFutureKit.failedFuture(e); } @@ -456,7 +456,7 @@ private CompletableFuture> setToValueCache(List assembledValues, List boolean completeValueAfterCacheSet = loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet(); if (completeValueAfterCacheSet) { return nonNull(valueCache - .setValues(missedKeys, missedValues), () -> "Your ValueCache.setValues function MUST return a non null promise") + .setValues(missedKeys, missedValues), () -> "Your ValueCache.setValues function MUST return a non null CompletableFuture") // we dont trust the set cache to give us the values back - we have them - lets use them // if the cache set fails - then they wont be in cache and maybe next time they will .handle((ignored, setExIgnored) -> assembledValues);