3
3
import org .junit .Test ;
4
4
import reactor .core .publisher .Flux ;
5
5
6
+ import java .util .ArrayList ;
7
+ import java .util .Collection ;
8
+ import java .util .HashMap ;
6
9
import java .util .List ;
7
10
import java .util .Map ;
8
11
import java .util .concurrent .CompletableFuture ;
9
- import java .util .concurrent .CompletionStage ;
10
12
import java .util .concurrent .ExecutionException ;
11
- import java .util .concurrent .atomic .AtomicBoolean ;
13
+ import java .util .concurrent .atomic .AtomicInteger ;
12
14
13
15
import static java .util .Arrays .asList ;
16
+ import static java .util .Collections .singletonList ;
14
17
import static org .awaitility .Awaitility .await ;
15
18
import static org .dataloader .DataLoaderFactory .newMappedPublisherDataLoader ;
19
+ import static org .dataloader .DataLoaderOptions .newOptions ;
20
+ import static org .dataloader .fixtures .TestKit .listFrom ;
21
+ import static org .dataloader .impl .CompletableFutureKit .cause ;
16
22
import static org .hamcrest .Matchers .equalTo ;
23
+ import static org .hamcrest .Matchers .instanceOf ;
17
24
import static org .hamcrest .Matchers .is ;
18
25
import static org .junit .Assert .assertThat ;
19
26
20
27
public class DataLoaderMappedPublisherBatchLoaderTest {
21
28
22
- @ Test
23
- public void should_Build_a_really_really_simple_data_loader () {
24
- AtomicBoolean success = new AtomicBoolean ();
25
- DataLoader <Integer , Integer > identityLoader = newMappedPublisherDataLoader (keysAsValues (), DataLoaderOptions .newOptions ());
26
-
27
- CompletionStage <Integer > future1 = identityLoader .load (1 );
29
+ MappedPublisherBatchLoader <String , String > evensOnlyMappedBatchLoader = (keys , subscriber ) -> {
30
+ Map <String , String > mapOfResults = new HashMap <>();
28
31
29
- future1 .thenAccept (value -> {
30
- assertThat (value , equalTo (1 ));
31
- success .set (true );
32
+ AtomicInteger index = new AtomicInteger ();
33
+ keys .forEach (k -> {
34
+ int i = index .getAndIncrement ();
35
+ if (i % 2 == 0 ) {
36
+ mapOfResults .put (k , k );
37
+ }
32
38
});
33
- identityLoader .dispatch ();
34
- await ().untilAtomic (success , is (true ));
39
+ Flux .fromIterable (mapOfResults .entrySet ()).subscribe (subscriber );
40
+ };
41
+
42
+ private static <K , V > DataLoader <K , V > idMapLoader (DataLoaderOptions options , List <Collection <K >> loadCalls ) {
43
+ MappedPublisherBatchLoader <K , V > kvBatchLoader = (keys , subscriber ) -> {
44
+ loadCalls .add (new ArrayList <>(keys ));
45
+ Map <K , V > map = new HashMap <>();
46
+ //noinspection unchecked
47
+ keys .forEach (k -> map .put (k , (V ) k ));
48
+ Flux .fromIterable (map .entrySet ()).subscribe (subscriber );
49
+ };
50
+ return DataLoaderFactory .newMappedPublisherDataLoader (kvBatchLoader , options );
35
51
}
36
52
37
- @ Test
38
- public void should_Support_loading_multiple_keys_in_one_call () {
39
- AtomicBoolean success = new AtomicBoolean ();
40
- DataLoader <Integer , Integer > identityLoader = newMappedPublisherDataLoader (keysAsValues (), DataLoaderOptions .newOptions ());
41
-
42
- CompletionStage <List <Integer >> futureAll = identityLoader .loadMany (asList (1 , 2 ));
43
- futureAll .thenAccept (promisedValues -> {
44
- assertThat (promisedValues .size (), is (2 ));
45
- success .set (true );
46
- });
47
- identityLoader .dispatch ();
48
- await ().untilAtomic (success , is (true ));
49
- assertThat (futureAll .toCompletableFuture ().join (), equalTo (asList (1 , 2 )));
53
+ private static <K , V > DataLoader <K , V > idMapLoaderBlowsUps (
54
+ DataLoaderOptions options , List <Collection <K >> loadCalls ) {
55
+ return newMappedPublisherDataLoader ((MappedPublisherBatchLoader <K , V >) (keys , subscriber ) -> {
56
+ loadCalls .add (new ArrayList <>(keys ));
57
+ Flux .<Map .Entry <K , V >>error (new IllegalStateException ("Error" )).subscribe (subscriber );
58
+ }, options );
50
59
}
51
60
61
+
52
62
@ Test
53
- public void simple_dataloader () {
54
- DataLoader <String , String > loader = newMappedPublisherDataLoader (keysAsValues (), DataLoaderOptions . newOptions () );
63
+ public void basic_map_batch_loading () {
64
+ DataLoader <String , String > loader = DataLoaderFactory . newMappedPublisherDataLoader (evensOnlyMappedBatchLoader );
55
65
56
66
loader .load ("A" );
57
67
loader .load ("B" );
@@ -60,12 +70,13 @@ public void simple_dataloader() {
60
70
List <String > results = loader .dispatchAndJoin ();
61
71
62
72
assertThat (results .size (), equalTo (4 ));
63
- assertThat (results , equalTo (asList ("A" , "B" , "C" , "D" )));
73
+ assertThat (results , equalTo (asList ("A" , null , "C" , null )));
64
74
}
65
75
66
76
@ Test
67
- public void should_observer_batch_multiple_requests () throws ExecutionException , InterruptedException {
68
- DataLoader <Integer , Integer > identityLoader = newMappedPublisherDataLoader (keysAsValues (), new DataLoaderOptions ());
77
+ public void should_map_Batch_multiple_requests () throws ExecutionException , InterruptedException {
78
+ List <Collection <Integer >> loadCalls = new ArrayList <>();
79
+ DataLoader <Integer , Integer > identityLoader = idMapLoader (new DataLoaderOptions (), loadCalls );
69
80
70
81
CompletableFuture <Integer > future1 = identityLoader .load (1 );
71
82
CompletableFuture <Integer > future2 = identityLoader .load (2 );
@@ -74,11 +85,91 @@ public void should_observer_batch_multiple_requests() throws ExecutionException,
74
85
await ().until (() -> future1 .isDone () && future2 .isDone ());
75
86
assertThat (future1 .get (), equalTo (1 ));
76
87
assertThat (future2 .get (), equalTo (2 ));
88
+ assertThat (loadCalls , equalTo (singletonList (asList (1 , 2 ))));
89
+ }
90
+
91
+ @ Test
92
+ public void can_split_max_batch_sizes_correctly () {
93
+ List <Collection <Integer >> loadCalls = new ArrayList <>();
94
+ DataLoader <Integer , Integer > identityLoader = idMapLoader (newOptions ().setMaxBatchSize (5 ), loadCalls );
95
+
96
+ for (int i = 0 ; i < 21 ; i ++) {
97
+ identityLoader .load (i );
98
+ }
99
+ List <Collection <Integer >> expectedCalls = new ArrayList <>();
100
+ expectedCalls .add (listFrom (0 , 5 ));
101
+ expectedCalls .add (listFrom (5 , 10 ));
102
+ expectedCalls .add (listFrom (10 , 15 ));
103
+ expectedCalls .add (listFrom (15 , 20 ));
104
+ expectedCalls .add (listFrom (20 , 21 ));
105
+
106
+ List <Integer > result = identityLoader .dispatch ().join ();
107
+
108
+ assertThat (result , equalTo (listFrom (0 , 21 )));
109
+ assertThat (loadCalls , equalTo (expectedCalls ));
110
+ }
111
+
112
+ @ Test
113
+ public void should_Propagate_error_to_all_loads () {
114
+ List <Collection <Integer >> loadCalls = new ArrayList <>();
115
+ DataLoader <Integer , Integer > errorLoader = idMapLoaderBlowsUps (new DataLoaderOptions (), loadCalls );
116
+
117
+ CompletableFuture <Integer > future1 = errorLoader .load (1 );
118
+ CompletableFuture <Integer > future2 = errorLoader .load (2 );
119
+ errorLoader .dispatch ();
120
+
121
+ await ().until (future1 ::isDone );
122
+
123
+ assertThat (future1 .isCompletedExceptionally (), is (true ));
124
+ Throwable cause = cause (future1 );
125
+ assert cause != null ;
126
+ assertThat (cause , instanceOf (IllegalStateException .class ));
127
+ assertThat (cause .getMessage (), equalTo ("Error" ));
128
+
129
+ await ().until (future2 ::isDone );
130
+ cause = cause (future2 );
131
+ assert cause != null ;
132
+ assertThat (cause .getMessage (), equalTo (cause .getMessage ()));
133
+
134
+ assertThat (loadCalls , equalTo (singletonList (asList (1 , 2 ))));
135
+ }
136
+
137
+ @ Test
138
+ public void should_work_with_duplicate_keys_when_caching_disabled () throws ExecutionException , InterruptedException {
139
+ List <Collection <String >> loadCalls = new ArrayList <>();
140
+ DataLoader <String , String > identityLoader =
141
+ idMapLoader (newOptions ().setCachingEnabled (false ), loadCalls );
142
+
143
+ CompletableFuture <String > future1 = identityLoader .load ("A" );
144
+ CompletableFuture <String > future2 = identityLoader .load ("B" );
145
+ CompletableFuture <String > future3 = identityLoader .load ("A" );
146
+ identityLoader .dispatch ();
147
+
148
+ await ().until (() -> future1 .isDone () && future2 .isDone () && future3 .isDone ());
149
+ assertThat (future1 .get (), equalTo ("A" ));
150
+ assertThat (future2 .get (), equalTo ("B" ));
151
+ assertThat (future3 .get (), equalTo ("A" ));
152
+
153
+ // the map batch functions use a set of keys as input and hence remove duplicates unlike list variant
154
+ assertThat (loadCalls , equalTo (singletonList (asList ("A" , "B" ))));
77
155
}
78
156
79
- private static <K > MappedPublisherBatchLoader <K , K > keysAsValues () {
80
- return (keys , subscriber ) -> Flux
81
- .fromStream (keys .stream ().map (k -> Map .entry (k , k )))
82
- .subscribe (subscriber );
157
+ @ Test
158
+ public void should_work_with_duplicate_keys_when_caching_enabled () throws ExecutionException , InterruptedException {
159
+ List <Collection <String >> loadCalls = new ArrayList <>();
160
+ DataLoader <String , String > identityLoader =
161
+ idMapLoader (newOptions ().setCachingEnabled (true ), loadCalls );
162
+
163
+ CompletableFuture <String > future1 = identityLoader .load ("A" );
164
+ CompletableFuture <String > future2 = identityLoader .load ("B" );
165
+ CompletableFuture <String > future3 = identityLoader .load ("A" );
166
+ identityLoader .dispatch ();
167
+
168
+ await ().until (() -> future1 .isDone () && future2 .isDone () && future3 .isDone ());
169
+ assertThat (future1 .get (), equalTo ("A" ));
170
+ assertThat (future2 .get (), equalTo ("B" ));
171
+ assertThat (future3 .get (), equalTo ("A" ));
172
+ assertThat (loadCalls , equalTo (singletonList (asList ("A" , "B" ))));
83
173
}
174
+
84
175
}
0 commit comments