diff --git a/src/main/java/org/dataloader/CacheMap.java b/src/main/java/org/dataloader/CacheMap.java index f60c6ef..cef6faf 100644 --- a/src/main/java/org/dataloader/CacheMap.java +++ b/src/main/java/org/dataloader/CacheMap.java @@ -46,12 +46,12 @@ public interface CacheMap { * * @return the cache map */ - static CacheMap> simpleMap() { + static CacheMap simpleMap() { return new DefaultCacheMap<>(); } /** - * Checks whether the specified key is contained in the cach map. + * Checks whether the specified key is contained in the cache map. * * @param key the key to check * @@ -69,7 +69,7 @@ static CacheMap> simpleMap() { * * @return the cached value, or {@code null} if not found (depends on cache implementation) */ - V get(U key); + CompletableFuture get(U key); /** * Creates a new cache map entry with the specified key and value, or updates the value if the key already exists. @@ -79,7 +79,7 @@ static CacheMap> simpleMap() { * * @return the cache map for fluent coding */ - CacheMap set(U key, V value); + CacheMap set(U key, CompletableFuture value); /** * Deletes the entry with the specified key from the cache map, if it exists. @@ -91,7 +91,7 @@ static CacheMap> simpleMap() { CacheMap delete(U key); /** - * Clears all entries of the cache map + * Clears all entries of the cache map. * * @return the cache map for fluent coding */ diff --git a/src/main/java/org/dataloader/CacheStore.java b/src/main/java/org/dataloader/CacheStore.java new file mode 100644 index 0000000..75d7252 --- /dev/null +++ b/src/main/java/org/dataloader/CacheStore.java @@ -0,0 +1,80 @@ +package org.dataloader; + +import org.dataloader.impl.DefaultCacheStore; + +import java.util.concurrent.CompletableFuture; + +/** + * Remote cache store for data loaders that use caching and want a long-lived or external cache. + *

+ * The default implementation is a no-op store which replies with the key always missing and doesn't + * store any actual results. This is to avoid duplicating the stored data between the {@link CacheMap} + * and the store. + * + * @param the type of cache keys + * @param the type of cache values + * + * @author Craig Day + */ +@PublicSpi +public interface CacheStore { + + /** + * Creates a new store, using the default no-op implementation. + * + * @param the type of cache keys + * @param the type of cache values + * + * @return the cache store + */ + static CacheStore defaultStore() { + return new DefaultCacheStore<>(); + } + + /** + * Checks whether the specified key is contained in the store. + * + * @param key the key to check + * + * @return {@code true} if the cache contains the key, {@code false} otherwise + */ + CompletableFuture has(K key); + + /** + * Gets the specified key from the store. + * + * @apiNote The future may fail if the key does not exist depending on implementation. It is + * recommended to compose this call with {@link #has(Object)}. + * + * @param key the key to retrieve + * + * @return a future containing the cached value, or {@code null} if not found (depends on implementation) + */ + CompletableFuture get(K key); + + /** + * Stores the value with the specified key, or updates it if the key already exists. + * + * @param key the key to store + * @param value the value to store + * + * @return a future containing the stored value for fluent composition + */ + CompletableFuture set(K key, V value); + + /** + * Deletes the entry with the specified key from the store, if it exists. + * + * @param key the key to delete + * + * @return a void future for error handling and fluent composition + */ + CompletableFuture delete(K key); + + /** + * Clears all entries from the store. + * + * @return a void future for error handling and fluent composition + */ + CompletableFuture clear(); +} diff --git a/src/main/java/org/dataloader/DataLoader.java b/src/main/java/org/dataloader/DataLoader.java index 3f200c3..39ba213 100644 --- a/src/main/java/org/dataloader/DataLoader.java +++ b/src/main/java/org/dataloader/DataLoader.java @@ -16,6 +16,7 @@ package org.dataloader; +import java.util.function.BiConsumer; import org.dataloader.impl.CompletableFutureKit; import org.dataloader.stats.Statistics; import org.dataloader.stats.StatisticsCollector; @@ -58,8 +59,9 @@ public class DataLoader { private final DataLoaderHelper helper; - private final CacheMap> futureCache; private final StatisticsCollector stats; + private final CacheMap futureCache; + private final CacheStore remoteValueStore; /** * Creates new DataLoader with the specified batch loader function and default options @@ -333,15 +335,21 @@ public DataLoader(BatchLoader batchLoadFunction, DataLoaderOptions options private DataLoader(Object batchLoadFunction, DataLoaderOptions options) { DataLoaderOptions loaderOptions = options == null ? new DataLoaderOptions() : options; this.futureCache = determineCacheMap(loaderOptions); + this.remoteValueStore = determineCacheStore(loaderOptions); // order of keys matter in data loader this.stats = nonNull(loaderOptions.getStatisticsCollector()); - this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.stats); + this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.remoteValueStore, this.stats); } @SuppressWarnings("unchecked") - private CacheMap> determineCacheMap(DataLoaderOptions loaderOptions) { - return loaderOptions.cacheMap().isPresent() ? (CacheMap>) loaderOptions.cacheMap().get() : CacheMap.simpleMap(); + private CacheMap determineCacheMap(DataLoaderOptions loaderOptions) { + return loaderOptions.cacheMap().orElseGet(CacheMap::simpleMap); + } + + @SuppressWarnings("unchecked") + private CacheStore determineCacheStore(DataLoaderOptions loaderOptions) { + return loaderOptions.remoteValueStore().orElseGet(CacheStore::defaultStore); } /** @@ -519,9 +527,22 @@ public int dispatchDepth() { * @return the data loader for fluent coding */ public DataLoader clear(K key) { + return clear(key, (a, b) -> {}); + } + + /** + * Clears the future with the specified key from the cache remote value store, if caching is enabled + * and a remote store is set, so it will be re-fetched and stored on the next load request. + * + * @param key the key to remove + * @param handler a handler that will be called after the async remote clear completes + * @return the data loader for fluent coding + */ + public DataLoader clear(K key, BiConsumer handler) { Object cacheKey = getCacheKey(key); synchronized (this) { futureCache.delete(cacheKey); + remoteValueStore.delete(key).whenComplete(handler); } return this; } @@ -532,8 +553,19 @@ public DataLoader clear(K key) { * @return the data loader for fluent coding */ public DataLoader clearAll() { + return clearAll((a, b) -> {}); + } + + /** + * Clears the entire cache map of the loader, and of the remote store. + * + * @param handler a handler that will be called after the async remote clear all completes + * @return the data loader for fluent coding + */ + public DataLoader clearAll(BiConsumer handler) { synchronized (this) { futureCache.clear(); + remoteValueStore.clear().whenComplete(handler); } return this; } diff --git a/src/main/java/org/dataloader/DataLoaderHelper.java b/src/main/java/org/dataloader/DataLoaderHelper.java index edef507..f6bf2a7 100644 --- a/src/main/java/org/dataloader/DataLoaderHelper.java +++ b/src/main/java/org/dataloader/DataLoaderHelper.java @@ -58,15 +58,17 @@ Object getCallContext() { private final DataLoader dataLoader; private final Object batchLoadFunction; private final DataLoaderOptions loaderOptions; - private final CacheMap> futureCache; + private final CacheMap futureCache; + private final CacheStore cacheStore; private final List>> loaderQueue; private final StatisticsCollector stats; - DataLoaderHelper(DataLoader dataLoader, Object batchLoadFunction, DataLoaderOptions loaderOptions, CacheMap> futureCache, StatisticsCollector stats) { + DataLoaderHelper(DataLoader dataLoader, Object batchLoadFunction, DataLoaderOptions loaderOptions, CacheMap futureCache, CacheStore cacheStore, StatisticsCollector stats) { this.dataLoader = dataLoader; this.batchLoadFunction = batchLoadFunction; this.loaderOptions = loaderOptions; this.futureCache = futureCache; + this.cacheStore = cacheStore; this.loaderQueue = new ArrayList<>(); this.stats = stats; } @@ -104,35 +106,13 @@ CompletableFuture load(K key, Object loadContext) { boolean batchingEnabled = loaderOptions.batchingEnabled(); boolean cachingEnabled = loaderOptions.cachingEnabled(); - Object cacheKey = null; - if (cachingEnabled) { - if (loadContext == null) { - cacheKey = getCacheKey(key); - } else { - cacheKey = getCacheKeyWithContext(key, loadContext); - } - } stats.incrementLoadCount(); if (cachingEnabled) { - if (futureCache.containsKey(cacheKey)) { - stats.incrementCacheHitCount(); - return futureCache.get(cacheKey); - } - } - - CompletableFuture future = new CompletableFuture<>(); - if (batchingEnabled) { - loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext)); + return loadFromCache(key, loadContext, batchingEnabled); } else { - stats.incrementBatchLoadCountBy(1); - // immediate execution of batch function - future = invokeLoaderImmediately(key, loadContext); + return queueOrInvokeLoad(key, loadContext, batchingEnabled); } - if (cachingEnabled) { - futureCache.set(cacheKey, future); - } - return future; } } @@ -188,6 +168,73 @@ DispatchResult dispatch() { return new DispatchResult<>(futureList, totalEntriesHandled); } + private CompletableFuture loadFromCache(K key, Object loadContext, boolean batchingEnabled) { + final Object cacheKey = loadContext == null ? getCacheKey(key) : getCacheKeyWithContext(key, loadContext); + + if (futureCache.containsKey(cacheKey)) { + // We already have a promise for this key, no need to check value cache or queue up load + stats.incrementCacheHitCount(); + return futureCache.get(cacheKey); + } + + /* + We haven't been asked for this key yet. We want to do one of two things: + + 1. Check if our cache store has it. If so: + a. Get the value from the cache store + b. Add a recovery case so we queue the load if fetching from cache store fails + c. Put that future in our futureCache to hit the early return next time + d. Return the resilient future + 2. If not in value cache: + a. queue or invoke the load + b. Add a success handler to store the result in the cache store + c. Return the result + */ + final CompletableFuture future = new CompletableFuture<>(); + + cacheStore.has(cacheKey).whenComplete((hasKey, e1) -> { + if (hasKey && e1 == null) { + cacheStore.get(cacheKey).whenComplete((cachedValue, e2) -> { + if (e2 == null) { + future.complete(cachedValue); + } else { + queueOrInvokeLoad(key, loadContext, batchingEnabled).whenComplete((result, error) -> { + if (error == null) { + future.complete(result); + } else { + future.completeExceptionally(error); + } + }); + } + }); + } else { + queueOrInvokeLoad(key, loadContext, batchingEnabled).whenComplete((result, error) -> { + if (error == null) { + cacheStore.set(cacheKey, result).whenComplete((v, e) -> future.complete(result)); + } else { + future.completeExceptionally(error); + } + }); + } + }); + + futureCache.set(cacheKey, future); + + return future; + } + + private CompletableFuture queueOrInvokeLoad(K key, Object loadContext, boolean batchingEnabled) { + if (batchingEnabled) { + CompletableFuture future = new CompletableFuture<>(); + loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext)); + return future; + } else { + stats.incrementBatchLoadCountBy(1); + // immediate execution of batch function + return invokeLoaderImmediately(key, loadContext); + } + } + private CompletableFuture> sliceIntoBatchesOfBatches(List keys, List> queuedFutures, List callContexts, int maxBatchSize) { // the number of keys is > than what the batch loader function can accept // so make multiple calls to the loader diff --git a/src/main/java/org/dataloader/DataLoaderOptions.java b/src/main/java/org/dataloader/DataLoaderOptions.java index 8158902..d863ba5 100644 --- a/src/main/java/org/dataloader/DataLoaderOptions.java +++ b/src/main/java/org/dataloader/DataLoaderOptions.java @@ -39,6 +39,7 @@ public class DataLoaderOptions { private boolean cachingExceptionsEnabled; private CacheKey cacheKeyFunction; private CacheMap cacheMap; + private CacheStore remoteValueStore; private int maxBatchSize; private Supplier statisticsCollector; private BatchLoaderContextProvider environmentProvider; @@ -67,6 +68,7 @@ public DataLoaderOptions(DataLoaderOptions other) { this.cachingExceptionsEnabled = other.cachingExceptionsEnabled; this.cacheKeyFunction = other.cacheKeyFunction; this.cacheMap = other.cacheMap; + this.remoteValueStore = other.remoteValueStore; this.maxBatchSize = other.maxBatchSize; this.statisticsCollector = other.statisticsCollector; this.environmentProvider = other.environmentProvider; @@ -193,6 +195,29 @@ public DataLoaderOptions setCacheMap(CacheMap cacheMap) { return this; } + /** + * Gets the (optional) cache store implementation that is used for value storage, if caching is enabled. + *

+ * If missing, a no-op implementation will be used. + * + * @return an optional with the cache store instance, or empty + */ + public Optional remoteValueStore() { + return Optional.ofNullable(remoteValueStore); + } + + /** + * Sets the value store implementation to use for caching values, if caching is enabled. + * + * @param remoteValueStore the cache store instance + * + * @return the data loader options for fluent coding + */ + public DataLoaderOptions setRemoteValueStore(CacheStore remoteValueStore) { + this.remoteValueStore = remoteValueStore; + return this; + } + /** * Gets the maximum number of keys that will be presented to the {@link BatchLoader} function * before they are split into multiple class diff --git a/src/main/java/org/dataloader/impl/DefaultCacheMap.java b/src/main/java/org/dataloader/impl/DefaultCacheMap.java index 0dc377e..2539934 100644 --- a/src/main/java/org/dataloader/impl/DefaultCacheMap.java +++ b/src/main/java/org/dataloader/impl/DefaultCacheMap.java @@ -16,6 +16,7 @@ package org.dataloader.impl; +import java.util.concurrent.CompletableFuture; import org.dataloader.CacheMap; import org.dataloader.Internal; @@ -33,7 +34,7 @@ @Internal public class DefaultCacheMap implements CacheMap { - private final Map cache; + private final Map> cache; /** * Default constructor @@ -54,7 +55,7 @@ public boolean containsKey(U key) { * {@inheritDoc} */ @Override - public V get(U key) { + public CompletableFuture get(U key) { return cache.get(key); } @@ -62,7 +63,7 @@ public V get(U key) { * {@inheritDoc} */ @Override - public CacheMap set(U key, V value) { + public CacheMap set(U key, CompletableFuture value) { cache.put(key, value); return this; } diff --git a/src/main/java/org/dataloader/impl/DefaultCacheStore.java b/src/main/java/org/dataloader/impl/DefaultCacheStore.java new file mode 100644 index 0000000..c9ff922 --- /dev/null +++ b/src/main/java/org/dataloader/impl/DefaultCacheStore.java @@ -0,0 +1,61 @@ +package org.dataloader.impl; + +import org.dataloader.CacheStore; +import org.dataloader.Internal; + +import java.util.concurrent.CompletableFuture; + +/** + * Default implementation of {@link CacheStore} that does nothing. + *

+ * We don't want to store values in memory twice, so when using the default store we just + * say we never have the key and complete the other methods by doing nothing. + * + * @param the type of cache keys + * @param the type of cache values + * + * @author Craig Day + */ +@Internal +public class DefaultCacheStore implements CacheStore { + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture has(K key) { + return CompletableFuture.completedFuture(false); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture get(K key) { + return CompletableFutureKit.failedFuture(new UnsupportedOperationException()); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture set(K key, V value) { + return CompletableFuture.completedFuture(value); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture delete(K key) { + return CompletableFuture.completedFuture(null); + } + + /** + * {@inheritDoc} + */ + @Override + public CompletableFuture clear() { + return CompletableFuture.completedFuture(null); + } +} diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index 523cb5a..cbafcd9 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -213,12 +213,12 @@ public boolean containsKey(Object key) { } @Override - public Object get(Object key) { + public CompletableFuture get(Object key) { return null; } @Override - public CacheMap set(Object key, Object value) { + public CacheMap set(Object key, CompletableFuture value) { return null; } diff --git a/src/test/java/org/dataloader/CustomCacheMap.java b/src/test/java/org/dataloader/CustomCacheMap.java index 505148d..21001bf 100644 --- a/src/test/java/org/dataloader/CustomCacheMap.java +++ b/src/test/java/org/dataloader/CustomCacheMap.java @@ -2,10 +2,11 @@ import java.util.LinkedHashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; public class CustomCacheMap implements CacheMap { - public Map stash; + public Map> stash; public CustomCacheMap() { stash = new LinkedHashMap<>(); @@ -17,12 +18,12 @@ public boolean containsKey(String key) { } @Override - public Object get(String key) { + public CompletableFuture get(String key) { return stash.get(key); } @Override - public CacheMap set(String key, Object value) { + public CacheMap set(String key, CompletableFuture value) { stash.put(key, value); return this; } diff --git a/src/test/java/org/dataloader/CustomCacheStore.java b/src/test/java/org/dataloader/CustomCacheStore.java new file mode 100644 index 0000000..a73ece9 --- /dev/null +++ b/src/test/java/org/dataloader/CustomCacheStore.java @@ -0,0 +1,38 @@ +package org.dataloader; + +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; + +public class CustomCacheStore implements CacheStore { + + public final Map store = new ConcurrentHashMap<>(); + + @Override + public CompletableFuture has(String key) { + return CompletableFuture.completedFuture(store.containsKey(key)); + } + + @Override + public CompletableFuture get(String key) { + return CompletableFuture.completedFuture(store.get(key)); + } + + @Override + public CompletableFuture set(String key, Object value) { + store.put(key, value); + return CompletableFuture.completedFuture(value); + } + + @Override + public CompletableFuture delete(String key) { + store.remove(key); + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture clear() { + store.clear(); + return CompletableFuture.completedFuture(null); + } +} diff --git a/src/test/java/org/dataloader/DataLoaderTest.java b/src/test/java/org/dataloader/DataLoaderTest.java index 0718225..a4ac204 100644 --- a/src/test/java/org/dataloader/DataLoaderTest.java +++ b/src/test/java/org/dataloader/DataLoaderTest.java @@ -801,6 +801,52 @@ public void should_Accept_a_custom_cache_map_implementation() throws ExecutionEx assertArrayEquals(customMap.stash.keySet().toArray(), emptyList().toArray()); } + @Test + public void should_Accept_a_remote_value_store_for_caching() throws ExecutionException, InterruptedException { + CustomCacheStore customStore = new CustomCacheStore(); + List> loadCalls = new ArrayList<>(); + DataLoaderOptions options = newOptions().setRemoteValueStore(customStore); + DataLoader identityLoader = idLoader(options, loadCalls); + + // Fetches as expected + + CompletableFuture future1 = identityLoader.load("a"); + CompletableFuture future2 = identityLoader.load("b"); + CompletableFuture> composite = identityLoader.dispatch(); + + await().until(composite::isDone); + assertThat(future1.get(), equalTo("a")); + assertThat(future2.get(), equalTo("b")); + + assertThat(loadCalls, equalTo(singletonList(asList("a", "b")))); + assertArrayEquals(customStore.store.keySet().toArray(), asList("a", "b").toArray()); + + CompletableFuture future3 = identityLoader.load("c"); + CompletableFuture future2a = identityLoader.load("b"); + composite = identityLoader.dispatch(); + + await().until(composite::isDone); + assertThat(future3.get(), equalTo("c")); + assertThat(future2a.get(), equalTo("b")); + + assertThat(loadCalls, equalTo(asList(asList("a", "b"), singletonList("c")))); + assertArrayEquals(customStore.store.keySet().toArray(), asList("a", "b", "c").toArray()); + + // Supports clear + + CompletableFuture futureC = new CompletableFuture<>(); + identityLoader.clear("b", (v, e) -> futureC.complete(v)); + await().until(futureC::isDone); + assertArrayEquals(customStore.store.keySet().toArray(), asList("a", "c").toArray()); + + // Supports clear all + + CompletableFuture futureCa = new CompletableFuture<>(); + identityLoader.clearAll((v, e) -> futureCa.complete(v)); + await().until(futureCa::isDone); + assertArrayEquals(customStore.store.keySet().toArray(), emptyList().toArray()); + } + @Test public void batching_disabled_should_dispatch_immediately() throws Exception { List> loadCalls = new ArrayList<>();