Skip to content

future cache vs value cache #82

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/main/java/org/dataloader/CacheMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ public interface CacheMap<U, V> {
*
* @return the cache map
*/
static <U, V> CacheMap<U, CompletableFuture<V>> simpleMap() {
static <U, V> CacheMap<U, V> simpleMap() {
return new DefaultCacheMap<>();
}

/**
* Checks whether the specified key is contained in the cach map.
* Checks whether the specified key is contained in the cache map.
*
* @param key the key to check
*
Expand All @@ -69,7 +69,7 @@ static <U, V> CacheMap<U, CompletableFuture<V>> simpleMap() {
*
* @return the cached value, or {@code null} if not found (depends on cache implementation)
*/
V get(U key);
CompletableFuture<V> get(U key);

/**
* Creates a new cache map entry with the specified key and value, or updates the value if the key already exists.
Expand All @@ -79,7 +79,7 @@ static <U, V> CacheMap<U, CompletableFuture<V>> simpleMap() {
*
* @return the cache map for fluent coding
*/
CacheMap<U, V> set(U key, V value);
CacheMap<U, V> set(U key, CompletableFuture<V> value);

/**
* Deletes the entry with the specified key from the cache map, if it exists.
Expand All @@ -91,7 +91,7 @@ static <U, V> CacheMap<U, CompletableFuture<V>> simpleMap() {
CacheMap<U, V> delete(U key);

/**
* Clears all entries of the cache map
* Clears all entries of the cache map.
*
* @return the cache map for fluent coding
*/
Expand Down
80 changes: 80 additions & 0 deletions src/main/java/org/dataloader/CacheStore.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package org.dataloader;

import org.dataloader.impl.DefaultCacheStore;

import java.util.concurrent.CompletableFuture;

/**
* Remote cache store for data loaders that use caching and want a long-lived or external cache.
* <p>
* The default implementation is a no-op store which replies with the key always missing and doesn't
* store any actual results. This is to avoid duplicating the stored data between the {@link CacheMap}
* and the store.
*
* @param <K> the type of cache keys
* @param <V> the type of cache values
*
* @author <a href="https://github.com/craig-day">Craig Day</a>
*/
@PublicSpi
public interface CacheStore<K, V> {

/**
* Creates a new store, using the default no-op implementation.
*
* @param <K> the type of cache keys
* @param <V> the type of cache values
*
* @return the cache store
*/
static <K, V> CacheStore<K, V> defaultStore() {
return new DefaultCacheStore<>();
}

/**
* Checks whether the specified key is contained in the store.
*
* @param key the key to check
*
* @return {@code true} if the cache contains the key, {@code false} otherwise
*/
CompletableFuture<Boolean> has(K key);

/**
* Gets the specified key from the store.
*
* @apiNote The future may fail if the key does not exist depending on implementation. It is
* recommended to compose this call with {@link #has(Object)}.
*
* @param key the key to retrieve
*
* @return a future containing the cached value, or {@code null} if not found (depends on implementation)
*/
CompletableFuture<V> get(K key);

/**
* Stores the value with the specified key, or updates it if the key already exists.
*
* @param key the key to store
* @param value the value to store
*
* @return a future containing the stored value for fluent composition
*/
CompletableFuture<V> set(K key, V value);

/**
* Deletes the entry with the specified key from the store, if it exists.
*
* @param key the key to delete
*
* @return a void future for error handling and fluent composition
*/
CompletableFuture<Void> delete(K key);

/**
* Clears all entries from the store.
*
* @return a void future for error handling and fluent composition
*/
CompletableFuture<Void> clear();
}
40 changes: 36 additions & 4 deletions src/main/java/org/dataloader/DataLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.dataloader;

import java.util.function.BiConsumer;
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.stats.Statistics;
import org.dataloader.stats.StatisticsCollector;
Expand Down Expand Up @@ -58,8 +59,9 @@
public class DataLoader<K, V> {

private final DataLoaderHelper<K, V> helper;
private final CacheMap<Object, CompletableFuture<V>> futureCache;
private final StatisticsCollector stats;
private final CacheMap<Object, V> futureCache;
private final CacheStore<Object, V> remoteValueStore;

/**
* Creates new DataLoader with the specified batch loader function and default options
Expand Down Expand Up @@ -333,15 +335,21 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options
private DataLoader(Object batchLoadFunction, DataLoaderOptions options) {
DataLoaderOptions loaderOptions = options == null ? new DataLoaderOptions() : options;
this.futureCache = determineCacheMap(loaderOptions);
this.remoteValueStore = determineCacheStore(loaderOptions);
// order of keys matter in data loader
this.stats = nonNull(loaderOptions.getStatisticsCollector());

this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.stats);
this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.remoteValueStore, this.stats);
}

@SuppressWarnings("unchecked")
private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptions loaderOptions) {
return loaderOptions.cacheMap().isPresent() ? (CacheMap<Object, CompletableFuture<V>>) loaderOptions.cacheMap().get() : CacheMap.simpleMap();
private CacheMap<Object, V> determineCacheMap(DataLoaderOptions loaderOptions) {
return loaderOptions.cacheMap().orElseGet(CacheMap::simpleMap);
}

@SuppressWarnings("unchecked")
private CacheStore<Object, V> determineCacheStore(DataLoaderOptions loaderOptions) {
return loaderOptions.remoteValueStore().orElseGet(CacheStore::defaultStore);
}

/**
Expand Down Expand Up @@ -519,9 +527,22 @@ public int dispatchDepth() {
* @return the data loader for fluent coding
*/
public DataLoader<K, V> clear(K key) {
return clear(key, (a, b) -> {});
}

/**
* Clears the future with the specified key from the cache remote value store, if caching is enabled
* and a remote store is set, so it will be re-fetched and stored on the next load request.
*
* @param key the key to remove
* @param handler a handler that will be called after the async remote clear completes
* @return the data loader for fluent coding
*/
public DataLoader<K, V> clear(K key, BiConsumer<Void, Throwable> handler) {
Object cacheKey = getCacheKey(key);
synchronized (this) {
futureCache.delete(cacheKey);
remoteValueStore.delete(key).whenComplete(handler);
}
return this;
}
Expand All @@ -532,8 +553,19 @@ public DataLoader<K, V> clear(K key) {
* @return the data loader for fluent coding
*/
public DataLoader<K, V> clearAll() {
return clearAll((a, b) -> {});
}

/**
* Clears the entire cache map of the loader, and of the remote store.
*
* @param handler a handler that will be called after the async remote clear all completes
* @return the data loader for fluent coding
*/
public DataLoader<K, V> clearAll(BiConsumer<Void, Throwable> handler) {
synchronized (this) {
futureCache.clear();
remoteValueStore.clear().whenComplete(handler);
}
return this;
}
Expand Down
99 changes: 73 additions & 26 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,17 @@ Object getCallContext() {
private final DataLoader<K, V> dataLoader;
private final Object batchLoadFunction;
private final DataLoaderOptions loaderOptions;
private final CacheMap<Object, CompletableFuture<V>> futureCache;
private final CacheMap<Object, V> futureCache;
private final CacheStore<Object, V> cacheStore;
private final List<LoaderQueueEntry<K, CompletableFuture<V>>> loaderQueue;
private final StatisticsCollector stats;

DataLoaderHelper(DataLoader<K, V> dataLoader, Object batchLoadFunction, DataLoaderOptions loaderOptions, CacheMap<Object, CompletableFuture<V>> futureCache, StatisticsCollector stats) {
DataLoaderHelper(DataLoader<K, V> dataLoader, Object batchLoadFunction, DataLoaderOptions loaderOptions, CacheMap<Object, V> futureCache, CacheStore<Object, V> cacheStore, StatisticsCollector stats) {
this.dataLoader = dataLoader;
this.batchLoadFunction = batchLoadFunction;
this.loaderOptions = loaderOptions;
this.futureCache = futureCache;
this.cacheStore = cacheStore;
this.loaderQueue = new ArrayList<>();
this.stats = stats;
}
Expand Down Expand Up @@ -104,35 +106,13 @@ CompletableFuture<V> load(K key, Object loadContext) {
boolean batchingEnabled = loaderOptions.batchingEnabled();
boolean cachingEnabled = loaderOptions.cachingEnabled();

Object cacheKey = null;
if (cachingEnabled) {
if (loadContext == null) {
cacheKey = getCacheKey(key);
} else {
cacheKey = getCacheKeyWithContext(key, loadContext);
}
}
stats.incrementLoadCount();

if (cachingEnabled) {
if (futureCache.containsKey(cacheKey)) {
stats.incrementCacheHitCount();
return futureCache.get(cacheKey);
}
}

CompletableFuture<V> future = new CompletableFuture<>();
if (batchingEnabled) {
loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext));
return loadFromCache(key, loadContext, batchingEnabled);
} else {
stats.incrementBatchLoadCountBy(1);
// immediate execution of batch function
future = invokeLoaderImmediately(key, loadContext);
return queueOrInvokeLoad(key, loadContext, batchingEnabled);
}
if (cachingEnabled) {
futureCache.set(cacheKey, future);
}
return future;
}
}

Expand Down Expand Up @@ -188,6 +168,73 @@ DispatchResult<V> dispatch() {
return new DispatchResult<>(futureList, totalEntriesHandled);
}

private CompletableFuture<V> loadFromCache(K key, Object loadContext, boolean batchingEnabled) {
final Object cacheKey = loadContext == null ? getCacheKey(key) : getCacheKeyWithContext(key, loadContext);

if (futureCache.containsKey(cacheKey)) {
// We already have a promise for this key, no need to check value cache or queue up load
stats.incrementCacheHitCount();
return futureCache.get(cacheKey);
}

/*
We haven't been asked for this key yet. We want to do one of two things:

1. Check if our cache store has it. If so:
a. Get the value from the cache store
b. Add a recovery case so we queue the load if fetching from cache store fails
c. Put that future in our futureCache to hit the early return next time
d. Return the resilient future
2. If not in value cache:
a. queue or invoke the load
b. Add a success handler to store the result in the cache store
c. Return the result
*/
final CompletableFuture<V> future = new CompletableFuture<>();

cacheStore.has(cacheKey).whenComplete((hasKey, e1) -> {
if (hasKey && e1 == null) {
cacheStore.get(cacheKey).whenComplete((cachedValue, e2) -> {
if (e2 == null) {
future.complete(cachedValue);
} else {
queueOrInvokeLoad(key, loadContext, batchingEnabled).whenComplete((result, error) -> {
if (error == null) {
future.complete(result);
} else {
future.completeExceptionally(error);
}
});
}
});
} else {
queueOrInvokeLoad(key, loadContext, batchingEnabled).whenComplete((result, error) -> {
if (error == null) {
cacheStore.set(cacheKey, result).whenComplete((v, e) -> future.complete(result));
} else {
future.completeExceptionally(error);
}
});
}
});

futureCache.set(cacheKey, future);

return future;
}

private CompletableFuture<V> queueOrInvokeLoad(K key, Object loadContext, boolean batchingEnabled) {
if (batchingEnabled) {
CompletableFuture<V> future = new CompletableFuture<>();
loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext));
return future;
} else {
stats.incrementBatchLoadCountBy(1);
// immediate execution of batch function
return invokeLoaderImmediately(key, loadContext);
}
}

private CompletableFuture<List<V>> sliceIntoBatchesOfBatches(List<K> keys, List<CompletableFuture<V>> queuedFutures, List<Object> callContexts, int maxBatchSize) {
// the number of keys is > than what the batch loader function can accept
// so make multiple calls to the loader
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/org/dataloader/DataLoaderOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class DataLoaderOptions {
private boolean cachingExceptionsEnabled;
private CacheKey cacheKeyFunction;
private CacheMap cacheMap;
private CacheStore remoteValueStore;
private int maxBatchSize;
private Supplier<StatisticsCollector> statisticsCollector;
private BatchLoaderContextProvider environmentProvider;
Expand Down Expand Up @@ -67,6 +68,7 @@ public DataLoaderOptions(DataLoaderOptions other) {
this.cachingExceptionsEnabled = other.cachingExceptionsEnabled;
this.cacheKeyFunction = other.cacheKeyFunction;
this.cacheMap = other.cacheMap;
this.remoteValueStore = other.remoteValueStore;
this.maxBatchSize = other.maxBatchSize;
this.statisticsCollector = other.statisticsCollector;
this.environmentProvider = other.environmentProvider;
Expand Down Expand Up @@ -193,6 +195,29 @@ public DataLoaderOptions setCacheMap(CacheMap cacheMap) {
return this;
}

/**
* Gets the (optional) cache store implementation that is used for value storage, if caching is enabled.
* <p>
* If missing, a no-op implementation will be used.
*
* @return an optional with the cache store instance, or empty
*/
public Optional<CacheStore> remoteValueStore() {
return Optional.ofNullable(remoteValueStore);
}

/**
* Sets the value store implementation to use for caching values, if caching is enabled.
*
* @param remoteValueStore the cache store instance
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setRemoteValueStore(CacheStore remoteValueStore) {
this.remoteValueStore = remoteValueStore;
return this;
}

/**
* Gets the maximum number of keys that will be presented to the {@link BatchLoader} function
* before they are split into multiple class
Expand Down
Loading