-
Notifications
You must be signed in to change notification settings - Fork 94
Adding a ScheduledDataLoaderRegistry #87
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
4675d0e
c969e3c
73ad9b5
2b781e6
2d99ee1
523b6ca
80dafa1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package org.dataloader.registries; | ||
|
||
import org.dataloader.DataLoader; | ||
|
||
import java.time.Duration; | ||
import java.util.Objects; | ||
|
||
/** | ||
* A predicate class used by {@link ScheduledDataLoaderRegistry} to decide whether to dispatch or not | ||
*/ | ||
@FunctionalInterface | ||
public interface DispatchPredicate { | ||
/** | ||
* This predicate tests whether the data loader should be dispatched or not. | ||
* | ||
* @param dataLoaderKey the key of the data loader when registered | ||
* @param dataLoader the dataloader to dispatch | ||
* | ||
* @return true if the data loader SHOULD be dispatched | ||
*/ | ||
boolean test(String dataLoaderKey, DataLoader<?, ?> dataLoader); | ||
|
||
|
||
/** | ||
* Returns a composed predicate that represents a short-circuiting logical | ||
* AND of this predicate and another. | ||
* | ||
* @param other a predicate that will be logically-ANDed with this | ||
* predicate | ||
* | ||
* @return a composed predicate that represents the short-circuiting logical | ||
* AND of this predicate and the {@code other} predicate | ||
*/ | ||
default DispatchPredicate and(DispatchPredicate other) { | ||
Objects.requireNonNull(other); | ||
return (k, dl) -> test(k, dl) && other.test(k, dl); | ||
} | ||
|
||
/** | ||
* Returns a predicate that represents the logical negation of this | ||
* predicate. | ||
* | ||
* @return a predicate that represents the logical negation of this | ||
* predicate | ||
*/ | ||
default DispatchPredicate negate() { | ||
return (k, dl) -> !test(k, dl); | ||
} | ||
|
||
/** | ||
* Returns a composed predicate that represents a short-circuiting logical | ||
* OR of this predicate and another. | ||
* | ||
* @param other a predicate that will be logically-ORed with this | ||
* predicate | ||
* | ||
* @return a composed predicate that represents the short-circuiting logical | ||
* OR of this predicate and the {@code other} predicate | ||
*/ | ||
default DispatchPredicate or(DispatchPredicate other) { | ||
Objects.requireNonNull(other); | ||
return (k, dl) -> test(k, dl) || other.test(k, dl); | ||
} | ||
|
||
/** | ||
* This predicate will return true if the {@link DataLoader} has not be dispatched | ||
* for at least the duration length of time. | ||
* | ||
* @param duration the length of time to check | ||
* | ||
* @return true if the data loader has not been dispatched in duration time | ||
*/ | ||
static DispatchPredicate dispatchIfLongerThan(Duration duration) { | ||
return (dataLoaderKey, dataLoader) -> { | ||
int i = dataLoader.getTimeSinceDispatch().compareTo(duration); | ||
return i > 0; | ||
}; | ||
} | ||
|
||
/** | ||
* This predicate will return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth. | ||
* | ||
* This will act as minimum batch size. There must be more then `depth` items queued for the predicate to return true. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
* | ||
* @param depth the value to be greater than | ||
* | ||
* @return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth. | ||
*/ | ||
static DispatchPredicate dispatchIfDepthGreaterThan(int depth) { | ||
return (dataLoaderKey, dataLoader) -> dataLoader.dispatchDepth() > depth; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,184 @@ | ||
package org.dataloader.registries; | ||
|
||
import org.dataloader.DataLoader; | ||
import org.dataloader.DataLoaderRegistry; | ||
import org.dataloader.annotations.PublicApi; | ||
|
||
import java.time.Duration; | ||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.concurrent.Executors; | ||
import java.util.concurrent.ScheduledExecutorService; | ||
import java.util.concurrent.TimeUnit; | ||
|
||
import static org.dataloader.impl.Assertions.nonNull; | ||
|
||
/** | ||
* This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called | ||
* to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled | ||
* to perform that predicate dispatch again via the {@link ScheduledExecutorService}. | ||
* <p> | ||
* This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case | ||
* no rescheduling will occur and you will need to call dispatch again to restart the process. | ||
* <p> | ||
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and | ||
* call {@link #rescheduleNow()}. | ||
*/ | ||
@PublicApi | ||
public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable { | ||
|
||
private final ScheduledExecutorService scheduledExecutorService; | ||
private final DispatchPredicate dispatchPredicate; | ||
private final Duration schedule; | ||
private volatile boolean closed; | ||
|
||
private ScheduledDataLoaderRegistry(Builder builder) { | ||
this.dataLoaders.putAll(builder.dataLoaders); | ||
this.scheduledExecutorService = builder.scheduledExecutorService; | ||
this.dispatchPredicate = builder.dispatchPredicate; | ||
this.schedule = builder.schedule; | ||
this.closed = false; | ||
} | ||
|
||
/** | ||
* Once closed this registry will never again reschedule checks | ||
*/ | ||
@Override | ||
public void close() { | ||
closed = true; | ||
} | ||
|
||
/** | ||
* @return how long the {@link ScheduledExecutorService} task will wait before checking the predicate again | ||
*/ | ||
public Duration getScheduleDuration() { | ||
return schedule; | ||
} | ||
|
||
@Override | ||
public void dispatchAll() { | ||
dispatchAllWithCount(); | ||
} | ||
|
||
@Override | ||
public int dispatchAllWithCount() { | ||
int sum = 0; | ||
for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) { | ||
DataLoader<?, ?> dataLoader = entry.getValue(); | ||
String key = entry.getKey(); | ||
if (dispatchPredicate.test(key, dataLoader)) { | ||
sum += dataLoader.dispatchWithCounts().getKeysCount(); | ||
} else { | ||
reschedule(key, dataLoader); | ||
} | ||
} | ||
return sum; | ||
} | ||
|
||
/** | ||
* This will immediately dispatch the {@link DataLoader}s in the registry | ||
* without testing the predicate | ||
*/ | ||
public void dispatchAllImmediately() { | ||
super.dispatchAll(); | ||
} | ||
|
||
/** | ||
* This will immediately dispatch the {@link DataLoader}s in the registry | ||
* without testing the predicate | ||
* | ||
* @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s. | ||
*/ | ||
public int dispatchAllWithCountImmediately() { | ||
return super.dispatchAllWithCount(); | ||
} | ||
|
||
/** | ||
* This will schedule a task to check the predicate and dispatch if true right now. It will not do | ||
* a pre check of the preodicate like {@link #dispatchAll()} would | ||
*/ | ||
public void rescheduleNow() { | ||
dataLoaders.forEach(this::reschedule); | ||
} | ||
|
||
private void reschedule(String key, DataLoader<?, ?> dataLoader) { | ||
if (!closed) { | ||
Runnable runThis = () -> dispatchOrReschedule(key, dataLoader); | ||
scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS); | ||
} | ||
} | ||
|
||
private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) { | ||
if (dispatchPredicate.test(key, dataLoader)) { | ||
dataLoader.dispatch(); | ||
} else { | ||
reschedule(key, dataLoader); | ||
} | ||
} | ||
|
||
/** | ||
* By default this will create use a {@link Executors#newSingleThreadScheduledExecutor()} | ||
* and a schedule duration of 10 milli seconds. | ||
* | ||
* @return A builder of {@link ScheduledDataLoaderRegistry}s | ||
*/ | ||
public static Builder newScheduledRegistry() { | ||
return new Builder(); | ||
} | ||
|
||
public static class Builder { | ||
|
||
private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); | ||
private DispatchPredicate dispatchPredicate = (key, dl) -> true; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what is the intent of having the dispatch predicate on the registry vs dataloader? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about it on the dataloader itself - but the more I thought about it the more it felt like a cross cutting concern eg in graphql you dispatch the DataLoaderRegistry and that delegates to each DataLoader Plus it would mean all the extra baggage of Executors and so on would be in each DataLoader. This allows for a collection of DataLoaders controlled by the one mechanism. But since the predicate gets the DL and decides on dispatch or not - you can get as finicky as you like (as if it wa inside the dataloader) |
||
private Duration schedule = Duration.ofMillis(10); | ||
private final Map<String, DataLoader<?, ?>> dataLoaders = new HashMap<>(); | ||
|
||
public Builder scheduledExecutorService(ScheduledExecutorService executorService) { | ||
this.scheduledExecutorService = nonNull(executorService); | ||
return this; | ||
} | ||
|
||
public Builder schedule(Duration schedule) { | ||
this.schedule = schedule; | ||
return this; | ||
} | ||
|
||
public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { | ||
this.dispatchPredicate = nonNull(dispatchPredicate); | ||
return this; | ||
} | ||
|
||
/** | ||
* This will register a new dataloader | ||
* | ||
* @param key the key to put the data loader under | ||
* @param dataLoader the data loader to register | ||
* | ||
* @return this builder for a fluent pattern | ||
*/ | ||
public Builder register(String key, DataLoader<?, ?> dataLoader) { | ||
dataLoaders.put(key, dataLoader); | ||
return this; | ||
} | ||
|
||
/** | ||
* This will combine together the data loaders in this builder with the ones | ||
* from a previous {@link DataLoaderRegistry} | ||
* | ||
* @param otherRegistry the previous {@link DataLoaderRegistry} | ||
* | ||
* @return this builder for a fluent pattern | ||
*/ | ||
public Builder registerAll(DataLoaderRegistry otherRegistry) { | ||
dataLoaders.putAll(otherRegistry.getDataLoadersMap()); | ||
return this; | ||
} | ||
|
||
/** | ||
* @return the newly built {@link ScheduledDataLoaderRegistry} | ||
*/ | ||
public ScheduledDataLoaderRegistry build() { | ||
return new ScheduledDataLoaderRegistry(this); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is very similar to
Predicate
, why not use that?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because it takes 2 arguments - so inspired by but not the same