Skip to content

Add support for minimum batch size #77

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions src/main/java/org/dataloader/DataLoaderHelper.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import org.dataloader.impl.CompletableFutureKit;
import org.dataloader.stats.StatisticsCollector;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashSet;
Expand All @@ -12,6 +14,12 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static java.util.Collections.emptyList;
Expand Down Expand Up @@ -61,6 +69,8 @@ Object getCallContext() {
private final CacheMap<Object, CompletableFuture<V>> futureCache;
private final List<LoaderQueueEntry<K, CompletableFuture<V>>> loaderQueue;
private final StatisticsCollector stats;
private Instant lastDispatchTime;
private final ScheduledExecutorService executorService;

DataLoaderHelper(DataLoader<K, V> dataLoader, Object batchLoadFunction, DataLoaderOptions loaderOptions, CacheMap<Object, CompletableFuture<V>> futureCache, StatisticsCollector stats) {
this.dataLoader = dataLoader;
Expand All @@ -69,6 +79,8 @@ Object getCallContext() {
this.futureCache = futureCache;
this.loaderQueue = new ArrayList<>();
this.stats = stats;
this.lastDispatchTime = Instant.now();
this.executorService = Executors.newSingleThreadScheduledExecutor();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DataLoader should not do any threading. This might be suitable for some use cases, but in, say, a reactive app this executor is neither controlled nor warranted.

If we did put an executor into the DataLoader then it would need to be passed in not created

I would rather see some sort of helper that can dispatch a dataloader if it hasn't been dispatched for more than N seconds

So I like the fact that this tracks the time since the last dispatch and so on. But it must not do the actual dispatching.

}

Optional<CompletableFuture<V>> getIfPresent(K key) {
Expand Down Expand Up @@ -136,18 +148,32 @@ Object getCacheKey(K key) {
}

/**
 * Dispatches the currently queued load requests in a non-forced way: when batching is on and
 * a minimum batch size / max wait window is configured, the actual dispatch may be deferred
 * by the two-arg overload.
 *
 * @return the result of dispatching the queued keys
 */
DispatchResult<V> dispatch() {
    return dispatch(false);
}

DispatchResult<V> dispatch(boolean forced) {
boolean batchingEnabled = loaderOptions.batchingEnabled();
//
// we copy the pre-loaded set of futures ready for dispatch
final List<K> keys = new ArrayList<>();
final List<Object> callContexts = new ArrayList<>();
final List<CompletableFuture<V>> queuedFutures = new ArrayList<>();
synchronized (dataLoader) {
final long timeSinceLastDispatch = Duration.between(lastDispatchTime, Instant.now()).toMillis();

if (batchingEnabled && !forced && loaderQueue.size() < loaderOptions.minBatchSize() && timeSinceLastDispatch < loaderOptions.maxWaitInMillis()) {
executorService.schedule(() -> { dispatch(true); },
loaderOptions.maxWaitInMillis() - timeSinceLastDispatch,
TimeUnit.MILLISECONDS);
return new DispatchResult<>(CompletableFuture.completedFuture(emptyList()), 0);
}

loaderQueue.forEach(entry -> {
keys.add(entry.getKey());
queuedFutures.add(entry.getValue());
callContexts.add(entry.getCallContext());
});
lastDispatchTime = Instant.now();
loaderQueue.clear();
}
if (!batchingEnabled || keys.isEmpty()) {
Expand Down
58 changes: 58 additions & 0 deletions src/main/java/org/dataloader/DataLoaderOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ public class DataLoaderOptions {
private CacheKey cacheKeyFunction;
private CacheMap cacheMap;
private int maxBatchSize;
private int minBatchSize;
private int maxWaitInMillis;
private Supplier<StatisticsCollector> statisticsCollector;
private BatchLoaderContextProvider environmentProvider;

Expand All @@ -51,6 +53,8 @@ public DataLoaderOptions() {
cachingEnabled = true;
cachingExceptionsEnabled = true;
maxBatchSize = -1;
minBatchSize = 0;
maxWaitInMillis = 0;
statisticsCollector = SimpleStatisticsCollector::new;
environmentProvider = NULL_PROVIDER;
}
Expand All @@ -68,6 +72,8 @@ public DataLoaderOptions(DataLoaderOptions other) {
this.cacheKeyFunction = other.cacheKeyFunction;
this.cacheMap = other.cacheMap;
this.maxBatchSize = other.maxBatchSize;
this.minBatchSize = other.minBatchSize;
this.maxWaitInMillis = other.maxWaitInMillis;
this.statisticsCollector = other.statisticsCollector;
this.environmentProvider = other.environmentProvider;
}
Expand Down Expand Up @@ -212,10 +218,62 @@ public int maxBatchSize() {
* @return the data loader options for fluent coding
*/
public DataLoaderOptions setMaxBatchSize(int maxBatchSize) {
    // -1 means "no limit", so the min/max consistency check only applies to real limits
    boolean hasLimit = maxBatchSize != -1;
    if (hasLimit && minBatchSize > maxBatchSize) {
        throw new IllegalArgumentException("minBatchSize should not be greater than maxBatchSize");
    }
    this.maxBatchSize = maxBatchSize;
    return this;
}

/**
 * Gets the minimum number of keys that will be presented to the {@link BatchLoader} function
 * before a batch is dispatched. The effective minimum is also governed by the related option
 * {@code maxWaitInMillis}: once that wait time has elapsed, a smaller batch may be dispatched.
 *
 * @return the minimum batch size or 0 if there is no limit
 */
public int minBatchSize() {
    return minBatchSize;
}

/**
 * Sets the minimum number of keys that will be presented to the {@link BatchLoader} function
 * before a batch is dispatched. Works together with {@code maxWaitInMillis}, which bounds how
 * long a batch may be held back while it fills up to this size.
 *
 * @param minBatchSize the minimum batch size
 *
 * @return the data loader options for fluent coding
 */
public DataLoaderOptions setMinBatchSize(int minBatchSize) {
    // only validate against maxBatchSize when a real max limit (!= -1) is in effect
    boolean maxIsLimited = maxBatchSize != -1;
    if (maxIsLimited && minBatchSize > maxBatchSize) {
        throw new IllegalArgumentException("minBatchSize should not be greater than maxBatchSize");
    }
    this.minBatchSize = minBatchSize;
    return this;
}

/**
 * Gets the maximum time, in milliseconds, that a batch of keys may be held back before being
 * presented to the {@link BatchLoader} function. The batch size reached in that window is also
 * governed by the related option {@code minBatchSize}.
 *
 * @return the max wait time in milliseconds or 0 if there is no limit
 */
public int maxWaitInMillis() {
    return maxWaitInMillis;
}

/**
 * Sets the maximum time, in milliseconds, to wait before presenting a batch of keys to the
 * {@link BatchLoader} function. Works together with {@code minBatchSize}: a batch smaller than
 * the minimum is dispatched once this wait time has elapsed.
 *
 * NOTE(review): negative values are not rejected here; they appear to behave like 0 (no
 * deferral) in the dispatch check — confirm whether validation is wanted.
 *
 * @param maxWaitInMillis the max wait time in milliseconds
 *
 * @return the data loader options for fluent coding
 */
public DataLoaderOptions setMaxWaitInMillis(int maxWaitInMillis) {
    this.maxWaitInMillis = maxWaitInMillis;
    return this;
}

/**
* @return the statistics collector to use with these options
*/
Expand Down
35 changes: 35 additions & 0 deletions src/test/java/org/dataloader/DataLoaderOptionsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.dataloader;

import static org.hamcrest.Matchers.equalTo;
import static org.junit.Assert.assertThat;

import org.junit.Test;

public class DataLoaderOptionsTest {

    @Test
    public void should_create_a_default_data_loader_options() {
        // copy-construct from a fresh options object and verify the documented defaults survive
        DataLoaderOptions options = new DataLoaderOptions(DataLoaderOptions.newOptions());
        assertThat(options.batchingEnabled(), equalTo(true));
        assertThat(options.cachingEnabled(), equalTo(true));
        assertThat(options.cachingExceptionsEnabled(), equalTo(true));
        assertThat(options.maxBatchSize(), equalTo(-1));
        assertThat(options.minBatchSize(), equalTo(0));
        assertThat(options.maxWaitInMillis(), equalTo(0));
    }

    @Test(expected = IllegalArgumentException.class)
    public void should_fail_if_min_batch_size_is_greater_than_max() {
        // setting a max first, then a larger min, must be rejected
        DataLoaderOptions.newOptions().setMaxBatchSize(5).setMinBatchSize(6);
    }

    @Test(expected = IllegalArgumentException.class)
    public void should_fail_if_max_batch_size_is_less_than_min() {
        // setting a min first, then a smaller max, must be rejected
        DataLoaderOptions.newOptions().setMinBatchSize(6).setMaxBatchSize(5);
    }
}
117 changes: 117 additions & 0 deletions src/test/java/org/dataloader/DataLoaderTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -860,6 +860,37 @@ public void batching_disabled_and_caching_disabled_should_dispatch_immediately_a

}

@Test
public void min_batch_size_with_batching_disabled_and_caching_disabled_should_dispatch_immediately_and_forget() throws Exception {
    // With batching and caching both off, the configured minimum batch size / wait window must
    // be ignored: every load dispatches immediately as its own single-key batch.
    List<Collection<String>> loadCalls = new ArrayList<>();
    DataLoaderOptions options = newOptions()
            .setMinBatchSize(5)
            .setMaxWaitInMillis(10)
            .setBatchingEnabled(false)
            .setCachingEnabled(false);
    DataLoader<String, String> identityLoader = idLoader(options, loadCalls);

    CompletableFuture<String> firstA = identityLoader.load("A");
    CompletableFuture<String> firstB = identityLoader.load("B");

    // caching is off, so repeating the same keys issues fresh loads
    CompletableFuture<String> secondA = identityLoader.load("A");
    CompletableFuture<String> secondB = identityLoader.load("B");

    List<String> values = CompletableFutureKit.allOf(asList(firstA, firstB, secondA, secondB)).join();

    assertThat(firstA.join(), equalTo("A"));
    assertThat(firstB.join(), equalTo("B"));
    assertThat(secondA.join(), equalTo("A"));
    assertThat(secondB.join(), equalTo("B"));

    assertThat(values, equalTo(asList("A", "B", "A", "B")));

    // four separate single-key batch calls, one per load
    assertThat(loadCalls, equalTo(asList(
            singletonList("A"),
            singletonList("B"),
            singletonList("A"),
            singletonList("B")
    )));
}

@Test
public void batches_multiple_requests_with_max_batch_size() throws Exception {
List<Collection<Integer>> loadCalls = new ArrayList<>();
Expand All @@ -881,6 +912,70 @@ public void batches_multiple_requests_with_max_batch_size() throws Exception {

}

@Test
public void batches_multiple_requests_with_min_batch_size() throws Exception {
    // minBatchSize(3) plus a wait window means the first two dispatch() calls are deferred,
    // so all three keys end up in a single batch call.
    List<Collection<Integer>> loadCalls = new ArrayList<>();
    DataLoader<Integer, Integer> identityLoader = idLoader(newOptions().setMinBatchSize(3).setMaxWaitInMillis(10), loadCalls);

    List<CompletableFuture<Integer>> futures = new ArrayList<>();
    for (int key = 1; key <= 3; key++) {
        futures.add(identityLoader.load(key));
        identityLoader.dispatch();
    }

    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

    for (int key = 1; key <= 3; key++) {
        assertThat(futures.get(key - 1).join(), equalTo(key));
    }

    assertThat(loadCalls, equalTo(singletonList(asList(1, 2, 3))));
}

@Test
public void min_batch_size_with_no_wait_time_should_not_batch_requests() throws Exception {
    // A minimum batch size without a wait window (maxWaitInMillis defaults to 0) must not
    // defer anything: each dispatch() flushes immediately as a single-key batch.
    List<Collection<Integer>> loadCalls = new ArrayList<>();
    DataLoader<Integer, Integer> identityLoader = idLoader(newOptions().setMinBatchSize(3), loadCalls);

    List<CompletableFuture<Integer>> futures = new ArrayList<>();
    for (int key = 1; key <= 3; key++) {
        futures.add(identityLoader.load(key));
        identityLoader.dispatch();
    }

    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

    for (int key = 1; key <= 3; key++) {
        assertThat(futures.get(key - 1).join(), equalTo(key));
    }

    assertThat(loadCalls, equalTo(asList(singletonList(1), singletonList(2), singletonList(3))));
}

@Test
public void max_wait_time_with_no_min_batch_size_should_not_batch_requests() throws Exception {
    // A wait window without a minimum batch size (minBatchSize defaults to 0) must not defer
    // anything either: every dispatch() flushes its single queued key right away.
    List<Collection<Integer>> loadCalls = new ArrayList<>();
    DataLoader<Integer, Integer> identityLoader = idLoader(newOptions().setMaxWaitInMillis(100), loadCalls);

    List<CompletableFuture<Integer>> futures = new ArrayList<>();
    for (int key = 1; key <= 3; key++) {
        futures.add(identityLoader.load(key));
        identityLoader.dispatch();
    }

    CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();

    for (int key = 1; key <= 3; key++) {
        assertThat(futures.get(key - 1).join(), equalTo(key));
    }

    assertThat(loadCalls, equalTo(asList(singletonList(1), singletonList(2), singletonList(3))));
}

@Test
public void can_split_max_batch_sizes_correctly() throws Exception {
List<Collection<Integer>> loadCalls = new ArrayList<>();
Expand All @@ -903,6 +998,28 @@ public void can_split_max_batch_sizes_correctly() throws Exception {

}

@Test
public void can_combine_min_batch_size_and_split_max_batch_sizes_correctly() throws Exception {
    // 21 keys with min = max = 5: expect four full batches of five, plus a final short batch
    // of one that is flushed by the wait-time rule rather than the minimum-size rule.
    List<Collection<Integer>> loadCalls = new ArrayList<>();
    DataLoader<Integer, Integer> identityLoader = idLoader(newOptions().setMinBatchSize(5).setMaxWaitInMillis(20).setMaxBatchSize(5), loadCalls);

    List<CompletableFuture<Integer>> results = new ArrayList<>();
    for (int i = 0; i < 21; i++) {
        results.add(identityLoader.load(i));
        identityLoader.dispatch();
    }
    results.forEach(CompletableFuture::join);

    List<Collection<Integer>> expectedCalls = new ArrayList<>();
    for (int start = 0; start < 21; start += 5) {
        expectedCalls.add(listFrom(start, Math.min(start + 5, 21)));
    }

    assertThat(loadCalls, equalTo(expectedCalls));
}

@Test
public void should_Batch_loads_occurring_within_futures() {
List<Collection<String>> loadCalls = new ArrayList<>();
Expand Down