Skip to content

Adding a ScheduledDataLoaderRegistry #87

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Jul 6, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -478,6 +478,33 @@ and there are also gains to this different mode of operation:
However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the futures
in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting your loader designs.

## Scheduled Dispatching

`ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a
predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked.

If that predicate is true, it will make a `dispatch` call on the data loader, otherwise is will schedule a task to
perform that check again. Once a predicate evaluated to true, it will not reschedule and another call to
`dispatchAll` is required to be made.

This allows you to do things like "dispatch ONLY if the queue depth is > 10 deep or more than 200 millis have passed
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha, it allows to dial the aggressiveness of the batching strategy.

since it was last dispatched".

```java

DispatchPredicate depthOrTimePredicate = DispatchPredicate
.dispatchIfDepthGreaterThan(10)
.or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200)));

ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
.dispatchPredicate(depthOrTimePredicate)
.schedule(Duration.ofMillis(10))
.register("users",userDataLoader)
.build();
```

The above acts as a kind of minimum batch depth, with a time overload. It won't dispatch if the loader depth is less
than or equal to 10 but if 200ms pass it will dispatch.

## Let's get started!

Expand Down
11 changes: 10 additions & 1 deletion src/main/java/org/dataloader/DataLoaderRegistry.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -19,7 +20,7 @@
*/
@PublicApi
public class DataLoaderRegistry {
private final Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>();
protected final Map<String, DataLoader<?, ?>> dataLoaders = new ConcurrentHashMap<>();

public DataLoaderRegistry() {
}
Expand All @@ -28,6 +29,7 @@ private DataLoaderRegistry(Builder builder) {
this.dataLoaders.putAll(builder.dataLoaders);
}


/**
* This will register a new dataloader
*
Expand Down Expand Up @@ -84,6 +86,13 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) {
return new ArrayList<>(dataLoaders.values());
}

/**
* @return the currently registered data loaders as a map
*/
public Map<String, DataLoader<?, ?>> getDataLoadersMap() {
return new LinkedHashMap<>(dataLoaders);
}

/**
* This will unregister a new dataloader
*
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/org/dataloader/annotations/ExperimentalApi.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.dataloader.annotations;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.CONSTRUCTOR;
import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;

/**
* This represents code that the graphql-java project considers experimental API and while our intention is that it will
* progress to be {@link PublicApi}, its existence, signature of behavior may change between releases.
*
* In general unnecessary changes will be avoided but you should not depend on experimental classes being stable
*/
@Retention(RetentionPolicy.RUNTIME)
@Target(value = {CONSTRUCTOR, METHOD, TYPE, FIELD})
@Documented
public @interface ExperimentalApi {
}
92 changes: 92 additions & 0 deletions src/main/java/org/dataloader/registries/DispatchPredicate.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package org.dataloader.registries;

import org.dataloader.DataLoader;

import java.time.Duration;
import java.util.Objects;

/**
* A predicate class used by {@link ScheduledDataLoaderRegistry} to decide whether to dispatch or not
*/
@FunctionalInterface
public interface DispatchPredicate {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is very similar to Predicate, why not use that?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because it takes 2 arguments - so inspired by but not the same

/**
* This predicate tests whether the data loader should be dispatched or not.
*
* @param dataLoaderKey the key of the data loader when registered
* @param dataLoader the dataloader to dispatch
*
* @return true if the data loader SHOULD be dispatched
*/
boolean test(String dataLoaderKey, DataLoader<?, ?> dataLoader);


/**
* Returns a composed predicate that represents a short-circuiting logical
* AND of this predicate and another.
*
* @param other a predicate that will be logically-ANDed with this
* predicate
*
* @return a composed predicate that represents the short-circuiting logical
* AND of this predicate and the {@code other} predicate
*/
default DispatchPredicate and(DispatchPredicate other) {
Objects.requireNonNull(other);
return (k, dl) -> test(k, dl) && other.test(k, dl);
}

/**
* Returns a predicate that represents the logical negation of this
* predicate.
*
* @return a predicate that represents the logical negation of this
* predicate
*/
default DispatchPredicate negate() {
return (k, dl) -> !test(k, dl);
}

/**
* Returns a composed predicate that represents a short-circuiting logical
* OR of this predicate and another.
*
* @param other a predicate that will be logically-ORed with this
* predicate
*
* @return a composed predicate that represents the short-circuiting logical
* OR of this predicate and the {@code other} predicate
*/
default DispatchPredicate or(DispatchPredicate other) {
Objects.requireNonNull(other);
return (k, dl) -> test(k, dl) || other.test(k, dl);
}

/**
* This predicate will return true if the {@link DataLoader} has not be dispatched
* for at least the duration length of time.
*
* @param duration the length of time to check
*
* @return true if the data loader has not been dispatched in duration time
*/
static DispatchPredicate dispatchIfLongerThan(Duration duration) {
return (dataLoaderKey, dataLoader) -> {
int i = dataLoader.getTimeSinceDispatch().compareTo(duration);
return i > 0;
};
}

/**
* This predicate will return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
*
* This will act as minimum batch size. There must be more than `depth` items queued for the predicate to return true.
*
* @param depth the value to be greater than
*
* @return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth.
*/
static DispatchPredicate dispatchIfDepthGreaterThan(int depth) {
return (dataLoaderKey, dataLoader) -> dataLoader.dispatchDepth() > depth;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package org.dataloader.registries;

import org.dataloader.DataLoader;
import org.dataloader.DataLoaderRegistry;
import org.dataloader.annotations.ExperimentalApi;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.dataloader.impl.Assertions.nonNull;

/**
* This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled
* to perform that predicate dispatch again via the {@link ScheduledExecutorService}.
* <p>
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case
* no rescheduling will occur and you will need to call dispatch again to restart the process.
* <p>
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and
* call {@link #rescheduleNow()}.
* <p>
* This code is currently marked as {@link ExperimentalApi}
*/
@ExperimentalApi
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {

private final ScheduledExecutorService scheduledExecutorService;
private final DispatchPredicate dispatchPredicate;
private final Duration schedule;
private volatile boolean closed;

private ScheduledDataLoaderRegistry(Builder builder) {
this.dataLoaders.putAll(builder.dataLoaders);
this.scheduledExecutorService = builder.scheduledExecutorService;
this.dispatchPredicate = builder.dispatchPredicate;
this.schedule = builder.schedule;
this.closed = false;
}

/**
* Once closed this registry will never again reschedule checks
*/
@Override
public void close() {
closed = true;
}

/**
* @return how long the {@link ScheduledExecutorService} task will wait before checking the predicate again
*/
public Duration getScheduleDuration() {
return schedule;
}

@Override
public void dispatchAll() {
dispatchAllWithCount();
}

@Override
public int dispatchAllWithCount() {
int sum = 0;
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
DataLoader<?, ?> dataLoader = entry.getValue();
String key = entry.getKey();
if (dispatchPredicate.test(key, dataLoader)) {
sum += dataLoader.dispatchWithCounts().getKeysCount();
} else {
reschedule(key, dataLoader);
}
}
return sum;
}

/**
* This will immediately dispatch the {@link DataLoader}s in the registry
* without testing the predicate
*/
public void dispatchAllImmediately() {
super.dispatchAll();
}

/**
* This will immediately dispatch the {@link DataLoader}s in the registry
* without testing the predicate
*
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
*/
public int dispatchAllWithCountImmediately() {
return super.dispatchAllWithCount();
}

/**
* This will schedule a task to check the predicate and dispatch if true right now. It will not do
* a pre check of the preodicate like {@link #dispatchAll()} would
*/
public void rescheduleNow() {
dataLoaders.forEach(this::reschedule);
}

private void reschedule(String key, DataLoader<?, ?> dataLoader) {
if (!closed) {
Runnable runThis = () -> dispatchOrReschedule(key, dataLoader);
scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS);
}
}

private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
if (dispatchPredicate.test(key, dataLoader)) {
dataLoader.dispatch();
} else {
reschedule(key, dataLoader);
}
}

/**
* By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()}
* and a schedule duration of 10 milli seconds.
*
* @return A builder of {@link ScheduledDataLoaderRegistry}s
*/
public static Builder newScheduledRegistry() {
return new Builder();
}

public static class Builder {

private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
private DispatchPredicate dispatchPredicate = (key, dl) -> true;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is the intent of having the dispatch predicate on the registry vs dataloader?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about it on the dataloader itself - but the more I thought about it the more it felt like a cross cutting concern

eg in graphql you dispatch the DataLoaderRegistry and that delegates to each DataLoader

Plus it would mean all the extra baggage of Executors and so on would be in each DataLoader.

This allows for a collection of DataLoaders controlled by the one mechanism.

But since the predicate gets the DL and decides on dispatch or not - you can get as finicky as you like (as if it wa inside the dataloader)

private Duration schedule = Duration.ofMillis(10);
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>();

public Builder scheduledExecutorService(ScheduledExecutorService executorService) {
this.scheduledExecutorService = nonNull(executorService);
return this;
}

public Builder schedule(Duration schedule) {
this.schedule = schedule;
return this;
}

public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) {
this.dispatchPredicate = nonNull(dispatchPredicate);
return this;
}

/**
* This will register a new dataloader
*
* @param key the key to put the data loader under
* @param dataLoader the data loader to register
*
* @return this builder for a fluent pattern
*/
public Builder register(String key, DataLoader<?, ?> dataLoader) {
dataLoaders.put(key, dataLoader);
return this;
}

/**
* This will combine together the data loaders in this builder with the ones
* from a previous {@link DataLoaderRegistry}
*
* @param otherRegistry the previous {@link DataLoaderRegistry}
*
* @return this builder for a fluent pattern
*/
public Builder registerAll(DataLoaderRegistry otherRegistry) {
dataLoaders.putAll(otherRegistry.getDataLoadersMap());
return this;
}

/**
* @return the newly built {@link ScheduledDataLoaderRegistry}
*/
public ScheduledDataLoaderRegistry build() {
return new ScheduledDataLoaderRegistry(this);
}
}
}
Loading