Skip to content

cache values and errors, not futures #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions src/main/java/org/dataloader/CacheMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,12 @@

import org.dataloader.impl.DefaultCacheMap;

import java.util.concurrent.CompletableFuture;

/**
* Cache map interface for data loaders that use caching.
* <p>
* The default implementation used by the data loader is based on a {@link java.util.LinkedHashMap}. Note that the
* The default implementation used by the data loader is based on a {@link java.util.concurrent.ConcurrentHashMap}. Note that the
* implementation could also have used a regular {@link java.util.Map} instead of this {@link CacheMap}, but
* this aligns better to the reference data loader implementation provided by Facebook
* this aligns better to the reference data loader implementation provided by Facebook.
* <p>
* Also it doesn't require you to implement the full set of map overloads, just the required methods.
*
Expand All @@ -39,19 +37,19 @@
public interface CacheMap<U, V> {

/**
* Creates a new cache map, using the default implementation that is based on a {@link java.util.LinkedHashMap}.
* Creates a new cache map, using the default implementation that is based on a {@link java.util.concurrent.ConcurrentHashMap}.
*
* @param <U> type parameter indicating the type of the cache keys
* @param <V> type parameter indicating the type of the data that is cached
*
* @return the cache map
*/
static <U, V> CacheMap<U, CompletableFuture<V>> simpleMap() {
static <U, V> CacheMap<U, V> simpleMap() {
return new DefaultCacheMap<>();
}

/**
* Checks whether the specified key is contained in the cach map.
* Checks whether the specified key is contained in the cache map.
*
* @param key the key to check
*
Expand All @@ -62,14 +60,16 @@ static <U, V> CacheMap<U, CompletableFuture<V>> simpleMap() {
/**
* Gets the specified key from the cache map.
* <p>
* The result is wrapped in a {@link Try} to support caching both values and errors.
* <p>
* May throw an exception if the key does not exists, depending on the cache map implementation that is used,
* so be sure to check {@link CacheMap#containsKey(Object)} first.
*
* @param key the key to retrieve
*
* @return the cached value, or {@code null} if not found (depends on cache implementation)
* @return the cached value, error, or {@code null} if not found (depends on cache implementation)
*/
V get(U key);
Try<V> get(U key);

/**
* Creates a new cache map entry with the specified key and value, or updates the value if the key already exists.
Expand All @@ -81,6 +81,16 @@ static <U, V> CacheMap<U, CompletableFuture<V>> simpleMap() {
*/
CacheMap<U, V> set(U key, V value);

/**
* Creates a new cache entry with the specified key and error, or updates the error if the key already exists.
*
* @param key the key to cache
* @param error the error to cache
*
* @return the cache map for fluent coding
*/
CacheMap<U, V> set(U key, Throwable error);

/**
* Deletes the entry with the specified key from the cache map, if it exists.
*
Expand Down
26 changes: 14 additions & 12 deletions src/main/java/org/dataloader/DataLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
public class DataLoader<K, V> {

private final DataLoaderHelper<K, V> helper;
private final CacheMap<Object, CompletableFuture<V>> futureCache;
private final CacheMap<Object, V> valueCache;
private final StatisticsCollector stats;

/**
Expand Down Expand Up @@ -332,16 +332,16 @@ public DataLoader(BatchLoader<K, V> batchLoadFunction, DataLoaderOptions options

private DataLoader(Object batchLoadFunction, DataLoaderOptions options) {
DataLoaderOptions loaderOptions = options == null ? new DataLoaderOptions() : options;
this.futureCache = determineCacheMap(loaderOptions);
this.valueCache = determineCacheMap(loaderOptions);
// order of keys matter in data loader
this.stats = nonNull(loaderOptions.getStatisticsCollector());

this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.futureCache, this.stats);
this.helper = new DataLoaderHelper<>(this, batchLoadFunction, loaderOptions, this.valueCache, this.stats);
}

@SuppressWarnings("unchecked")
private CacheMap<Object, CompletableFuture<V>> determineCacheMap(DataLoaderOptions loaderOptions) {
return loaderOptions.cacheMap().isPresent() ? (CacheMap<Object, CompletableFuture<V>>) loaderOptions.cacheMap().get() : CacheMap.simpleMap();
private CacheMap<Object, V> determineCacheMap(DataLoaderOptions loaderOptions) {
return loaderOptions.cacheMap().isPresent() ? (CacheMap<Object, V>) loaderOptions.cacheMap().get() : CacheMap.simpleMap();
}

/**
Expand Down Expand Up @@ -521,7 +521,7 @@ public int dispatchDepth() {
public DataLoader<K, V> clear(K key) {
Object cacheKey = getCacheKey(key);
synchronized (this) {
futureCache.delete(cacheKey);
valueCache.delete(cacheKey);
}
return this;
}
Expand All @@ -533,7 +533,7 @@ public DataLoader<K, V> clear(K key) {
*/
public DataLoader<K, V> clearAll() {
synchronized (this) {
futureCache.clear();
valueCache.clear();
}
return this;
}
Expand All @@ -548,8 +548,8 @@ public DataLoader<K, V> clearAll() {
public DataLoader<K, V> prime(K key, V value) {
Object cacheKey = getCacheKey(key);
synchronized (this) {
if (!futureCache.containsKey(cacheKey)) {
futureCache.set(cacheKey, CompletableFuture.completedFuture(value));
if (!valueCache.containsKey(cacheKey)) {
valueCache.set(cacheKey, value);
}
}
return this;
Expand All @@ -562,10 +562,12 @@ public DataLoader<K, V> prime(K key, V value) {
* @param error the exception to prime instead of a value
* @return the data loader for fluent coding
*/
public DataLoader<K, V> prime(K key, Exception error) {
public DataLoader<K, V> prime(K key, Throwable error) {
Object cacheKey = getCacheKey(key);
if (!futureCache.containsKey(cacheKey)) {
futureCache.set(cacheKey, CompletableFutureKit.failedFuture(error));
synchronized (this) {
if (!valueCache.containsKey(cacheKey)) {
valueCache.set(cacheKey, error);
}
Comment on lines +568 to +570
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more I look at this, the more I'm not sure if it's right. Initially, I changed the CacheMap#get function to return a Try to allow priming the cache with an error, but I'm wondering if that is right. Maybe we just keep an independent cache of errors to support priming failures, but not let the CacheMap worry about that. That makes a lot more sense to me than caching errors, which are often times much more temporary than a cache would be.

}
return this;
}
Expand Down
29 changes: 18 additions & 11 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package org.dataloader;

import java.util.concurrent.atomic.AtomicReference;
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.stats.StatisticsCollector;

Expand Down Expand Up @@ -58,15 +59,15 @@ Object getCallContext() {
private final DataLoader<K, V> dataLoader;
private final Object batchLoadFunction;
private final DataLoaderOptions loaderOptions;
private final CacheMap<Object, CompletableFuture<V>> futureCache;
private final CacheMap<Object, V> valueCache;
private final List<LoaderQueueEntry<K, CompletableFuture<V>>> loaderQueue;
private final StatisticsCollector stats;

DataLoaderHelper(DataLoader<K, V> dataLoader, Object batchLoadFunction, DataLoaderOptions loaderOptions, CacheMap<Object, CompletableFuture<V>> futureCache, StatisticsCollector stats) {
DataLoaderHelper(DataLoader<K, V> dataLoader, Object batchLoadFunction, DataLoaderOptions loaderOptions, CacheMap<Object, V> valueCache, StatisticsCollector stats) {
this.dataLoader = dataLoader;
this.batchLoadFunction = batchLoadFunction;
this.loaderOptions = loaderOptions;
this.futureCache = futureCache;
this.valueCache = valueCache;
this.loaderQueue = new ArrayList<>();
this.stats = stats;
}
Expand All @@ -76,9 +77,9 @@ Optional<CompletableFuture<V>> getIfPresent(K key) {
boolean cachingEnabled = loaderOptions.cachingEnabled();
if (cachingEnabled) {
Object cacheKey = getCacheKey(nonNull(key));
if (futureCache.containsKey(cacheKey)) {
if (valueCache.containsKey(cacheKey)) {
stats.incrementCacheHitCount();
return Optional.of(futureCache.get(cacheKey));
return Optional.of(CompletableFutureKit.fromTry(valueCache.get(key)));
}
}
}
Expand All @@ -104,20 +105,20 @@ CompletableFuture<V> load(K key, Object loadContext) {
boolean batchingEnabled = loaderOptions.batchingEnabled();
boolean cachingEnabled = loaderOptions.cachingEnabled();

Object cacheKey = null;
final AtomicReference<Object> cacheKey = new AtomicReference<>(null);
if (cachingEnabled) {
if (loadContext == null) {
cacheKey = getCacheKey(key);
cacheKey.set(getCacheKey(key));
} else {
cacheKey = getCacheKeyWithContext(key, loadContext);
cacheKey.set(getCacheKeyWithContext(key, loadContext));
}
}
stats.incrementLoadCount();

if (cachingEnabled) {
if (futureCache.containsKey(cacheKey)) {
if (valueCache.containsKey(cacheKey.get())) {
stats.incrementCacheHitCount();
return futureCache.get(cacheKey);
return CompletableFutureKit.fromTry(valueCache.get(cacheKey.get()));
}
}

Expand All @@ -130,7 +131,13 @@ CompletableFuture<V> load(K key, Object loadContext) {
future = invokeLoaderImmediately(key, loadContext);
}
if (cachingEnabled) {
futureCache.set(cacheKey, future);
future.whenComplete((value, error) -> {
if (error != null) {
valueCache.set(cacheKey.get(), error);
} else {
valueCache.set(cacheKey.get(), value);
}
});
}
return future;
}
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/org/dataloader/impl/CompletableFutureKit.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.dataloader.Try;

import static java.util.stream.Collectors.toList;

Expand All @@ -14,7 +15,7 @@
@Internal
public class CompletableFutureKit {

public static <V> CompletableFuture<V> failedFuture(Exception e) {
public static <V> CompletableFuture<V> failedFuture(Throwable e) {
CompletableFuture<V> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
Expand Down Expand Up @@ -54,4 +55,12 @@ public static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> cf
.collect(toList())
);
}

public static <T> CompletableFuture<T> fromTry(Try<T> tryResult) {
if (tryResult.isFailure()) {
return failedFuture(tryResult.getThrowable());
} else {
return CompletableFuture.completedFuture(tryResult.get());
}
}
}
38 changes: 30 additions & 8 deletions src/main/java/org/dataloader/impl/DefaultCacheMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@

package org.dataloader.impl;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.dataloader.CacheMap;
import org.dataloader.Internal;

import java.util.HashMap;
import java.util.Map;
import org.dataloader.Try;

/**
* Default implementation of {@link CacheMap} that is based on a regular {@link java.util.LinkedHashMap}.
* Default implementation of {@link CacheMap} that is based on a {@link ConcurrentHashMap}.
*
* @param <U> type parameter indicating the type of the cache keys
* @param <V> type parameter indicating the type of the data that is cached
Expand All @@ -34,28 +34,37 @@
public class DefaultCacheMap<U, V> implements CacheMap<U, V> {

private final Map<U, V> cache;
private final Map<U, Throwable> errorCache;

/**
* Default constructor
*/
public DefaultCacheMap() {
cache = new HashMap<>();
cache = new ConcurrentHashMap<>();
errorCache = new ConcurrentHashMap<>();
}

/**
* {@inheritDoc}
*/
@Override
public boolean containsKey(U key) {
return cache.containsKey(key);
return cache.containsKey(key) || errorCache.containsKey(key);
}

/**
* {@inheritDoc}
*/
@Override
public V get(U key) {
return cache.get(key);
public Try<V> get(U key) {
final Throwable error = errorCache.get(key);
final V value = cache.get(key);

if (error != null) {
return Try.failed(error);
} else {
return Try.succeeded(value);
}
}

/**
Expand All @@ -64,6 +73,17 @@ public V get(U key) {
@Override
public CacheMap<U, V> set(U key, V value) {
cache.put(key, value);
errorCache.remove(key);
return this;
}

/**
* {@inheritDoc}
*/
@Override
public CacheMap<U, V> set(U key, Throwable error) {
cache.remove(key);
errorCache.put(key, error);
return this;
}

Expand All @@ -73,6 +93,7 @@ public CacheMap<U, V> set(U key, V value) {
@Override
public CacheMap<U, V> delete(U key) {
cache.remove(key);
errorCache.remove(key);
return this;
}

Expand All @@ -82,6 +103,7 @@ public CacheMap<U, V> delete(U key) {
@Override
public CacheMap<U, V> clear() {
cache.clear();
errorCache.clear();
return this;
}
}
7 changes: 6 additions & 1 deletion src/test/java/ReadmeExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public boolean containsKey(Object key) {
}

@Override
public Object get(Object key) {
public Try get(Object key) {
return null;
}

Expand All @@ -222,6 +222,11 @@ public CacheMap set(Object key, Object value) {
return null;
}

@Override
public CacheMap set(Object key, Throwable error) {
return null;
}

@Override
public CacheMap delete(Object key) {
return null;
Expand Down
10 changes: 8 additions & 2 deletions src/test/java/org/dataloader/CustomCacheMap.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ public boolean containsKey(String key) {
}

@Override
public Object get(String key) {
return stash.get(key);
public Try<Object> get(String key) {
return Try.succeeded(stash.get(key));
}

@Override
Expand All @@ -27,6 +27,12 @@ public CacheMap<String, Object> set(String key, Object value) {
return this;
}

@Override
public CacheMap<String, Object> set(String key, Throwable error) {
// Don't cache errors in this implementation
return this;
}

@Override
public CacheMap<String, Object> delete(String key) {
stash.remove(key);
Expand Down