From 4675d0e37575e50c59460dff78a9e028a471625d Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sun, 27 Jun 2021 17:17:11 +1000 Subject: [PATCH 1/6] Adding a ScheduledDataLoaderRegistry --- README.md | 565 ++++++++++-------- .../org/dataloader/DataLoaderRegistry.java | 11 +- .../registries/DispatchPredicate.java | 92 +++ .../ScheduledDataLoaderRegistry.java | 172 ++++++ src/test/java/ReadmeExamples.java | 13 + .../java/org/dataloader/ClockDataLoader.java | 4 +- .../java/org/dataloader/fixtures/TestKit.java | 4 + .../registries/DispatchPredicateTest.java | 105 ++++ .../ScheduledDataLoaderRegistryTest.java | 218 +++++++ 9 files changed, 916 insertions(+), 268 deletions(-) create mode 100644 src/main/java/org/dataloader/registries/DispatchPredicate.java create mode 100644 src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java create mode 100644 src/test/java/org/dataloader/registries/DispatchPredicateTest.java create mode 100644 src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java diff --git a/README.md b/README.md index 73351ad..7b6d813 100644 --- a/README.md +++ b/README.md @@ -1,21 +1,22 @@ # java-dataloader [![Build](https://github.com/graphql-java/java-dataloader/actions/workflows/master.yml/badge.svg)](https://github.com/graphql-java/java-dataloader/actions/workflows/master.yml) -[![Latest Release](https://maven-badges.herokuapp.com/maven-central/com.graphql-java/java-dataloader/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.graphql-java/java-dataloader/) +[![Latest Release](https://maven-badges.herokuapp.com/maven-central/com.graphql-java/java-dataloader/badge.svg)](https://maven-badges.herokuapp.com/maven-central/com.graphql-java/java-dataloader/) [![Apache licensed](https://img.shields.io/hexpm/l/plug.svg?maxAge=2592000)](https://github.com/graphql-java/java-dataloader/blob/master/LICENSE) -This small and simple utility library is a pure Java 8 port of [Facebook DataLoader](https://github.com/facebook/dataloader). +This small and simple utility library is a pure Java 8 port +of [Facebook DataLoader](https://github.com/facebook/dataloader). -It can serve as integral part of your application's data layer to provide a -consistent API over various back-ends and reduce message communication overhead through batching and caching. +It can serve as integral part of your application's data layer to provide a consistent API over various back-ends and +reduce message communication overhead through batching and caching. -An important use case for `java-dataloader` is improving the efficiency of GraphQL query execution. Graphql fields -are resolved in a independent manner and with a true graph of objects, you may be fetching the same object many times. +An important use case for `java-dataloader` is improving the efficiency of GraphQL query execution. Graphql fields are +resolved in a independent manner and with a true graph of objects, you may be fetching the same object many times. -A naive implementation of graphql data fetchers can easily lead to the dreaded "n+1" fetch problem. +A naive implementation of graphql data fetchers can easily lead to the dreaded "n+1" fetch problem. -Most of the code is ported directly from Facebook's reference implementation, with one IMPORTANT adaptation to make -it work for Java 8. ([more on this below](#manual-dispatching)). +Most of the code is ported directly from Facebook's reference implementation, with one IMPORTANT adaptation to make it +work for Java 8. 
([more on this below](#manual-dispatching)). But before reading on, be sure to take a short dive into the [original documentation](https://github.com/facebook/dataloader/blob/master/README.md) provided by Lee Byron (@leebyron) @@ -26,8 +27,8 @@ and Nicholas Schrock (@schrockn) from [Facebook](https://www.facebook.com/), the - [Features](#features) - [Examples](#examples) - [Let's get started!](#lets-get-started) - - [Installing](#installing) - - [Building](#building) + - [Installing](#installing) + - [Building](#building) - [Other information sources](#other-information-sources) - [Contributing](#contributing) - [Acknowledgements](#acknowledgements) @@ -35,13 +36,16 @@ and Nicholas Schrock (@schrockn) from [Facebook](https://www.facebook.com/), the ## Features -`java-dataloader` is a feature-complete port of the Facebook reference implementation with [one major difference](#manual-dispatching). These features are: +`java-dataloader` is a feature-complete port of the Facebook reference implementation +with [one major difference](#manual-dispatching). These features are: - Simple, intuitive API, using generics and fluent coding - Define batch load function with lambda expression - Schedule a load request in queue for batching - Add load requests from anywhere in code -- Request returns a [`CompleteableFuture`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) of the requested value +- Request returns + a [`CompleteableFuture`](https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/CompletableFuture.html) of + the requested value - Can create multiple requests at once - Caches load requests, so data is only fetched once - Can clear individual cache keys, so data is re-fetched on next batch queue dispatch @@ -51,177 +55,179 @@ and Nicholas Schrock (@schrockn) from [Facebook](https://www.facebook.com/), the - Results are ordered according to insertion order of load requests - Deals with partial errors when a batch future fails - Can disable batching and/or caching in configuration -- Can supply your own [`CacheMap`](https://github.com/graphql-java/java-dataloader/blob/master/src/main/java/io/engagingspaces/vertx/dataloader/CacheMap.java) implementations -- Has very high test coverage +- Can supply your + own [`CacheMap`](https://github.com/graphql-java/java-dataloader/blob/master/src/main/java/io/engagingspaces/vertx/dataloader/CacheMap.java) + implementations +- Has very high test coverage ## Examples -A `DataLoader` object requires a `BatchLoader` function that is responsible for loading a promise of values given -a list of keys +A `DataLoader` object requires a `BatchLoader` function that is responsible for loading a promise of values given a list +of keys ```java - BatchLoader userBatchLoader = new BatchLoader() { - @Override - public CompletionStage> load(List userIds) { - return CompletableFuture.supplyAsync(() -> { - return userManager.loadUsersById(userIds); - }); - } +BatchLoader userBatchLoader=new BatchLoader(){ +@Override +public CompletionStage>load(List userIds){ + return CompletableFuture.supplyAsync(()->{ + return userManager.loadUsersById(userIds); + }); + } }; - DataLoader userLoader = DataLoaderFactory.newDataLoader(userBatchLoader); + DataLoader userLoader=DataLoaderFactory.newDataLoader(userBatchLoader); ``` You can then use it to load values which will be `CompleteableFuture` promises to values - + ```java - CompletableFuture load1 = userLoader.load(1L); + CompletableFuture load1=userLoader.load(1L); ``` - -or you can use it to 
compose future computations as follows. The key requirement is that you call -`dataloader.dispatch()` or its variant `dataloader.dispatchAndJoin()` at some point in order to make the underlying calls happen to the batch loader. -In this version of data loader, this does not happen automatically. More on this in [Manual dispatching](#manual-dispatching) . +or you can use it to compose future computations as follows. The key requirement is that you call +`dataloader.dispatch()` or its variant `dataloader.dispatchAndJoin()` at some point in order to make the underlying +calls happen to the batch loader. + +In this version of data loader, this does not happen automatically. More on this +in [Manual dispatching](#manual-dispatching) . ```java userLoader.load(1L) - .thenAccept(user -> { - System.out.println("user = " + user); - userLoader.load(user.getInvitedByID()) - .thenAccept(invitedBy -> { - System.out.println("invitedBy = " + invitedBy); - }); - }); - - userLoader.load(2L) - .thenAccept(user -> { - System.out.println("user = " + user); - userLoader.load(user.getInvitedByID()) - .thenAccept(invitedBy -> { - System.out.println("invitedBy = " + invitedBy); - }); - }); - - userLoader.dispatchAndJoin(); + .thenAccept(user->{ + System.out.println("user = "+user); + userLoader.load(user.getInvitedByID()) + .thenAccept(invitedBy->{ + System.out.println("invitedBy = "+invitedBy); + }); + }); + + userLoader.load(2L) + .thenAccept(user->{ + System.out.println("user = "+user); + userLoader.load(user.getInvitedByID()) + .thenAccept(invitedBy->{ + System.out.println("invitedBy = "+invitedBy); + }); + }); + + userLoader.dispatchAndJoin(); ``` As stated on the original Facebook project : ->A naive application may have issued four round-trips to a backend for the required information, -but with DataLoader this application will make at most two. - -> DataLoader allows you to decouple unrelated parts of your application without sacrificing the -performance of batch data-loading. While the loader presents an API that loads individual values, all -concurrent requests will be coalesced and presented to your batch loading function. This allows your -application to safely distribute data fetching requirements throughout your application and -maintain minimal outgoing data requests. +> A naive application may have issued four round-trips to a backend for the required information, but with DataLoader this application will make at most two. + +> DataLoader allows you to decouple unrelated parts of your application without sacrificing the performance of batch data-loading. While the loader presents an API that loads individual values, all concurrent requests will be coalesced and presented to your batch loading function. This allows your application to safely distribute data fetching requirements throughout your application and maintain minimal outgoing data requests. -In the example above, the first call to dispatch will cause the batched user keys (1 and 2) to be fired at the BatchLoader function to load 2 users. - -Since each `thenAccept` callback made more calls to `userLoader` to get the "user they have invited", another 2 user keys are given at the `BatchLoader` -function for them. +In the example above, the first call to dispatch will cause the batched user keys (1 and 2) to be fired at the +BatchLoader function to load 2 users. 
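
As an aside, the `dispatchAndJoin()` call used at the end of the example above (and explained in more detail just below)
behaves roughly like the hand-written loop sketched here. This is an illustration only; `dispatch()` and
`dispatchDepth()` are existing `DataLoader` methods, but the loop itself is not part of the library.

```java
// a rough equivalent of userLoader.dispatchAndJoin(): dispatch the currently
// queued keys, wait for that batch to complete, and repeat while the
// completion callbacks have queued up more keys
while (userLoader.dispatchDepth() > 0) {
    userLoader.dispatch().join();
}
```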
-In this case the `userLoader.dispatchAndJoin()` is used to make a dispatch call, wait for it (aka join it), see if the data loader has more batched entries, (which is does) -and then it repeats this until the data loader internal queue of keys is empty. At this point we have made 2 batched calls instead of the naive 4 calls we might have made if -we did not "batch" the calls to load data. +Since each `thenAccept` callback made more calls to `userLoader` to get the "user they have invited", another 2 user +keys are given at the `BatchLoader` +function for them. + +In this case the `userLoader.dispatchAndJoin()` is used to make a dispatch call, wait for it (aka join it), see if the +data loader has more batched entries, (which is does) +and then it repeats this until the data loader internal queue of keys is empty. At this point we have made 2 batched +calls instead of the naive 4 calls we might have made if we did not "batch" the calls to load data. ## Batching requires batched backing APIs -You will notice in our BatchLoader example that the backing service had the ability to get a list of users given -a list of user ids in one call. - +You will notice in our BatchLoader example that the backing service had the ability to get a list of users given a list +of user ids in one call. + ```java - public CompletionStage> load(List userIds) { - return CompletableFuture.supplyAsync(() -> { - return userManager.loadUsersById(userIds); - }); - } + public CompletionStage>load(List userIds){ + return CompletableFuture.supplyAsync(()->{ + return userManager.loadUsersById(userIds); + }); + } ``` - - This is important consideration. By using `dataloader` you have batched up the requests for N keys in a list of keys that can be - retrieved at one time. - - If you don't have batched backing services, then you cant be as efficient as possible as you will have to make N calls for each key. - + +This is important consideration. By using `dataloader` you have batched up the requests for N keys in a list of keys +that can be retrieved at one time. + +If you don't have batched backing services, then you cant be as efficient as possible as you will have to make N calls +for each key. + ```java - BatchLoader lessEfficientUserBatchLoader = new BatchLoader() { - @Override - public CompletionStage> load(List userIds) { - return CompletableFuture.supplyAsync(() -> { - // - // notice how it makes N calls to load by single user id out of the batch of N keys - // - return userIds.stream() - .map(id -> userManager.loadUserById(id)) - .collect(Collectors.toList()); - }); - } + BatchLoader lessEfficientUserBatchLoader=new BatchLoader(){ +@Override +public CompletionStage>load(List userIds){ + return CompletableFuture.supplyAsync(()->{ + // + // notice how it makes N calls to load by single user id out of the batch of N keys + // + return userIds.stream() + .map(id->userManager.loadUserById(id)) + .collect(Collectors.toList()); + }); + } }; ``` - + That said, with key caching turn on (the default), it will still be more efficient using `dataloader` than without it. ### Calling the batch loader function with call context environment -Often there is a need to call the batch loader function with some sort of call context environment, such as the calling users security -credentials or the database connection parameters. +Often there is a need to call the batch loader function with some sort of call context environment, such as the calling +users security credentials or the database connection parameters. 
-You can do this by implementing a `org.dataloader.BatchLoaderContextProvider` and using one of -the batch loading interfaces such as `org.dataloader.BatchLoaderWithContext`. +You can do this by implementing a `org.dataloader.BatchLoaderContextProvider` and using one of the batch loading +interfaces such as `org.dataloader.BatchLoaderWithContext`. -It will be given a `org.dataloader.BatchLoaderEnvironment` parameter and it can then ask it -for the context object. +It will be given a `org.dataloader.BatchLoaderEnvironment` parameter and it can then ask it for the context object. ```java - DataLoaderOptions options = DataLoaderOptions.newOptions() - .setBatchLoaderContextProvider(() -> SecurityCtx.getCallingUserCtx()); - - BatchLoaderWithContext batchLoader = new BatchLoaderWithContext() { - @Override - public CompletionStage> load(List keys, BatchLoaderEnvironment environment) { - SecurityCtx callCtx = environment.getContext(); - return callDatabaseForResults(callCtx, keys); - } + DataLoaderOptions options=DataLoaderOptions.newOptions() + .setBatchLoaderContextProvider(()->SecurityCtx.getCallingUserCtx()); + + BatchLoaderWithContext batchLoader=new BatchLoaderWithContext(){ +@Override +public CompletionStage>load(List keys,BatchLoaderEnvironment environment){ + SecurityCtx callCtx=environment.getContext(); + return callDatabaseForResults(callCtx,keys); + } }; - DataLoader loader = DataLoaderFactory.newDataLoader(batchLoader, options); + DataLoader loader=DataLoaderFactory.newDataLoader(batchLoader,options); ``` -The batch loading code will now receive this environment object and it can be used to get context perhaps allowing it -to connect to other systems. +The batch loading code will now receive this environment object and it can be used to get context perhaps allowing it to +connect to other systems. -You can also pass in context objects per load call. This will be captured and passed to the batch loader function. +You can also pass in context objects per load call. This will be captured and passed to the batch loader function. You can gain access to them as a map by key or as the original list of context objects. 
```java - DataLoaderOptions options = DataLoaderOptions.newOptions() - .setBatchLoaderContextProvider(() -> SecurityCtx.getCallingUserCtx()); - - BatchLoaderWithContext batchLoader = new BatchLoaderWithContext() { - @Override - public CompletionStage> load(List keys, BatchLoaderEnvironment environment) { - SecurityCtx callCtx = environment.getContext(); - // - // this is the load context objects in map form by key - // in this case [ keyA : contextForA, keyB : contextForB ] - // - Map keyContexts = environment.getKeyContexts(); - // - // this is load context in list form - // - // in this case [ contextForA, contextForB ] - return callDatabaseForResults(callCtx, keys); - } + DataLoaderOptions options=DataLoaderOptions.newOptions() + .setBatchLoaderContextProvider(()->SecurityCtx.getCallingUserCtx()); + + BatchLoaderWithContext batchLoader=new BatchLoaderWithContext(){ +@Override +public CompletionStage>load(List keys,BatchLoaderEnvironment environment){ + SecurityCtx callCtx=environment.getContext(); + // + // this is the load context objects in map form by key + // in this case [ keyA : contextForA, keyB : contextForB ] + // + Map keyContexts=environment.getKeyContexts(); + // + // this is load context in list form + // + // in this case [ contextForA, contextForB ] + return callDatabaseForResults(callCtx,keys); + } }; - DataLoader loader = DataLoaderFactory.newDataLoader(batchLoader, options); - loader.load("keyA", "contextForA"); - loader.load("keyB", "contextForB"); + DataLoader loader=DataLoaderFactory.newDataLoader(batchLoader,options); + loader.load("keyA","contextForA"); + loader.load("keyB","contextForB"); ``` ### Returning a Map of results from your batch loader @@ -231,102 +237,104 @@ Often there is not a 1:1 mapping of your batch loaded keys to the values returne For example, let's assume you want to load users from a database, you could probably use a query that looks like this: ```sql - SELECT * FROM User WHERE id IN (keys) + SELECT * + FROM User + WHERE id IN (keys) ``` - - Given say 10 user id keys you might only get 7 results back. This can be more naturally represented in a map - than in an ordered list of values from the batch loader function. - - You can use `org.dataloader.MappedBatchLoader` for this purpose. - - When the map is processed by the `DataLoader` code, any keys that are missing in the map - will be replaced with null values. The semantic that the number of `DataLoader.load` requests - are matched with an equal number of values is kept. - - The keys provided MUST be first class keys since they will be used to examine the returned map and - create the list of results, with nulls filling in for missing values. - + +Given say 10 user id keys you might only get 7 results back. This can be more naturally represented in a map than in an +ordered list of values from the batch loader function. + +You can use `org.dataloader.MappedBatchLoader` for this purpose. + +When the map is processed by the `DataLoader` code, any keys that are missing in the map will be replaced with null +values. The semantic that the number of `DataLoader.load` requests are matched with an equal number of values is kept. + +The keys provided MUST be first class keys since they will be used to examine the returned map and create the list of +results, with nulls filling in for missing values. 
+ ```java - MappedBatchLoaderWithContext mapBatchLoader = new MappedBatchLoaderWithContext() { - @Override - public CompletionStage> load(Set userIds, BatchLoaderEnvironment environment) { - SecurityCtx callCtx = environment.getContext(); - return CompletableFuture.supplyAsync(() -> userManager.loadMapOfUsersByIds(callCtx, userIds)); - } + MappedBatchLoaderWithContext mapBatchLoader=new MappedBatchLoaderWithContext(){ +@Override +public CompletionStage>load(Set userIds,BatchLoaderEnvironment environment){ + SecurityCtx callCtx=environment.getContext(); + return CompletableFuture.supplyAsync(()->userManager.loadMapOfUsersByIds(callCtx,userIds)); + } }; - DataLoader userLoader = DataLoaderFactory.newMappedDataLoader(mapBatchLoader); + DataLoader userLoader=DataLoaderFactory.newMappedDataLoader(mapBatchLoader); - // ... +// ... ``` ### Error object is not a thing in a type safe Java world -In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is rejected -with that error. This allows fine grain (per object in the list) sets of error. If I ask for keys A,B,C and B errors out the promise -for B can contain a specific error. +In the reference JS implementation if the batch loader returns an `Error` object back from the `load()` promise is +rejected with that error. This allows fine grain (per object in the list) sets of error. If I ask for keys A,B,C and B +errors out the promise for B can contain a specific error. This is not quite as loose in a Java implementation as Java is a type safe language. -A batch loader function is defined as `BatchLoader` meaning for a key of type `K` it returns a value of type `V`. +A batch loader function is defined as `BatchLoader` meaning for a key of type `K` it returns a value of type `V`. -It cant just return some `Exception` as an object of type `V`. Type safety matters. +It cant just return some `Exception` as an object of type `V`. Type safety matters. However you can use the `Try` data type which can encapsulate a computation that succeeded or returned an exception. ```java - Try tryS = Try.tryCall(() -> { - if (rollDice()) { - return "OK"; - } else { - throw new RuntimeException("Bang"); - } + Try tryS=Try.tryCall(()->{ + if(rollDice()){ + return"OK"; + }else{ + throw new RuntimeException("Bang"); + } }); - if (tryS.isSuccess()) { - System.out.println("It work " + tryS.get()); - } else { - System.out.println("It failed with exception : " + tryS.getThrowable()); + if(tryS.isSuccess()){ + System.out.println("It work "+tryS.get()); + }else{ + System.out.println("It failed with exception : "+tryS.getThrowable()); } ``` -DataLoader supports this type and you can use this form to create a batch loader that returns a list of `Try` objects, some of which may have succeeded -and some of which may have failed. From that data loader can infer the right behavior in terms of the `load(x)` promise. +DataLoader supports this type and you can use this form to create a batch loader that returns a list of `Try` objects, +some of which may have succeeded and some of which may have failed. From that data loader can infer the right behavior +in terms of the `load(x)` promise. 
```java - DataLoader dataLoader = DataLoaderFactory.newDataLoaderWithTry(new BatchLoader>() { - @Override - public CompletionStage>> load(List keys) { - return CompletableFuture.supplyAsync(() -> { - List> users = new ArrayList<>(); - for (String key : keys) { - Try userTry = loadUser(key); - users.add(userTry); - } - return users; - }); - } + DataLoader dataLoader=DataLoaderFactory.newDataLoaderWithTry(new BatchLoader>(){ +@Override +public CompletionStage>>load(List keys){ + return CompletableFuture.supplyAsync(()->{ + List>users=new ArrayList<>(); + for(String key:keys){ + Try userTry=loadUser(key); + users.add(userTry); + } + return users; + }); + } }); ``` -On the above example if one of the `Try` objects represents a failure, then its `load()` promise will complete exceptionally and you can -react to that, in a type safe manner. - - +On the above example if one of the `Try` objects represents a failure, then its `load()` promise will complete +exceptionally and you can react to that, in a type safe manner. -## Disabling caching +## Disabling caching -In certain uncommon cases, a DataLoader which does not cache may be desirable. +In certain uncommon cases, a DataLoader which does not cache may be desirable. ```java - DataLoaderFactory.newDataLoader(userBatchLoader, DataLoaderOptions.newOptions().setCachingEnabled(false)); + DataLoaderFactory.newDataLoader(userBatchLoader,DataLoaderOptions.newOptions().setCachingEnabled(false)); ``` -Calling the above will ensure that every call to `.load()` will produce a new promise, and requested keys will not be saved in memory. - -However, when the memoization cache is disabled, your batch function will receive an array of keys which may contain duplicates! Each key will -be associated with each call to `.load()`. Your batch loader should provide a value for each instance of the requested key as per the contract +Calling the above will ensure that every call to `.load()` will produce a new promise, and requested keys will not be +saved in memory. + +However, when the memoization cache is disabled, your batch function will receive an array of keys which may contain +duplicates! Each key will be associated with each call to `.load()`. Your batch loader should provide a value for each +instance of the requested key as per the contract ```java userDataLoader.load("A"); @@ -335,49 +343,47 @@ be associated with each call to `.load()`. Your batch loader should provide a va userDataLoader.dispatch(); - // will result in keys to the batch loader with [ "A", "B", "A" ] +// will result in keys to the batch loader with [ "A", "B", "A" ] ``` - -More complex cache behavior can be achieved by calling `.clear()` or `.clearAll()` rather than disabling the cache completely. - +More complex cache behavior can be achieved by calling `.clear()` or `.clearAll()` rather than disabling the cache +completely. ## Caching errors - -If a batch load fails (that is, a batch function returns a rejected CompletionStage), then the requested values will not be cached. -However if a batch function returns a `Try` or `Throwable` instance for an individual value, then that will be cached to avoid frequently loading -the same problem object. - -In some circumstances you may wish to clear the cache for these individual problems: + +If a batch load fails (that is, a batch function returns a rejected CompletionStage), then the requested values will not +be cached. 
However if a batch function returns a `Try` or `Throwable` instance for an individual value, then that will +be cached to avoid frequently loading the same problem object. + +In some circumstances you may wish to clear the cache for these individual problems: ```java - userDataLoader.load("r2d2").whenComplete((user, throwable) -> { - if (throwable != null) { - userDataLoader.clear("r2dr"); - throwable.printStackTrace(); - } else { - processUser(user); - } + userDataLoader.load("r2d2").whenComplete((user,throwable)->{ + if(throwable!=null){ + userDataLoader.clear("r2dr"); + throwable.printStackTrace(); + }else{ + processUser(user); + } }); ``` - ## Statistics on what is happening -`DataLoader` keeps statistics on what is happening. It can tell you the number of objects asked for, the cache hit number, the number of objects -asked for via batching and so on. - -Knowing what the behaviour of your data is important for you to understand how efficient you are in serving the data via this pattern. +`DataLoader` keeps statistics on what is happening. It can tell you the number of objects asked for, the cache hit +number, the number of objects asked for via batching and so on. +Knowing what the behaviour of your data is important for you to understand how efficient you are in serving the data via +this pattern. ```java - Statistics statistics = userDataLoader.getStatistics(); - - System.out.println(format("load : %d", statistics.getLoadCount())); - System.out.println(format("batch load: %d", statistics.getBatchLoadCount())); - System.out.println(format("cache hit: %d", statistics.getCacheHitCount())); - System.out.println(format("cache hit ratio: %d", statistics.getCacheHitRatio())); + Statistics statistics=userDataLoader.getStatistics(); + + System.out.println(format("load : %d",statistics.getLoadCount())); + System.out.println(format("batch load: %d",statistics.getBatchLoadCount())); + System.out.println(format("cache hit: %d",statistics.getCacheHitCount())); + System.out.println(format("cache hit ratio: %d",statistics.getCacheHitRatio())); ``` @@ -386,70 +392,102 @@ Knowing what the behaviour of your data is important for you to understand how e You can configure the statistics collector used when you build the data loader ```java - DataLoaderOptions options = DataLoaderOptions.newOptions().setStatisticsCollector(() -> new ThreadLocalStatisticsCollector()); - DataLoader userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader,options); + DataLoaderOptions options=DataLoaderOptions.newOptions().setStatisticsCollector(()->new ThreadLocalStatisticsCollector()); + DataLoader userDataLoader=DataLoaderFactory.newDataLoader(userBatchLoader,options); ``` -Which collector you use is up to you. It ships with the following: `SimpleStatisticsCollector`, `ThreadLocalStatisticsCollector`, `DelegatingStatisticsCollector` +Which collector you use is up to you. It ships with the following: `SimpleStatisticsCollector` +, `ThreadLocalStatisticsCollector`, `DelegatingStatisticsCollector` and `NoOpStatisticsCollector`. ## The scope of a data loader is important -If you are serving web requests then the data can be specific to the user requesting it. If you have user specific data +If you are serving web requests then the data can be specific to the user requesting it. If you have user specific data then you will not want to cache data meant for user A to then later give it user B in a subsequent request. -The scope of your `DataLoader` instances is important. 
You will want to create them per web request to ensure data is only cached within that -web request and no more. +The scope of your `DataLoader` instances is important. You will want to create them per web request to ensure data is +only cached within that web request and no more. -If your data can be shared across web requests then use a custom cache to keep values in a common place. +If your data can be shared across web requests then use a custom cache to keep values in a common place. -Data loaders are stateful components that contain promises (with context) that are likely share the same affinity as the request. +Data loaders are stateful components that contain promises (with context) that are likely share the same affinity as the +request. ## Custom caches -The default cache behind `DataLoader` is an in memory `HashMap`. There is no expiry on this, and it lives for as long as the data loader -lives. - -However, you can create your own custom cache and supply it to the data loader on construction via the `org.dataloader.CacheMap` interface. +The default cache behind `DataLoader` is an in memory `HashMap`. There is no expiry on this, and it lives for as long as +the data loader lives. + +However, you can create your own custom cache and supply it to the data loader on construction via +the `org.dataloader.CacheMap` interface. ```java - MyCustomCache customCache = new MyCustomCache(); - DataLoaderOptions options = DataLoaderOptions.newOptions().setCacheMap(customCache); - DataLoaderFactory.newDataLoader(userBatchLoader, options); + MyCustomCache customCache=new MyCustomCache(); + DataLoaderOptions options=DataLoaderOptions.newOptions().setCacheMap(customCache); + DataLoaderFactory.newDataLoader(userBatchLoader,options); ``` -You could choose to use one of the fancy cache implementations from Guava or Kaffeine and wrap it in a `CacheMap` wrapper ready -for data loader. They can do fancy things like time eviction and efficient LRU caching. +You could choose to use one of the fancy cache implementations from Guava or Kaffeine and wrap it in a `CacheMap` +wrapper ready for data loader. They can do fancy things like time eviction and efficient LRU caching. ## Manual dispatching -The original [Facebook DataLoader](https://github.com/facebook/dataloader) was written in Javascript for NodeJS. NodeJS is single-threaded in nature, but simulates -asynchronous logic by invoking functions on separate threads in an event loop, as explained +The original [Facebook DataLoader](https://github.com/facebook/dataloader) was written in Javascript for NodeJS. NodeJS +is single-threaded in nature, but simulates asynchronous logic by invoking functions on separate threads in an event +loop, as explained [in this post](http://stackoverflow.com/a/19823583/3455094) on StackOverflow. NodeJS generates so-call 'ticks' in which queued functions are dispatched for execution, and Facebook `DataLoader` uses -the `nextTick()` function in NodeJS to _automatically_ dequeue load requests and send them to the batch execution function -for processing. +the `nextTick()` function in NodeJS to _automatically_ dequeue load requests and send them to the batch execution +function for processing. And here there is an **IMPORTANT DIFFERENCE** compared to how `java-dataloader` operates!! In NodeJS the batch preparation will not affect the asynchronous processing behaviour in any way. It will just prepare batches in 'spare time' as it were. 
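
As a quick illustration of the Java behaviour described next (a sketch only, reusing the `userLoader` from the earlier
examples), a loaded future stays pending until you explicitly dispatch:

```java
CompletableFuture<User> future = userLoader.load(1L);

// nothing has been sent to the batch loader yet - the future is still pending
assert !future.isDone();

// only an explicit dispatch() causes the batch loader function to be invoked
userLoader.dispatch();
User user = future.join();
```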
-This is different in Java as you will actually _delay_ the execution of your load requests, until the moment where you make a
-call to `dataLoader.dispatch()`.
+This is different in Java as you will actually _delay_ the execution of your load requests, until the moment where you
+make a call to `dataLoader.dispatch()`.
 
 Does this make Java `DataLoader` any less useful than the reference implementation? We would argue this is not the
 case,
 and there are also gains to this different mode of operation:
 
 - In contrast to the NodeJS implementation _you_ as developer are in full control of when batches are dispatched
 - You can attach any logic that determines when a dispatch takes place
-- You still retain all other features, full caching support and batching (e.g. to optimize message bus traffic, GraphQL query execution time, etc.)
+- You still retain all other features, full caching support and batching (e.g. to optimize message bus traffic, GraphQL
+  query execution time, etc.)
 
-However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the futures
-in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting your loader designs.
+However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the
+futures in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting
+your loader designs.
+
+## Scheduled Dispatching
+
+`ScheduledDataLoaderRegistry` is a registry that allows for dispatching to be done on a schedule. It contains a
+predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked.
+
+If that predicate is true, it will make a `dispatch` call on the data loader, otherwise it will schedule a task to
+perform that check again. Once the predicate evaluates to true, it will not reschedule, and another call to
+`dispatchAll` is required to start the process again.
+
+This allows you to do things like "dispatch ONLY if the queue depth is > 10 deep or more than 200 millis have passed
+since it was last dispatched".
+
+```java
+DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10)
+        .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200)));
+
+ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
+        .dispatchPredicate(depthOrTimePredicate)
+        .schedule(Duration.ofMillis(10))
+        .register("users", userDataLoader)
+        .build();
+```
+
+The above acts as a kind of minimum batch depth, with a time overload: it won't dispatch if the loader depth is less
+than or equal to 10, but once 200ms have passed it will dispatch regardless.
 
 ## Let's get started!
 
@@ -475,7 +513,6 @@ To build from source use the Gradle wrapper:
 ./gradlew clean build
 ```
 
-
 ## Other information sources
 
 - [Facebook DataLoader Github repo](https://github.com/facebook/dataloader)
@@ -487,32 +524,30 @@ To build from source use the Gradle wrapper:
 All your feedback and help to improve this project is very welcome. Please create issues for your bugs, ideas and
 enhancement requests, or better yet, contribute directly by creating a PR.
 
-When reporting an issue, please add a detailed instruction, and if possible a code snippet or test that can be used
-as a reproducer of your problem.
+When reporting an issue, please add a detailed instruction, and if possible a code snippet or test that can be used as a
+reproducer of your problem.
-When creating a pull request, please adhere to the current coding style where possible, and create tests with your -code so it keeps providing an excellent test coverage level. PR's without tests may not be accepted unless they only -deal with minor changes. +When creating a pull request, please adhere to the current coding style where possible, and create tests with your code +so it keeps providing an excellent test coverage level. PR's without tests may not be accepted unless they only deal +with minor changes. ## Acknowledgements -This library was originally written for use within a [VertX world](http://vertx.io/) and it used the vertx-core `Future` classes to implement -itself. All the heavy lifting has been done by this project : [vertx-dataloader](https://github.com/engagingspaces/vertx-dataloader) +This library was originally written for use within a [VertX world](http://vertx.io/) and it used the vertx-core `Future` +classes to implement itself. All the heavy lifting has been done by this +project : [vertx-dataloader](https://github.com/engagingspaces/vertx-dataloader) including the extensive testing (which itself came from Facebook). -This particular port was done to reduce the dependency on Vertx and to write a pure Java 8 implementation with no dependencies and also -to use the more normative Java CompletableFuture. - -[vertx-core](http://vertx.io/docs/vertx-core/java/) is not a lightweight library by any means so having a pure Java 8 implementation is -very desirable. +This particular port was done to reduce the dependency on Vertx and to write a pure Java 8 implementation with no +dependencies and also to use the more normative Java CompletableFuture. +[vertx-core](http://vertx.io/docs/vertx-core/java/) is not a lightweight library by any means so having a pure Java 8 +implementation is very desirable. This library is entirely inspired by the great works of [Lee Byron](https://github.com/leebyron) and -[Nicholas Schrock](https://github.com/schrockn) from [Facebook](https://www.facebook.com/) whom we would like to thank, and -especially @leebyron for taking the time and effort to provide 100% coverage on the codebase. The original set of tests -were also ported. - - +[Nicholas Schrock](https://github.com/schrockn) from [Facebook](https://www.facebook.com/) whom we would like to thank, +and especially @leebyron for taking the time and effort to provide 100% coverage on the codebase. The original set of +tests were also ported. 
## Licensing diff --git a/src/main/java/org/dataloader/DataLoaderRegistry.java b/src/main/java/org/dataloader/DataLoaderRegistry.java index 6f8a695..9b19c29 100644 --- a/src/main/java/org/dataloader/DataLoaderRegistry.java +++ b/src/main/java/org/dataloader/DataLoaderRegistry.java @@ -6,6 +6,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; @@ -19,7 +20,7 @@ */ @PublicApi public class DataLoaderRegistry { - private final Map> dataLoaders = new ConcurrentHashMap<>(); + protected final Map> dataLoaders = new ConcurrentHashMap<>(); public DataLoaderRegistry() { } @@ -28,6 +29,7 @@ private DataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); } + /** * This will register a new dataloader * @@ -84,6 +86,13 @@ public DataLoaderRegistry combine(DataLoaderRegistry registry) { return new ArrayList<>(dataLoaders.values()); } + /** + * @return the currently registered data loaders as a map + */ + public Map> getDataLoadersMap() { + return new LinkedHashMap<>(dataLoaders); + } + /** * This will unregister a new dataloader * diff --git a/src/main/java/org/dataloader/registries/DispatchPredicate.java b/src/main/java/org/dataloader/registries/DispatchPredicate.java new file mode 100644 index 0000000..d782557 --- /dev/null +++ b/src/main/java/org/dataloader/registries/DispatchPredicate.java @@ -0,0 +1,92 @@ +package org.dataloader.registries; + +import org.dataloader.DataLoader; + +import java.time.Duration; +import java.util.Objects; + +/** + * A predicate class used by {@link ScheduledDataLoaderRegistry} to decide whether to dispatch or not + */ +@FunctionalInterface +public interface DispatchPredicate { + /** + * This predicate tests whether the data loader should be dispatched or not. + * + * @param dataLoaderKey the key of the data loader when registered + * @param dataLoader the dataloader to dispatch + * + * @return true if the data loader SHOULD be dispatched + */ + boolean test(String dataLoaderKey, DataLoader dataLoader); + + + /** + * Returns a composed predicate that represents a short-circuiting logical + * AND of this predicate and another. + * + * @param other a predicate that will be logically-ANDed with this + * predicate + * + * @return a composed predicate that represents the short-circuiting logical + * AND of this predicate and the {@code other} predicate + */ + default DispatchPredicate and(DispatchPredicate other) { + Objects.requireNonNull(other); + return (k, dl) -> test(k, dl) && other.test(k, dl); + } + + /** + * Returns a predicate that represents the logical negation of this + * predicate. + * + * @return a predicate that represents the logical negation of this + * predicate + */ + default DispatchPredicate negate() { + return (k, dl) -> !test(k, dl); + } + + /** + * Returns a composed predicate that represents a short-circuiting logical + * OR of this predicate and another. + * + * @param other a predicate that will be logically-ORed with this + * predicate + * + * @return a composed predicate that represents the short-circuiting logical + * OR of this predicate and the {@code other} predicate + */ + default DispatchPredicate or(DispatchPredicate other) { + Objects.requireNonNull(other); + return (k, dl) -> test(k, dl) || other.test(k, dl); + } + + /** + * This predicate will return true if the {@link DataLoader} has not be dispatched + * for at least the duration length of time. 
+ * + * @param duration the length of time to check + * + * @return true if the data loader has not been dispatched in duration time + */ + static DispatchPredicate dispatchIfLongerThan(Duration duration) { + return (dataLoaderKey, dataLoader) -> { + int i = dataLoader.getTimeSinceDispatch().compareTo(duration); + return i > 0; + }; + } + + /** + * This predicate will return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth. + * + * This will act as minimum batch size. There must be more then `depth` items queued for the predicate to return true. + * + * @param depth the value to be greater than + * + * @return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth. + */ + static DispatchPredicate dispatchIfDepthGreaterThan(int depth) { + return (dataLoaderKey, dataLoader) -> dataLoader.dispatchDepth() > depth; + } +} diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java new file mode 100644 index 0000000..013f06a --- /dev/null +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -0,0 +1,172 @@ +package org.dataloader.registries; + +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderRegistry; +import org.dataloader.annotations.PublicApi; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import static org.dataloader.impl.Assertions.nonNull; + +/** + * This {@link DataLoaderRegistry} will use a {@link DispatchPredicate} when {@link #dispatchAll()} is called + * to test (for each {@link DataLoader} in the registry) if a dispatch should proceed. If the predicate returns false, then a task is scheduled + * to perform that predicate dispatch again via the {@link ScheduledExecutorService}. + *

+ * This will continue to loop (test false and reschedule) until such time as the predicate returns true, in which case + * no rescheduling will occur and you will need to call dispatch again to restart the process. + *
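+ * A typical build of one of these registries looks something like the following sketch, where {@code userDataLoader}
+ * stands in for some existing {@link DataLoader}:
+ * <pre>{@code
+ *   ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
+ *           .dispatchPredicate(DispatchPredicate.dispatchIfDepthGreaterThan(10))
+ *           .schedule(Duration.ofMillis(10))
+ *           .register("users", userDataLoader)
+ *           .build();
+ * }</pre>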

+ * If you want to create a ScheduledDataLoaderRegistry that starts rescheduling immediately, just create one and
+ * call {@link #rescheduleNow()}.
+ */
+@PublicApi
+public class ScheduledDataLoaderRegistry extends DataLoaderRegistry {
+
+    private final ScheduledExecutorService scheduledExecutorService;
+    private final DispatchPredicate dispatchPredicate;
+    private final Duration schedule;
+
+    private ScheduledDataLoaderRegistry(Builder builder) {
+        this.dataLoaders.putAll(builder.dataLoaders);
+        this.scheduledExecutorService = builder.scheduledExecutorService;
+        this.dispatchPredicate = builder.dispatchPredicate;
+        this.schedule = builder.schedule;
+    }
+
+    /**
+     * @return how long the {@link ScheduledExecutorService} task will wait before checking the predicate again
+     */
+    public Duration getScheduleDuration() {
+        return schedule;
+    }
+
+    @Override
+    public void dispatchAll() {
+        dispatchAllWithCount();
+    }
+
+    @Override
+    public int dispatchAllWithCount() {
+        int sum = 0;
+        for (Map.Entry<String, DataLoader<?, ?>> entry : dataLoaders.entrySet()) {
+            DataLoader<?, ?> dataLoader = entry.getValue();
+            String key = entry.getKey();
+            if (dispatchPredicate.test(key, dataLoader)) {
+                sum += dataLoader.dispatchWithCounts().getKeysCount();
+            } else {
+                reschedule(key, dataLoader);
+            }
+        }
+        return sum;
+    }
+
+    /**
+     * This will immediately dispatch the {@link DataLoader}s in the registry
+     * without testing the predicate
+     */
+    public void dispatchAllImmediately() {
+        super.dispatchAll();
+    }
+
+    /**
+     * This will immediately dispatch the {@link DataLoader}s in the registry
+     * without testing the predicate
+     *
+     * @return total number of entries that were dispatched from registered {@link org.dataloader.DataLoader}s.
+     */
+    public int dispatchAllWithCountImmediately() {
+        return super.dispatchAllWithCount();
+    }
+
+    /**
+     * This will schedule a task to check the predicate and dispatch if true right now. It will not do
+     * a pre-check of the predicate like {@link #dispatchAll()} would.
+     */
+    public void rescheduleNow() {
+        dataLoaders.forEach(this::reschedule);
+    }
+
+    private void reschedule(String key, DataLoader<?, ?> dataLoader) {
+        Runnable runThis = () -> dispatchOrReschedule(key, dataLoader);
+        scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS);
+    }
+
+    private void dispatchOrReschedule(String key, DataLoader<?, ?> dataLoader) {
+        if (dispatchPredicate.test(key, dataLoader)) {
+            dataLoader.dispatch();
+        } else {
+            reschedule(key, dataLoader);
+        }
+    }
+
+    /**
+     * By default this will use a {@link Executors#newSingleThreadScheduledExecutor()}
+     * and a schedule duration of 10 milliseconds.
+ * + * @return A builder of {@link ScheduledDataLoaderRegistry}s + */ + public static Builder newScheduledRegistry() { + return new Builder(); + } + + public static class Builder { + + private ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private DispatchPredicate dispatchPredicate = (key, dl) -> true; + private Duration schedule = Duration.ofMillis(10); + private final Map> dataLoaders = new HashMap<>(); + + public Builder scheduledExecutorService(ScheduledExecutorService executorService) { + this.scheduledExecutorService = nonNull(executorService); + return this; + } + + public Builder schedule(Duration schedule) { + this.schedule = schedule; + return this; + } + + public Builder dispatchPredicate(DispatchPredicate dispatchPredicate) { + this.dispatchPredicate = nonNull(dispatchPredicate); + return this; + } + + /** + * This will register a new dataloader + * + * @param key the key to put the data loader under + * @param dataLoader the data loader to register + * + * @return this builder for a fluent pattern + */ + public Builder register(String key, DataLoader dataLoader) { + dataLoaders.put(key, dataLoader); + return this; + } + + /** + * This will combine together the data loaders in this builder with the ones + * from a previous {@link DataLoaderRegistry} + * + * @param otherRegistry the previous {@link DataLoaderRegistry} + * + * @return this builder for a fluent pattern + */ + public Builder registerAll(DataLoaderRegistry otherRegistry) { + dataLoaders.putAll(otherRegistry.getDataLoadersMap()); + return this; + } + + /** + * @return the newly built {@link ScheduledDataLoaderRegistry} + */ + public ScheduledDataLoaderRegistry build() { + return new ScheduledDataLoaderRegistry(this); + } + } +} diff --git a/src/test/java/ReadmeExamples.java b/src/test/java/ReadmeExamples.java index ccdd555..84fcfbb 100644 --- a/src/test/java/ReadmeExamples.java +++ b/src/test/java/ReadmeExamples.java @@ -10,9 +10,12 @@ import org.dataloader.fixtures.SecurityCtx; import org.dataloader.fixtures.User; import org.dataloader.fixtures.UserManager; +import org.dataloader.registries.DispatchPredicate; +import org.dataloader.registries.ScheduledDataLoaderRegistry; import org.dataloader.stats.Statistics; import org.dataloader.stats.ThreadLocalStatisticsCollector; +import java.time.Duration; import java.util.ArrayList; import java.util.List; import java.util.Map; @@ -269,4 +272,14 @@ private void statsConfigExample() { DataLoader userDataLoader = DataLoaderFactory.newDataLoader(userBatchLoader, options); } + private void ScheduledDispatche() { + DispatchPredicate depthOrTimePredicate = DispatchPredicate.dispatchIfDepthGreaterThan(10) + .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200))); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .dispatchPredicate(depthOrTimePredicate) + .schedule(Duration.ofMillis(10)) + .register("users", userDataLoader) + .build(); + } } diff --git a/src/test/java/org/dataloader/ClockDataLoader.java b/src/test/java/org/dataloader/ClockDataLoader.java index 4b16e78..21faeea 100644 --- a/src/test/java/org/dataloader/ClockDataLoader.java +++ b/src/test/java/org/dataloader/ClockDataLoader.java @@ -4,11 +4,11 @@ public class ClockDataLoader extends DataLoader { - ClockDataLoader(Object batchLoadFunction, Clock clock) { + public ClockDataLoader(Object batchLoadFunction, Clock clock) { this(batchLoadFunction, null, clock); } - ClockDataLoader(Object batchLoadFunction, 
DataLoaderOptions options, Clock clock) { + public ClockDataLoader(Object batchLoadFunction, DataLoaderOptions options, Clock clock) { super(batchLoadFunction, options, clock); } diff --git a/src/test/java/org/dataloader/fixtures/TestKit.java b/src/test/java/org/dataloader/fixtures/TestKit.java index 1242114..2ea23a8 100644 --- a/src/test/java/org/dataloader/fixtures/TestKit.java +++ b/src/test/java/org/dataloader/fixtures/TestKit.java @@ -31,6 +31,10 @@ public static BatchLoader keysAsValues(List> loadCalls) { }; } + public static DataLoader idLoader() { + return idLoader(null, new ArrayList<>()); + } + public static DataLoader idLoader(List> loadCalls) { return idLoader(null, loadCalls); } diff --git a/src/test/java/org/dataloader/registries/DispatchPredicateTest.java b/src/test/java/org/dataloader/registries/DispatchPredicateTest.java new file mode 100644 index 0000000..f241c2f --- /dev/null +++ b/src/test/java/org/dataloader/registries/DispatchPredicateTest.java @@ -0,0 +1,105 @@ +package org.dataloader.registries; + +import org.dataloader.ClockDataLoader; +import org.dataloader.DataLoader; +import org.dataloader.fixtures.TestKit; +import org.dataloader.fixtures.TestingClock; +import org.junit.Test; + +import java.time.Duration; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class DispatchPredicateTest { + + @Test + public void default_logical_method() { + + String key = "k"; + DataLoader testDL = TestKit.idLoader(); + + DispatchPredicate alwaysTrue = (k, dl) -> true; + DispatchPredicate alwaysFalse = (k, dl) -> false; + + assertFalse(alwaysFalse.and(alwaysFalse).test(key, testDL)); + assertFalse(alwaysFalse.and(alwaysTrue).test(key, testDL)); + assertFalse(alwaysTrue.and(alwaysFalse).test(key, testDL)); + assertTrue(alwaysTrue.and(alwaysTrue).test(key, testDL)); + + assertTrue(alwaysFalse.negate().test(key, testDL)); + assertFalse(alwaysTrue.negate().test(key, testDL)); + + assertTrue(alwaysTrue.or(alwaysFalse).test(key, testDL)); + assertTrue(alwaysFalse.or(alwaysTrue).test(key, testDL)); + assertFalse(alwaysFalse.or(alwaysFalse).test(key, testDL)); + } + + @Test + public void dispatchIfLongerThan_test() { + TestingClock clock = new TestingClock(); + ClockDataLoader dlA = new ClockDataLoader<>(TestKit.keysAsValues(), clock); + + Duration ms200 = Duration.ofMillis(200); + DispatchPredicate dispatchPredicate = DispatchPredicate.dispatchIfLongerThan(ms200); + + assertFalse(dispatchPredicate.test("k", dlA)); + + clock.jump(199); + assertFalse(dispatchPredicate.test("k", dlA)); + + clock.jump(100); + assertTrue(dispatchPredicate.test("k", dlA)); + } + + @Test + public void dispatchIfDepthGreaterThan_test() { + DataLoader dlA = TestKit.idLoader(); + + DispatchPredicate dispatchPredicate = DispatchPredicate.dispatchIfDepthGreaterThan(4); + assertFalse(dispatchPredicate.test("k", dlA)); + + dlA.load("1"); + dlA.load("2"); + dlA.load("3"); + dlA.load("4"); + + assertFalse(dispatchPredicate.test("k", dlA)); + + + dlA.load("5"); + assertTrue(dispatchPredicate.test("k", dlA)); + + } + + @Test + public void combined_some_things() { + + TestingClock clock = new TestingClock(); + ClockDataLoader dlA = new ClockDataLoader<>(TestKit.keysAsValues(), clock); + + Duration ms200 = Duration.ofMillis(200); + + DispatchPredicate dispatchIfLongerThan = DispatchPredicate.dispatchIfLongerThan(ms200); + DispatchPredicate dispatchIfDepthGreaterThan = DispatchPredicate.dispatchIfDepthGreaterThan(4); + DispatchPredicate combinedPredicate = 
dispatchIfLongerThan.and(dispatchIfDepthGreaterThan); + + assertFalse(combinedPredicate.test("k", dlA)); + + clock.jump(500); // that's enough time for one condition + + assertFalse(combinedPredicate.test("k", dlA)); + + dlA.load("1"); + dlA.load("2"); + dlA.load("3"); + dlA.load("4"); + + assertFalse(combinedPredicate.test("k", dlA)); + + + dlA.load("5"); + assertTrue(combinedPredicate.test("k", dlA)); + + } +} \ No newline at end of file diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java new file mode 100644 index 0000000..0572b66 --- /dev/null +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java @@ -0,0 +1,218 @@ +package org.dataloader.registries; + +import junit.framework.TestCase; +import org.dataloader.DataLoader; +import org.dataloader.DataLoaderFactory; +import org.dataloader.DataLoaderRegistry; +import org.dataloader.fixtures.TestKit; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicInteger; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static org.dataloader.fixtures.TestKit.keysAsValues; +import static org.dataloader.fixtures.TestKit.snooze; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertThat; + +public class ScheduledDataLoaderRegistryTest extends TestCase { + + DispatchPredicate alwaysDispatch = (key, dl) -> true; + DispatchPredicate neverDispatch = (key, dl) -> false; + + + public void test_basic_setup_works_like_a_normal_dlr() { + + List> aCalls = new ArrayList<>(); + List> bCalls = new ArrayList<>(); + + DataLoader dlA = TestKit.idLoader(aCalls); + dlA.load("AK1"); + dlA.load("AK2"); + + DataLoader dlB = TestKit.idLoader(bCalls); + dlB.load("BK1"); + dlB.load("BK2"); + + DataLoaderRegistry otherDLR = DataLoaderRegistry.newRegistry().register("b", dlB).build(); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .registerAll(otherDLR) + .dispatchPredicate(alwaysDispatch) + .scheduledExecutorService(Executors.newSingleThreadScheduledExecutor()) + .schedule(Duration.ofMillis(100)) + .build(); + + assertThat(registry.getScheduleDuration(), equalTo(Duration.ofMillis(100))); + + int count = registry.dispatchAllWithCount(); + assertThat(count, equalTo(4)); + assertThat(aCalls, equalTo(singletonList(asList("AK1", "AK2")))); + assertThat(bCalls, equalTo(singletonList(asList("BK1", "BK2")))); + } + + public void test_predicate_always_false() { + + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(neverDispatch) + .schedule(Duration.ofMillis(10)) + .build(); + + int count = registry.dispatchAllWithCount(); + assertThat(count, equalTo(0)); + assertThat(calls.size(), equalTo(0)); + + snooze(200); + + count = registry.dispatchAllWithCount(); + assertThat(count, equalTo(0)); + assertThat(calls.size(), equalTo(0)); + + snooze(200); + count = registry.dispatchAllWithCount(); + assertThat(count, equalTo(0)); + assertThat(calls.size(), equalTo(0)); + } + + public void 
test_predicate_that_eventually_returns_true() { + + + AtomicInteger counter = new AtomicInteger(); + DispatchPredicate neverDispatch = (key, dl) -> counter.incrementAndGet() > 5; + + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + CompletableFuture p1 = dlA.load("K1"); + CompletableFuture p2 = dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(neverDispatch) + .schedule(Duration.ofMillis(10)) + .build(); + + + int count = registry.dispatchAllWithCount(); + assertThat(count, equalTo(0)); + assertThat(calls.size(), equalTo(0)); + assertFalse(p1.isDone()); + assertFalse(p2.isDone()); + + snooze(200); + + registry.dispatchAll(); + assertTrue(p1.isDone()); + assertTrue(p2.isDone()); + } + + public void test_dispatchAllWithCountImmediately() { + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(neverDispatch) + .schedule(Duration.ofMillis(10)) + .build(); + + int count = registry.dispatchAllWithCountImmediately(); + assertThat(count, equalTo(2)); + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + } + + public void test_dispatchAllImmediately() { + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(neverDispatch) + .schedule(Duration.ofMillis(10)) + .build(); + + registry.dispatchAllImmediately(); + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + } + + public void test_rescheduleNow() { + AtomicInteger i = new AtomicInteger(); + DispatchPredicate countingPredicate = (dataLoaderKey, dataLoader) -> i.incrementAndGet() > 5; + + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(countingPredicate) + .schedule(Duration.ofMillis(100)) + .build(); + + // we never called dispatch per say - we started the scheduling direct + registry.rescheduleNow(); + assertTrue(calls.isEmpty()); + + snooze(2000); + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + } + + public void test_it_will_take_out_the_schedule_once_it_dispatches() { + AtomicInteger counter = new AtomicInteger(); + DispatchPredicate countingPredicate = (dataLoaderKey, dataLoader) -> counter.incrementAndGet() > 5; + + List> calls = new ArrayList<>(); + DataLoader dlA = DataLoaderFactory.newDataLoader(keysAsValues(calls)); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(countingPredicate) + .schedule(Duration.ofMillis(100)) + .build(); + + registry.dispatchAll(); + // we have 5 * 100 mills to reach this line + assertTrue(calls.isEmpty()); + + snooze(2000); + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + + // reset our counter state + counter.set(0); + + dlA.load("K3"); + dlA.load("K4"); + + // no one has called dispatch - there is 
no rescheduling + snooze(2000); + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + + registry.dispatchAll(); + // we have 5 * 100 mills to reach this line + assertThat(calls, equalTo(singletonList(asList("K1", "K2")))); + + snooze(2000); + + assertThat(calls, equalTo(asList(asList("K1", "K2"), asList("K3", "K4")))); + + + } +} \ No newline at end of file From c969e3cf0191117d700574e6855a4d4febe076f6 Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Sun, 27 Jun 2021 18:04:50 +1000 Subject: [PATCH 2/6] Added close support --- .../ScheduledDataLoaderRegistry.java | 18 ++++++-- .../ScheduledDataLoaderRegistryTest.java | 42 +++++++++++++++++++ 2 files changed, 57 insertions(+), 3 deletions(-) diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 013f06a..78a2f17 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -25,17 +25,27 @@ * call {@link #rescheduleNow()}. */ @PublicApi -public class ScheduledDataLoaderRegistry extends DataLoaderRegistry { +public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable { private final ScheduledExecutorService scheduledExecutorService; private final DispatchPredicate dispatchPredicate; private final Duration schedule; + private volatile boolean closed; private ScheduledDataLoaderRegistry(Builder builder) { this.dataLoaders.putAll(builder.dataLoaders); this.scheduledExecutorService = builder.scheduledExecutorService; this.dispatchPredicate = builder.dispatchPredicate; this.schedule = builder.schedule; + this.closed = false; + } + + /** + * Once closed this registry will never again reschedule checks + */ + @Override + public void close() { + closed = true; } /** @@ -92,8 +102,10 @@ public void rescheduleNow() { } private void reschedule(String key, DataLoader dataLoader) { - Runnable runThis = () -> dispatchOrReschedule(key, dataLoader); - scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS); + if (!closed) { + Runnable runThis = () -> dispatchOrReschedule(key, dataLoader); + scheduledExecutorService.schedule(runThis, schedule.toMillis(), TimeUnit.MILLISECONDS); + } } private void dispatchOrReschedule(String key, DataLoader dataLoader) { diff --git a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java index 0572b66..527f419 100644 --- a/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java +++ b/src/test/java/org/dataloader/registries/ScheduledDataLoaderRegistryTest.java @@ -212,7 +212,49 @@ public void test_it_will_take_out_the_schedule_once_it_dispatches() { snooze(2000); assertThat(calls, equalTo(asList(asList("K1", "K2"), asList("K3", "K4")))); + } + + public void test_close_is_a_one_way_door() { + AtomicInteger counter = new AtomicInteger(); + DispatchPredicate countingPredicate = (dataLoaderKey, dataLoader) -> { + counter.incrementAndGet(); + return false; + }; + + DataLoader dlA = TestKit.idLoader(); + dlA.load("K1"); + dlA.load("K2"); + + ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry() + .register("a", dlA) + .dispatchPredicate(countingPredicate) + .schedule(Duration.ofMillis(10)) + .build(); + + registry.rescheduleNow(); + snooze(200); + + assertTrue(counter.get() > 0); + + 
registry.close(); + + snooze(100); + int countThen = counter.get(); + + registry.rescheduleNow(); + snooze(200); + assertEquals(counter.get(), countThen); + registry.rescheduleNow(); + snooze(200); + assertEquals(counter.get(), countThen); + + registry.dispatchAll(); + snooze(200); + assertEquals(counter.get(), countThen + 1); // will have re-entered + + snooze(200); + assertEquals(counter.get(), countThen + 1); } } \ No newline at end of file From 73ad9b56e09ec4b149b5a33c7f087f9625085d1d Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Thu, 1 Jul 2021 22:27:57 +1000 Subject: [PATCH 3/6] PR feedback - spillin misteak --- src/main/java/org/dataloader/registries/DispatchPredicate.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/org/dataloader/registries/DispatchPredicate.java b/src/main/java/org/dataloader/registries/DispatchPredicate.java index d782557..d5bd31b 100644 --- a/src/main/java/org/dataloader/registries/DispatchPredicate.java +++ b/src/main/java/org/dataloader/registries/DispatchPredicate.java @@ -80,7 +80,7 @@ static DispatchPredicate dispatchIfLongerThan(Duration duration) { /** * This predicate will return true if the {@link DataLoader#dispatchDepth()} is greater than the specified depth. * - * This will act as minimum batch size. There must be more then `depth` items queued for the predicate to return true. + * This will act as minimum batch size. There must be more than `depth` items queued for the predicate to return true. * * @param depth the value to be greater than * From 2b781e6054b34ff5dfc8c3c04011d85ffc07bd1b Mon Sep 17 00:00:00 2001 From: Brad Baker Date: Tue, 6 Jul 2021 08:31:31 +1000 Subject: [PATCH 4/6] Added experimental api --- .../annotations/ExperimentalApi.java | 23 +++++++++++++++++++ .../ScheduledDataLoaderRegistry.java | 6 +++-- 2 files changed, 27 insertions(+), 2 deletions(-) create mode 100644 src/main/java/org/dataloader/annotations/ExperimentalApi.java diff --git a/src/main/java/org/dataloader/annotations/ExperimentalApi.java b/src/main/java/org/dataloader/annotations/ExperimentalApi.java new file mode 100644 index 0000000..6be889e --- /dev/null +++ b/src/main/java/org/dataloader/annotations/ExperimentalApi.java @@ -0,0 +1,23 @@ +package org.dataloader.annotations; + +import java.lang.annotation.Documented; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.CONSTRUCTOR; +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.TYPE; + +/** + * This represents code that the graphql-java project considers experimental API and while our intention is that it will + * progress to be {@link PublicApi}, its existence, signature of behavior may change between releases. 
+ * + * In general unnecessary changes will be avoided but you should not depend on experimental classes being stable + */ +@Retention(RetentionPolicy.RUNTIME) +@Target(value = {CONSTRUCTOR, METHOD, TYPE, FIELD}) +@Documented +public @interface ExperimentalApi { +} diff --git a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java index 78a2f17..4be317e 100644 --- a/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java +++ b/src/main/java/org/dataloader/registries/ScheduledDataLoaderRegistry.java @@ -2,7 +2,7 @@ import org.dataloader.DataLoader; import org.dataloader.DataLoaderRegistry; -import org.dataloader.annotations.PublicApi; +import org.dataloader.annotations.ExperimentalApi; import java.time.Duration; import java.util.HashMap; @@ -23,8 +23,10 @@ *
* If you wanted to create a ScheduledDataLoaderRegistry that started a rescheduling immediately, just create one and * call {@link #rescheduleNow()}. + *
+ * This code is currently marked as {@link ExperimentalApi}
  */
-@PublicApi
+@ExperimentalApi
 public class ScheduledDataLoaderRegistry extends DataLoaderRegistry implements AutoCloseable {
 
     private final ScheduledExecutorService scheduledExecutorService;

From 523b6ca2d3bb0963c5c54f6e2562dbedfbc31a3f Mon Sep 17 00:00:00 2001
From: Brad Baker
Date: Tue, 6 Jul 2021 19:50:40 +1000
Subject: [PATCH 5/6] Fixed up doco

---
 README.md | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/README.md b/README.md
index 4263dff..39dedef 100644
--- a/README.md
+++ b/README.md
@@ -478,6 +478,33 @@ and there are also gains to this different mode of operation:
 However, with batch execution control comes responsibility! If you forget to make the call to `dispatch()` then the futures
 in the load request queue will never be batched, and thus _will never complete_! So be careful when crafting your loader designs.
 
+## Scheduled Dispatching
+
+`ScheduledDataLoaderRegistry` is a registry that allows dispatching to be done on a schedule. It contains a
+predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked.
+
+If that predicate is true, it will make a `dispatch` call on the data loader, otherwise it will schedule as task to
+perform that check again. Once a predicate evaluates to true, it will not reschedule, and another call to
+`dispatchAll` is required to be made.
+
+This allows you to do things like "dispatch ONLY if the queue depth is > 10 or more than 200 millis have passed
+since it was last dispatched".
+
+```java
+
+    DispatchPredicate depthOrTimePredicate = DispatchPredicate
+            .dispatchIfDepthGreaterThan(10)
+            .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200)));
+
+    ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
+            .dispatchPredicate(depthOrTimePredicate)
+            .schedule(Duration.ofMillis(10))
+            .register("users", userDataLoader)
+            .build();
+```
+
+The above acts as a kind of minimum batch depth, with a time-based override. It won't dispatch if the loader depth is less
+than or equal to 10, but if 200ms pass it will dispatch.
 
 ## Let's get started!
 

From 80dafa1c4f5a23dcce64195b1f0e68a975ec4087 Mon Sep 17 00:00:00 2001
From: Brad Baker
Date: Tue, 6 Jul 2021 20:00:01 +1000
Subject: [PATCH 6/6] misteak

---
 README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/README.md b/README.md
index 39dedef..77d3fb6 100644
--- a/README.md
+++ b/README.md
@@ -483,7 +483,7 @@ in the load request queue will never be batched, and thus _will never complete_!
 `ScheduledDataLoaderRegistry` is a registry that allows dispatching to be done on a schedule. It contains a
 predicate that is evaluated (per data loader contained within) when `dispatchAll` is invoked.
 
-If that predicate is true, it will make a `dispatch` call on the data loader, otherwise it will schedule as task to
+If that predicate is true, it will make a `dispatch` call on the data loader, otherwise it will schedule a task to
 perform that check again. Once a predicate evaluates to true, it will not reschedule, and another call to
 `dispatchAll` is required to be made.
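
`DispatchPredicate` itself is a small functional interface: its `test` method is given the key the data loader was registered under and the `DataLoader` instance, so project-specific rules can be combined with the built-in depth and time predicates via `and`, `or` and `negate`. The sketch below is illustrative only; the "Slow" key-suffix convention is invented for the example and is not part of the library.

```java
import org.dataloader.registries.DispatchPredicate;

import java.time.Duration;

public class CustomDispatchPredicateSketch {

    public static DispatchPredicate combinedPredicate() {
        // illustrative rule only: never auto-dispatch loaders registered under a key ending in "Slow"
        DispatchPredicate notASlowLoader = (key, dataLoader) -> !key.endsWith("Slow");

        // dispatch when the queue depth exceeds 5 or 500ms have elapsed,
        // but only for loaders the rule above allows
        return notASlowLoader.and(
                DispatchPredicate.dispatchIfDepthGreaterThan(5)
                        .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(500))));
    }
}
```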
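
Putting the pieces of this patch series together, a minimal end-to-end sketch could look like the following. The echoing `userBatchLoader`, the `ScheduledDispatchSketch` class and the explicit executor handling are illustrative stand-ins; the registry, predicate and builder calls are the ones introduced above.

```java
import org.dataloader.BatchLoader;
import org.dataloader.DataLoader;
import org.dataloader.DataLoaderFactory;
import org.dataloader.registries.DispatchPredicate;
import org.dataloader.registries.ScheduledDataLoaderRegistry;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class ScheduledDispatchSketch {

    public static void main(String[] args) {
        // stand-in batch loader that just echoes its keys back as values
        BatchLoader<String, String> userBatchLoader = keys ->
                CompletableFuture.supplyAsync(() -> keys);

        DataLoader<String, String> userLoader = DataLoaderFactory.newDataLoader(userBatchLoader);

        // dispatch once more than 10 keys are queued, or once 200ms have passed since the last dispatch
        DispatchPredicate depthOrTimePredicate = DispatchPredicate
                .dispatchIfDepthGreaterThan(10)
                .or(DispatchPredicate.dispatchIfLongerThan(Duration.ofMillis(200)));

        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

        // the registry is AutoCloseable - closing it stops any further rescheduling
        try (ScheduledDataLoaderRegistry registry = ScheduledDataLoaderRegistry.newScheduledRegistry()
                .register("users", userLoader)
                .dispatchPredicate(depthOrTimePredicate)
                .scheduledExecutorService(executor)
                .schedule(Duration.ofMillis(10)) // how often an unsatisfied predicate is re-checked
                .build()) {

            CompletableFuture<String> user1 = userLoader.load("1");
            CompletableFuture<String> user2 = userLoader.load("2");

            // start the background predicate checks; only 2 keys are queued, so this
            // dispatches once the 200ms time condition is eventually satisfied
            registry.rescheduleNow();

            // registry.dispatchAllImmediately() would instead force a dispatch right away
            System.out.println(user1.join() + " / " + user2.join());
        } finally {
            executor.shutdown();
        }
    }
}
```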