Skip to content

Commit a74031a

Browse files
committed
This attacks the problem os async ValueCache lookups in the batch load function and not the load method
1 parent ae0a29e commit a74031a

File tree

11 files changed

+401
-172
lines changed

11 files changed

+401
-172
lines changed

src/main/java/org/dataloader/DataLoader.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public class DataLoader<K, V> {
6767
private final DataLoaderHelper<K, V> helper;
6868
private final StatisticsCollector stats;
6969
private final CacheMap<Object, V> futureCache;
70-
private final ValueCache<Object, V> valueCache;
70+
private final ValueCache<K, V> valueCache;
7171

7272
/**
7373
* Creates new DataLoader with the specified batch loader function and default options
@@ -430,8 +430,8 @@ private CacheMap<Object, V> determineFutureCache(DataLoaderOptions loaderOptions
430430
}
431431

432432
@SuppressWarnings("unchecked")
433-
private ValueCache<Object, V> determineValueCache(DataLoaderOptions loaderOptions) {
434-
return (ValueCache<Object, V>) loaderOptions.valueCache().orElseGet(ValueCache::defaultValueCache);
433+
private ValueCache<K, V> determineValueCache(DataLoaderOptions loaderOptions) {
434+
return (ValueCache<K, V>) loaderOptions.valueCache().orElseGet(ValueCache::defaultValueCache);
435435
}
436436

437437
/**

src/main/java/org/dataloader/DataLoaderHelper.java

Lines changed: 112 additions & 92 deletions
Large diffs are not rendered by default.

src/main/java/org/dataloader/Try.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import java.util.Optional;
66
import java.util.concurrent.Callable;
7+
import java.util.concurrent.CompletableFuture;
78
import java.util.concurrent.CompletionStage;
89
import java.util.function.Consumer;
910
import java.util.function.Function;
@@ -26,13 +27,22 @@
2627
*/
2728
@PublicApi
2829
public class Try<V> {
29-
private static Throwable NIL = new Throwable() {
30+
private final static Object NIL = new Object() {
31+
};
32+
33+
private final static Throwable NIL_THROWABLE = new RuntimeException() {
34+
@Override
35+
public String getMessage() {
36+
return "failure";
37+
}
38+
3039
@Override
3140
public synchronized Throwable fillInStackTrace() {
3241
return this;
3342
}
3443
};
3544

45+
3646
private final Throwable throwable;
3747
private final V value;
3848

@@ -48,6 +58,12 @@ private Try(V value) {
4858
this.throwable = null;
4959
}
5060

61+
62+
@Override
63+
public String toString() {
64+
return isSuccess() ? "success" : "failure";
65+
}
66+
5167
/**
5268
* Creates a Try that has succeeded with the provided value
5369
*
@@ -72,6 +88,18 @@ public static <V> Try<V> failed(Throwable throwable) {
7288
return new Try<>(throwable);
7389
}
7490

91+
/**
92+
* This returns a Try that has always failed with an consistent exception. Use this when
93+
* yiu dont care about the exception but only that the Try failed.
94+
*
95+
* @param <V> the type of value
96+
*
97+
* @return a Try that has failed
98+
*/
99+
public static <V> Try<V> alwaysFailed() {
100+
return Try.failed(NIL_THROWABLE);
101+
}
102+
75103
/**
76104
* Calls the callable and if it returns a value, the Try is successful with that value or if throws
77105
* and exception the Try captures that
@@ -96,7 +124,7 @@ public static <V> Try<V> tryCall(Callable<V> callable) {
96124
* @param completionStage the completion stage that will complete
97125
* @param <V> the value type
98126
*
99-
* @return a Try which is the result of the call
127+
* @return a CompletionStage Try which is the result of the call
100128
*/
101129
public static <V> CompletionStage<Try<V>> tryStage(CompletionStage<V> completionStage) {
102130
return completionStage.handle((value, throwable) -> {
@@ -107,6 +135,19 @@ public static <V> CompletionStage<Try<V>> tryStage(CompletionStage<V> completion
107135
});
108136
}
109137

138+
/**
139+
* Creates a CompletableFuture that, when it completes, will capture into a Try whether the given completionStage
140+
* was successful or not
141+
*
142+
* @param completionStage the completion stage that will complete
143+
* @param <V> the value type
144+
*
145+
* @return a CompletableFuture Try which is the result of the call
146+
*/
147+
public static <V> CompletableFuture<Try<V>> tryFuture(CompletionStage<V> completionStage) {
148+
return tryStage(completionStage).toCompletableFuture();
149+
}
150+
110151
/**
111152
* @return the successful value of this try
112153
*

src/main/java/org/dataloader/ValueCache.java

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package org.dataloader;
22

33
import org.dataloader.annotations.PublicSpi;
4+
import org.dataloader.impl.CompletableFutureKit;
45
import org.dataloader.impl.NoOpValueCache;
56

7+
import java.util.ArrayList;
8+
import java.util.List;
69
import java.util.concurrent.CompletableFuture;
710

811
/**
@@ -55,8 +58,6 @@ static <K, V> ValueCache<K, V> defaultValueCache() {
5558
* and not null because null is a valid cacheable value. An exceptionally completed future will cause {@link DataLoader} to load the key via batch loading
5659
* instead.
5760
* <p>
58-
* NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure
59-
* to do this may cause the {@link DataLoader} code to not run properly.
6061
*
6162
* @param key the key to retrieve
6263
*
@@ -66,10 +67,41 @@ static <K, V> ValueCache<K, V> defaultValueCache() {
6667
CompletableFuture<V> get(K key);
6768

6869
/**
69-
* Stores the value with the specified key, or updates it if the key already exists.
70+
* Gets the specified key from the value cache. If the key is not present, then the returned {@link Try} will be a failed one
71+
* other wise it has the cached value. This is preferred over the {@link #get(Object)} method.
7072
* <p>
71-
* NOTE: Your implementation MUST not throw exceptions, rather it should return a CompletableFuture that has completed exceptionally. Failure
72-
* to do this may cause the {@link DataLoader} code to not run properly.
73+
*
74+
* @param key the key to retrieve
75+
*
76+
* @return a future containing the {@link Try} cached value (which maybe null) or a failed {@link Try} if the key does
77+
* not exist in the cache.
78+
*/
79+
default CompletableFuture<Try<V>> getValue(K key) {
80+
return Try.tryFuture(get(key));
81+
}
82+
83+
/**
84+
* Gets the specified keys from the value cache, in a batch call. If your underlying cache cant do batch caching retrieval
85+
* then do not implement this method and it will delegate back to {@link #getValue(Object)} for you
86+
* <p>
87+
* You MUST return a List that is the same size as the keys passed in. The code will assert if you do not.
88+
*
89+
* @param keys the list of keys to get cached values for.
90+
*
91+
* @return a future containing a list of {@link Try} cached values (which maybe {@link Try#succeeded(Object)} or a failed {@link Try}
92+
* per key if they do not exist in the cache.
93+
*/
94+
default CompletableFuture<List<Try<V>>> getValues(List<K> keys) {
95+
List<CompletableFuture<Try<V>>> cacheLookups = new ArrayList<>();
96+
for (K key : keys) {
97+
CompletableFuture<Try<V>> cacheTry = getValue(key);
98+
cacheLookups.add(cacheTry);
99+
}
100+
return CompletableFutureKit.allOf(cacheLookups);
101+
}
102+
103+
/**
104+
* Stores the value with the specified key, or updates it if the key already exists.
73105
*
74106
* @param key the key to store
75107
* @param value the value to store
@@ -78,6 +110,27 @@ static <K, V> ValueCache<K, V> defaultValueCache() {
78110
*/
79111
CompletableFuture<V> set(K key, V value);
80112

113+
/**
114+
* Stores the value with the specified keys, or updates it if the keys if they already exist. If your underlying cache cant do batch caching setting
115+
* then do not implement this method and it will delegate back to {@link #set(Object, Object)} for you
116+
*
117+
* @param keys the keys to store
118+
* @param values the values to store
119+
*
120+
* @return a future containing the stored values for fluent composition
121+
*/
122+
default CompletableFuture<List<V>> setValues(List<K> keys, List<V> values) {
123+
List<CompletableFuture<V>> cacheSets = new ArrayList<>();
124+
for (int i = 0; i < keys.size(); i++) {
125+
K k = keys.get(i);
126+
V v = values.get(i);
127+
CompletableFuture<V> setCall = set(k, v);
128+
CompletableFuture<V> set = Try.tryFuture(setCall).thenApply(ignored -> v);
129+
cacheSets.add(set);
130+
}
131+
return CompletableFutureKit.allOf(cacheSets);
132+
}
133+
81134
/**
82135
* Deletes the entry with the specified key from the value cache, if it exists.
83136
* <p>

src/main/java/org/dataloader/ValueCacheOptions.java

Lines changed: 2 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,36 +6,20 @@
66
* @author <a href="https://github.com/bbakerman/">Brad Baker</a>
77
*/
88
public class ValueCacheOptions {
9-
private final boolean dispatchOnCacheMiss;
109
private final boolean completeValueAfterCacheSet;
1110

1211
private ValueCacheOptions() {
13-
this.dispatchOnCacheMiss = true;
1412
this.completeValueAfterCacheSet = false;
1513
}
1614

17-
private ValueCacheOptions(boolean dispatchOnCacheMiss, boolean completeValueAfterCacheSet) {
18-
this.dispatchOnCacheMiss = dispatchOnCacheMiss;
15+
private ValueCacheOptions(boolean completeValueAfterCacheSet) {
1916
this.completeValueAfterCacheSet = completeValueAfterCacheSet;
2017
}
2118

2219
public static ValueCacheOptions newOptions() {
2320
return new ValueCacheOptions();
2421
}
2522

26-
/**
27-
* This controls whether the {@link DataLoader} will called {@link DataLoader#dispatch()} if a
28-
* {@link ValueCache#get(Object)} call misses. In an async world this could take non zero time
29-
* to complete and hence previous dispatch calls may have already completed.
30-
*
31-
* This is true by default.
32-
*
33-
* @return true if a {@link DataLoader#dispatch()} call will be made on an async {@link ValueCache} miss
34-
*/
35-
public boolean isDispatchOnCacheMiss() {
36-
return dispatchOnCacheMiss;
37-
}
38-
3923
/**
4024
* This controls whether the {@link DataLoader} will wait for the {@link ValueCache#set(Object, Object)} call
4125
* to complete before it completes the returned value. By default this is false and hence
@@ -51,12 +35,8 @@ public boolean isCompleteValueAfterCacheSet() {
5135
return completeValueAfterCacheSet;
5236
}
5337

54-
public ValueCacheOptions setDispatchOnCacheMiss(boolean flag) {
55-
return new ValueCacheOptions(flag, this.completeValueAfterCacheSet);
56-
}
57-
5838
public ValueCacheOptions setCompleteValueAfterCacheSet(boolean flag) {
59-
return new ValueCacheOptions(this.dispatchOnCacheMiss, flag);
39+
return new ValueCacheOptions(flag);
6040
}
6141

6242
}

src/main/java/org/dataloader/impl/Assertions.java

Lines changed: 9 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,28 +2,26 @@
22

33
import org.dataloader.annotations.Internal;
44

5-
import java.util.Objects;
5+
import java.util.function.Supplier;
66

77
@Internal
88
public class Assertions {
99

10-
public static void assertState(boolean state, String message) {
10+
public static void assertState(boolean state, Supplier<String> message) {
1111
if (!state) {
12-
throw new AssertionException(message);
12+
throw new DataLoaderAssertionException(message.get());
1313
}
1414
}
1515

1616
public static <T> T nonNull(T t) {
17-
return Objects.requireNonNull(t, "nonNull object required");
17+
return nonNull(t, () -> "nonNull object required");
1818
}
1919

20-
public static <T> T nonNull(T t, String message) {
21-
return Objects.requireNonNull(t, message);
22-
}
23-
24-
private static class AssertionException extends IllegalStateException {
25-
public AssertionException(String message) {
26-
super(message);
20+
public static <T> T nonNull(T t, Supplier<String> message) {
21+
if (t == null) {
22+
throw new NullPointerException(message.get());
2723
}
24+
return t;
2825
}
26+
2927
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package org.dataloader.impl;
2+
3+
public class DataLoaderAssertionException extends IllegalStateException {
4+
public DataLoaderAssertionException(String message) {
5+
super(message);
6+
}
7+
}

src/main/java/org/dataloader/impl/PromisedValuesImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ public Throwable cause(int index) {
104104

105105
@Override
106106
public T get(int index) {
107-
assertState(isDone(), "The PromisedValues MUST be complete before calling the get() method");
107+
assertState(isDone(), () -> "The PromisedValues MUST be complete before calling the get() method");
108108
try {
109109
CompletionStage<T> future = futures.get(index);
110110
return future.toCompletableFuture().get();
@@ -115,7 +115,7 @@ public T get(int index) {
115115

116116
@Override
117117
public List<T> toList() {
118-
assertState(isDone(), "The PromisedValues MUST be complete before calling the toList() method");
118+
assertState(isDone(), () -> "The PromisedValues MUST be complete before calling the toList() method");
119119
int size = size();
120120
List<T> list = new ArrayList<>(size);
121121
for (int index = 0; index < size; index++) {

0 commit comments

Comments
 (0)