This adds support for calling dispatch if the ValueCache takes time in its get call #91

Closed
wants to merge 2 commits into from
72 changes: 49 additions & 23 deletions src/main/java/org/dataloader/DataLoaderHelper.java
@@ -134,7 +134,8 @@ CompletableFuture<V> load(K key, Object loadContext) {
if (cachingEnabled) {
return loadFromCache(key, loadContext, batchingEnabled);
} else {
return queueOrInvokeLoader(key, loadContext, batchingEnabled);
CompletableFuture<V> future = new CompletableFuture<>();
return queueOrInvokeLoader(key, loadContext, batchingEnabled, future);
}
}
}
@@ -296,49 +297,74 @@ private CompletableFuture<V> loadFromCache(K key, Object loadContext, boolean ba
We haven't been asked for this key yet. We want to do one of two things:

1. Check if our cache store has it. If so:
a. Get the value from the cache store
b. Add a recovery case so we queue the load if fetching from cache store fails
a. Get the value from the cache store (this can take non-zero time)
b. Add a recovery case, so we queue the load if fetching from cache store fails
c. Put that future in our futureCache to hit the early return next time
d. Return the resilient future
2. If not in value cache:
a. queue or invoke the load
b. Add a success handler to store the result in the cache store
c. Return the result
*/
final CompletableFuture<V> future = new CompletableFuture<>();
final CompletableFuture<V> loadCallFuture = new CompletableFuture<>();

valueCache.get(cacheKey).whenComplete((cachedValue, getCallEx) -> {
if (getCallEx == null) {
future.complete(cachedValue);
CompletableFuture<V> cacheLookupCF = valueCache.get(cacheKey);
boolean cachedLookupCompletedImmediately = cacheLookupCF.isDone();

cacheLookupCF.whenComplete((cachedValue, cacheException) -> {
if (cacheException == null) {
loadCallFuture.complete(cachedValue);
} else {
CompletableFuture<V> loaderCF;
synchronized (dataLoader) {
queueOrInvokeLoader(key, loadContext, batchingEnabled)
.whenComplete(setValueIntoCacheAndCompleteFuture(cacheKey, future));
loaderCF = queueOrInvokeLoader(key, loadContext, batchingEnabled, loadCallFuture);
loaderCF.whenComplete(setValueIntoValueCacheAndCompleteFuture(cacheKey, loadCallFuture));
}
//
// It's possible that the cache lookup step took some time to execute
// (e.g. an async network lookup to Redis), in which case this load call
// has already returned and a dispatch call has already been made. This code
// is then running after that dispatch - so we dispatch again to catch up,
// because the load is likely to hang if we do not. We might dispatch too
// early, but we will not hang because of an async cache lookup.
//
if (!cachedLookupCompletedImmediately && !loaderCF.isDone()) {
if (loaderOptions.getValueCacheOptions().isDispatchOnCacheMiss()) {
dispatch();
}
Member Author

This is the fix - it will dispatch if the value cache miss took time to complete before we enqueued the batch loader call.

The danger here is that it interferes with other batch calls on this loader (premature batch calling), but the alternative is that the load call will hang.

This balances that out.

The other alternative is for people to use ScheduledDataLoaderRegistry to periodically tick and dispatch on a schedule.

}
}
});

futureCache.set(cacheKey, future);

return future;
futureCache.set(cacheKey, loadCallFuture);
return loadCallFuture;
}

private BiConsumer<V, Throwable> setValueIntoCacheAndCompleteFuture(Object cacheKey, CompletableFuture<V> future) {
return (result, loadCallEx) -> {
if (loadCallEx == null) {
valueCache.set(cacheKey, result)
.whenComplete((v, setCallExIgnored) -> future.complete(result));
private BiConsumer<V, Throwable> setValueIntoValueCacheAndCompleteFuture(Object cacheKey, CompletableFuture<V> loadCallFuture) {
return (result, loadCallException) -> {
if (loadCallException == null) {
//
// we have completed our load call, and we should try to cache the value.
// However, we don't wait for the caching to complete before completing the load call;
// this way a network cache call (say a Redis put) does not have to finish in order
// for the calling code to get a value. An option controls this behaviour, and it
// is off by default to make the code faster.
//
CompletableFuture<V> valueCacheSetCF = valueCache.set(cacheKey, result);
if (loaderOptions.getValueCacheOptions().isCompleteValueAfterCacheSet()) {
valueCacheSetCF.whenComplete((v, setCallExceptionIgnored) -> loadCallFuture.complete(result));
} else {
loadCallFuture.complete(result);
}
} else {
future.completeExceptionally(loadCallEx);
loadCallFuture.completeExceptionally(loadCallException);
}
};
}

private CompletableFuture<V> queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled) {
private CompletableFuture<V> queueOrInvokeLoader(K key, Object loadContext, boolean batchingEnabled, CompletableFuture<V> loadCallFuture) {
if (batchingEnabled) {
CompletableFuture<V> future = new CompletableFuture<>();
loaderQueue.add(new LoaderQueueEntry<>(key, future, loadContext));
return future;
loaderQueue.add(new LoaderQueueEntry<>(key, loadCallFuture, loadContext));
return loadCallFuture;
} else {
stats.incrementBatchLoadCountBy(1);
// immediate execution of batch function
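The dispatch-on-miss logic in loadFromCache above can be sketched in isolation with plain `java.util.concurrent` types. This is a simplified stand-in, not the library's code: the class name `AsyncCacheMissSketch`, the `String`-only types and the `dispatchNow` method are hypothetical, and the real code also consults `ValueCacheOptions#isDispatchOnCacheMiss` before dispatching.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class AsyncCacheMissSketch {
    // stands in for the data loader's queue of pending batch-load entries
    private final List<CompletableFuture<String>> queue = new ArrayList<>();

    CompletableFuture<String> load(String key, CompletableFuture<String> cacheLookupCF) {
        CompletableFuture<String> loadCallFuture = new CompletableFuture<>();
        // remember whether the value cache answered synchronously
        boolean completedImmediately = cacheLookupCF.isDone();
        cacheLookupCF.whenComplete((cachedValue, cacheException) -> {
            if (cacheException == null) {
                loadCallFuture.complete(cachedValue); // cache hit
            } else {
                synchronized (this) {
                    queue.add(loadCallFuture); // cache miss - queue the batch load
                }
                // If the lookup was async, the caller's dispatch() may already have
                // fired while we were waiting, so dispatch again to avoid hanging.
                if (!completedImmediately && !loadCallFuture.isDone()) {
                    dispatchNow();
                }
            }
        });
        return loadCallFuture;
    }

    private synchronized void dispatchNow() {
        // stands in for invoking the batch loader function with the queued keys
        for (CompletableFuture<String> pending : queue) {
            pending.complete("batch-loaded");
        }
        queue.clear();
    }
}
```

The key observation is captured by `completedImmediately`: if the value cache answered synchronously, the caller's own dispatch cannot have raced past this load call, so no catch-up dispatch is needed.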
35 changes: 29 additions & 6 deletions src/main/java/org/dataloader/DataLoaderOptions.java
@@ -17,6 +17,7 @@
package org.dataloader;

import org.dataloader.annotations.PublicApi;
import org.dataloader.impl.Assertions;
import org.dataloader.stats.SimpleStatisticsCollector;
import org.dataloader.stats.StatisticsCollector;

@@ -39,11 +40,12 @@ public class DataLoaderOptions {
private boolean cachingEnabled;
private boolean cachingExceptionsEnabled;
private CacheKey<?> cacheKeyFunction;
private CacheMap<?,?> cacheMap;
private ValueCache<?,?> valueCache;
private CacheMap<?, ?> cacheMap;
private ValueCache<?, ?> valueCache;
private int maxBatchSize;
private Supplier<StatisticsCollector> statisticsCollector;
private BatchLoaderContextProvider environmentProvider;
private ValueCacheOptions valueCacheOptions;

/**
* Creates a new data loader options with default settings.
@@ -55,6 +57,7 @@ public DataLoaderOptions() {
maxBatchSize = -1;
statisticsCollector = SimpleStatisticsCollector::new;
environmentProvider = NULL_PROVIDER;
valueCacheOptions = ValueCacheOptions.newOptions();
}

/**
@@ -72,6 +75,7 @@ public DataLoaderOptions(DataLoaderOptions other) {
this.maxBatchSize = other.maxBatchSize;
this.statisticsCollector = other.statisticsCollector;
this.environmentProvider = other.environmentProvider;
this.valueCacheOptions = other.valueCacheOptions;
}

/**
@@ -179,7 +183,7 @@ public DataLoaderOptions setCacheKeyFunction(CacheKey<?> cacheKeyFunction) {
*
* @return an optional with the cache map instance, or empty
*/
public Optional<CacheMap<?,?>> cacheMap() {
public Optional<CacheMap<?, ?>> cacheMap() {
return Optional.ofNullable(cacheMap);
}

@@ -190,7 +194,7 @@ public Optional<CacheMap<?,?>> cacheMap() {
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setCacheMap(CacheMap<?,?> cacheMap) {
public DataLoaderOptions setCacheMap(CacheMap<?, ?> cacheMap) {
this.cacheMap = cacheMap;
return this;
}
@@ -265,7 +269,7 @@ public DataLoaderOptions setBatchLoaderContextProvider(BatchLoaderContextProvide
*
* @return an optional with the cache store instance, or empty
*/
public Optional<ValueCache<?,?>> valueCache() {
public Optional<ValueCache<?, ?>> valueCache() {
return Optional.ofNullable(valueCache);
}

@@ -276,8 +280,27 @@ public Optional<ValueCache<?,?>> valueCache() {
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setValueCache(ValueCache<?,?> valueCache) {
public DataLoaderOptions setValueCache(ValueCache<?, ?> valueCache) {
this.valueCache = valueCache;
return this;
}

/**
* @return the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
*/
public ValueCacheOptions getValueCacheOptions() {
return valueCacheOptions;
}

/**
* Sets the {@link ValueCacheOptions} that control how the {@link ValueCache} will be used
*
* @param valueCacheOptions the value cache options
*
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setValueCacheOptions(ValueCacheOptions valueCacheOptions) {
this.valueCacheOptions = Assertions.nonNull(valueCacheOptions);
return this;
}
}
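The wiring above follows a simple pattern: default the valueCacheOptions in the constructor, carry the reference across in the copy constructor, and null-check in the fluent setter. Below is a minimal self-contained mirror of that pattern; `LoaderOptionsSketch` and `CacheOpts` are illustrative names, and `Objects.requireNonNull` stands in for the library's `Assertions.nonNull`.

```java
import java.util.Objects;

class LoaderOptionsSketch {
    // stand-in for ValueCacheOptions
    static final class CacheOpts { }

    private CacheOpts valueCacheOptions;

    LoaderOptionsSketch() {
        // defaulted eagerly, so the getter never returns null
        this.valueCacheOptions = new CacheOpts();
    }

    LoaderOptionsSketch(LoaderOptionsSketch other) {
        // the options object is shared by the copy, not deep-copied
        this.valueCacheOptions = other.valueCacheOptions;
    }

    CacheOpts getValueCacheOptions() {
        return valueCacheOptions;
    }

    LoaderOptionsSketch setValueCacheOptions(CacheOpts options) {
        this.valueCacheOptions = Objects.requireNonNull(options);
        return this; // fluent, like the real setters
    }
}
```

Sharing (rather than deep-copying) the options object in the copy constructor is safe here because the options type is immutable.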
37 changes: 26 additions & 11 deletions src/main/java/org/dataloader/ValueCache.java
@@ -7,33 +7,36 @@

/**
* The {@link ValueCache} is used by data loaders that use caching and want a long-lived or external cache
* of values. The {@link ValueCache} is used as a place to cache values when they come back from
* of values. The {@link ValueCache} is used as a place to cache values when they come back from an async
* cache store.
* <p>
* It differs from {@link CacheMap} which is in fact a cache of promises to values aka {@link CompletableFuture}&lt;V&gt; and it rather suited
* to be a wrapper of a long lived or external value cache. {@link CompletableFuture}s cant be easily placed in an external cache
* outside the JVM say, hence the need for the {@link ValueCache}.
* It differs from {@link CacheMap} which is in fact a cache of promised values aka {@link CompletableFuture}&lt;V&gt;'s.
* <p>
* {@link ValueCache} is better suited to being a wrapper of long-lived or externally cached values. {@link CompletableFuture}s can't
* be easily placed in an external cache outside the JVM, say, hence the need for the {@link ValueCache}.
* <p>
* {@link DataLoader}s use a two-stage cache strategy if caching is enabled. If the {@link CacheMap} already has the promise to a value,
* that is used. If not, the {@link ValueCache} is asked for a value; if it has one, then that is returned (and cached as a promise in the {@link CacheMap}).
* <p>
* If there is no value then the key is queued and loaded via the {@link BatchLoader} calls. The returned values will then be stored in
* the {@link ValueCache} and the promises to those values are also stored in the {@link CacheMap}.
* <p>
* The default implementation is a no-op store which replies with the key always missing and doesn't
* store any actual results. This is to avoid duplicating, out of the box, the data already stored
* in the {@link CacheMap}.
* <p>
* The API signature uses completable futures because the backing implementation MAY be a remote external cache
* and hence exceptions may happen in retrieving values.
* The API signature uses {@link CompletableFuture}s because the backing implementation MAY be a remote external cache,
* and hence exceptions may happen when retrieving values and the calls may take time to complete.
*
* @param <K> the type of cache keys
* @param <V> the type of cache values
*
* @author <a href="https://github.com/craig-day">Craig Day</a>
* @author <a href="https://github.com/bbakerman/">Brad Baker</a>
*/
@PublicSpi
public interface ValueCache<K, V> {


/**
* Creates a new value cache, using the default no-op implementation.
*
@@ -48,9 +51,12 @@ static <K, V> ValueCache<K, V> defaultValueCache() {
}

/**
* Gets the specified key from the store. if the key si not present, then the implementation MUST return an exceptionally completed future
* and not null because null is a valid cacheable value. Any exception is will cause {@link DataLoader} to load the key via batch loading
* Gets the specified key from the value cache. If the key is not present, then the implementation MUST return an exceptionally completed future
* and not null because null is a valid cacheable value. An exceptionally completed future will cause {@link DataLoader} to load the key via batch loading
* instead.
* <p>
* NOTE: Your implementation MUST NOT throw exceptions; rather, it should return a CompletableFuture that has completed exceptionally. Failure
* to do this may cause the {@link DataLoader} code to not run properly.
*
* @param key the key to retrieve
*
@@ -61,6 +67,9 @@

/**
* Stores the value with the specified key, or updates it if the key already exists.
* <p>
* NOTE: Your implementation MUST NOT throw exceptions; rather, it should return a CompletableFuture that has completed exceptionally. Failure
* to do this may cause the {@link DataLoader} code to not run properly.
*
* @param key the key to store
* @param value the value to store
@@ -70,7 +79,10 @@ static <K, V> ValueCache<K, V> defaultValueCache() {
CompletableFuture<V> set(K key, V value);

/**
* Deletes the entry with the specified key from the store, if it exists.
* Deletes the entry with the specified key from the value cache, if it exists.
* <p>
* NOTE: Your implementation MUST NOT throw exceptions; rather, it should return a CompletableFuture that has completed exceptionally. Failure
* to do this may cause the {@link DataLoader} code to not run properly.
*
* @param key the key to delete
*
@@ -79,7 +91,10 @@ static <K, V> ValueCache<K, V> defaultValueCache() {
CompletableFuture<Void> delete(K key);

/**
* Clears all entries from the store.
* Clears all entries from the value cache.
* <p>
* NOTE: Your implementation MUST NOT throw exceptions; rather, it should return a CompletableFuture that has completed exceptionally. Failure
* to do this may cause the {@link DataLoader} code to not run properly.
*
* @return a void future for error handling and fluent composition
*/
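The contract spelled out above - a miss is an exceptionally completed future, never null and never a thrown exception, because null itself is a cacheable value - can be met by a small in-memory implementation. The sketch below mirrors the interface's shape using only the JDK; `MapValueCacheSketch` is an illustrative name, and it deliberately omits `implements ValueCache<K, V>` so it stays self-contained.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class MapValueCacheSketch<K, V> {
    // Optional distinguishes a cached null value from a missing key
    private final Map<K, Optional<V>> store = new ConcurrentHashMap<>();

    CompletableFuture<V> get(K key) {
        Optional<V> cached = store.get(key);
        if (cached == null) {
            // A miss MUST be an exceptionally completed future - not null and
            // not a thrown exception - because null is a valid cacheable value.
            return failed(new RuntimeException("key not present: " + key));
        }
        return CompletableFuture.completedFuture(cached.orElse(null));
    }

    CompletableFuture<V> set(K key, V value) {
        store.put(key, Optional.ofNullable(value));
        return CompletableFuture.completedFuture(value);
    }

    CompletableFuture<Void> delete(K key) {
        store.remove(key);
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> clear() {
        store.clear();
        return CompletableFuture.completedFuture(null);
    }

    private static <T> CompletableFuture<T> failed(Throwable t) {
        CompletableFuture<T> cf = new CompletableFuture<>();
        cf.completeExceptionally(t);
        return cf;
    }
}
```

A real remote-backed implementation would do the same, but would also wrap any client exception (say, a connection failure) into the returned future rather than letting it propagate.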
62 changes: 62 additions & 0 deletions src/main/java/org/dataloader/ValueCacheOptions.java
@@ -0,0 +1,62 @@
package org.dataloader;

/**
* Options that control how the {@link ValueCache} is used by {@link DataLoader}
*
* @author <a href="https://github.com/bbakerman/">Brad Baker</a>
*/
public class ValueCacheOptions {
private final boolean dispatchOnCacheMiss;
private final boolean completeValueAfterCacheSet;

private ValueCacheOptions() {
this.dispatchOnCacheMiss = true;
this.completeValueAfterCacheSet = false;
}

private ValueCacheOptions(boolean dispatchOnCacheMiss, boolean completeValueAfterCacheSet) {
this.dispatchOnCacheMiss = dispatchOnCacheMiss;
this.completeValueAfterCacheSet = completeValueAfterCacheSet;
}

public static ValueCacheOptions newOptions() {
return new ValueCacheOptions();
}

/**
* This controls whether the {@link DataLoader} will call {@link DataLoader#dispatch()} if a
* {@link ValueCache#get(Object)} call misses. In an async world the get call could take non-zero time
* to complete, and hence previous dispatch calls may have already completed.
*
* This is true by default.
*
* @return true if a {@link DataLoader#dispatch()} call will be made on an async {@link ValueCache} miss
*/
public boolean isDispatchOnCacheMiss() {
return dispatchOnCacheMiss;
}

/**
* This controls whether the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call
* to complete before it completes the returned value. By default this is false and hence
* the {@link ValueCache#set(Object, Object)} call may complete some time AFTER the data loader
* value has been returned.
*
* This is false by default, for performance reasons.
*
* @return true if the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call to complete before
* it completes the returned value.
*/
public boolean isCompleteValueAfterCacheSet() {
return completeValueAfterCacheSet;
}

public ValueCacheOptions setDispatchOnCacheMiss(boolean flag) {
return new ValueCacheOptions(flag, this.completeValueAfterCacheSet);
}

public ValueCacheOptions setCompleteValueAfterCacheSet(boolean flag) {
return new ValueCacheOptions(this.dispatchOnCacheMiss, flag);
}

}
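Note that setDispatchOnCacheMiss and setCompleteValueAfterCacheSet above return a new instance rather than mutating this, so a shared ValueCacheOptions can never be changed underneath another caller. A self-contained mirror of that copy-on-write pattern (`OptionsSketch` is an illustrative name):

```java
final class OptionsSketch {
    private final boolean dispatchOnCacheMiss;
    private final boolean completeValueAfterCacheSet;

    private OptionsSketch(boolean dispatchOnCacheMiss, boolean completeValueAfterCacheSet) {
        this.dispatchOnCacheMiss = dispatchOnCacheMiss;
        this.completeValueAfterCacheSet = completeValueAfterCacheSet;
    }

    static OptionsSketch newOptions() {
        // defaults match ValueCacheOptions: dispatch on miss, don't wait on cache set
        return new OptionsSketch(true, false);
    }

    boolean isDispatchOnCacheMiss() {
        return dispatchOnCacheMiss;
    }

    boolean isCompleteValueAfterCacheSet() {
        return completeValueAfterCacheSet;
    }

    // each "setter" leaves the receiver untouched and returns a changed copy
    OptionsSketch setDispatchOnCacheMiss(boolean flag) {
        return new OptionsSketch(flag, completeValueAfterCacheSet);
    }

    OptionsSketch setCompleteValueAfterCacheSet(boolean flag) {
        return new OptionsSketch(dispatchOnCacheMiss, flag);
    }
}
```

Because every field is final, instances are immutable and safe to share across threads without synchronization.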