Skip to content

Commit 6210fb5

Browse files
committed
POC- Orchestration of DLs - tried and executor that can observe
1 parent a5b2bba commit 6210fb5

File tree

7 files changed

+131
-19
lines changed

7 files changed

+131
-19
lines changed
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package org.dataloader.orchestration;
2+
3+
import java.util.concurrent.Executor;
4+
5+
class ImmediateExecutor implements Executor {
6+
static final ImmediateExecutor INSTANCE = new ImmediateExecutor();
7+
8+
@Override
9+
public void execute(Runnable command) {
10+
command.run();
11+
}
12+
}
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package org.dataloader.orchestration;
2+
3+
import java.util.concurrent.Executor;
4+
import java.util.function.Consumer;
5+
6+
class ObservingExecutor<T> implements Executor {
7+
8+
private final Executor delegate;
9+
private final T state;
10+
private final Consumer<T> callback;
11+
12+
public ObservingExecutor(Executor delegate, T state, Consumer<T> callback) {
13+
this.delegate = delegate;
14+
this.state = state;
15+
this.callback = callback;
16+
}
17+
18+
@Override
19+
public void execute(Runnable command) {
20+
delegate.execute(command);
21+
callback.accept(state);
22+
}
23+
}

src/main/java/org/dataloader/orchestration/Orchestrator.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,12 @@
66
import java.util.ArrayList;
77
import java.util.List;
88
import java.util.concurrent.CompletableFuture;
9+
import java.util.concurrent.Executor;
910
import java.util.function.Function;
1011

1112
public class Orchestrator<K, V> {
1213

14+
private final Executor executor;
1315
private final Tracker tracker;
1416
private final DataLoader<K, V> startingDL;
1517
private final List<Step<?, ?>> steps = new ArrayList<>();
@@ -24,16 +26,26 @@ public class Orchestrator<K, V> {
2426
* @return a new {@link Orchestrator}
2527
*/
2628
public static <K, V> Orchestrator<K, V> orchestrate(DataLoader<K, V> dataLoader) {
27-
return new Orchestrator<>(new Tracker(), dataLoader);
29+
return new Orchestrator<>(new Tracker(), dataLoader, ImmediateExecutor.INSTANCE);
2830
}
2931

30-
public Tracker getTracker() {
31-
return tracker;
32+
// TODO - make this a builder
33+
public static <K, V> Orchestrator<K, V> orchestrate(DataLoader<K, V> dataLoader, Executor executor) {
34+
return new Orchestrator<>(new Tracker(), dataLoader, executor);
3235
}
3336

34-
private Orchestrator(Tracker tracker, DataLoader<K, V> dataLoader) {
37+
private Orchestrator(Tracker tracker, DataLoader<K, V> dataLoader, Executor executor) {
3538
this.tracker = tracker;
3639
this.startingDL = dataLoader;
40+
this.executor = executor;
41+
}
42+
43+
public Tracker getTracker() {
44+
return tracker;
45+
}
46+
47+
public Executor getExecutor() {
48+
return executor;
3749
}
3850

3951

src/main/java/org/dataloader/orchestration/Step.java

Lines changed: 40 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,10 @@
33
import org.dataloader.DataLoader;
44

55
import java.util.concurrent.CompletableFuture;
6+
import java.util.concurrent.Executor;
7+
import java.util.function.Consumer;
68
import java.util.function.Function;
9+
import java.util.function.Supplier;
710

811
import static org.dataloader.orchestration.Orchestrator.castAs;
912

@@ -35,7 +38,11 @@ public Step<K, V> load(K key, Object keyContext) {
3538
}
3639

3740
public Step<V, V> thenLoad(Function<V, K> codeToRun) {
38-
return thenLoadImpl(orchestrator, dl, codeToRun);
41+
return thenLoadImpl(orchestrator, dl, codeToRun, false);
42+
}
43+
44+
public Step<V, V> thenLoadAsync(Function<V, K> codeToRun) {
45+
return thenLoadImpl(orchestrator, dl, codeToRun, true);
3946
}
4047

4148
static <K, V> Step<K, V> loadImpl(Orchestrator<?, ?> orchestrator, DataLoader<Object, Object> dl, K key, Object keyContext) {
@@ -49,16 +56,42 @@ static <K, V> Step<K, V> loadImpl(Orchestrator<?, ?> orchestrator, DataLoader<Ob
4956
return step;
5057
}
5158

52-
static <K, V> Step<V, V> thenLoadImpl(Orchestrator<?, ?> orchestrator, DataLoader<Object, Object> dl, Function<V, K> codeToRun) {
53-
Function<V, CompletableFuture<V>> actualCodeToRun = v -> {
59+
static <K, V> Step<V, V> thenLoadImpl(Orchestrator<?, ?> orchestrator, DataLoader<Object, Object> dl, Function<V, K> codeToRun, boolean async) {
60+
Tracker tracker = orchestrator.getTracker();
61+
Function<V, CompletableFuture<V>> actualCodeToRun;
62+
if (async) {
63+
actualCodeToRun = mkAsyncLoadLambda(orchestrator, dl, codeToRun, tracker);
64+
} else {
65+
actualCodeToRun = mkSyncLoadLambda(dl, codeToRun, tracker);
66+
}
67+
Step<V, V> step = new Step<>(orchestrator, dl, actualCodeToRun);
68+
orchestrator.record(step);
69+
return step;
70+
}
71+
72+
private static <K, V> Function<V, CompletableFuture<V>> mkSyncLoadLambda(DataLoader<Object, Object> dl, Function<V, K> codeToRun, Tracker tracker) {
73+
return v -> {
5474
K key = codeToRun.apply(v);
5575
CompletableFuture<V> cf = castAs(dl.load(key));
56-
orchestrator.getTracker().loadCall(dl);
76+
tracker.loadCall(dl);
5777
return cf;
5878
};
59-
Step<V, V> step = new Step<>(orchestrator, dl, actualCodeToRun);
60-
orchestrator.record(step);
61-
return step;
79+
}
80+
81+
private static <K, V> Function<V, CompletableFuture<V>> mkAsyncLoadLambda(Orchestrator<?, ?> orchestrator, DataLoader<Object, Object> dl, Function<V, K> codeToRun, Tracker tracker) {
82+
return v -> {
83+
Executor executor = orchestrator.getExecutor();
84+
Consumer<String> callback = atSomePointWeNeedMoreStateButUsingStringForNowToMakeItCompile -> {
85+
tracker.loadCall(dl);
86+
};
87+
ObservingExecutor<String> observingExecutor = new ObservingExecutor<>(executor, "state", callback);
88+
Supplier<CompletableFuture<V>> dataLoaderCall = () -> {
89+
K key = codeToRun.apply(v);
90+
return castAs(dl.load(key));
91+
};
92+
return CompletableFuture.supplyAsync(dataLoaderCall, observingExecutor)
93+
.thenCompose(Function.identity());
94+
};
6295
}
6396

6497
public CompletableFuture<V> toCompletableFuture() {

src/main/java/org/dataloader/orchestration/With.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ public Step<K, V> load(K key, Object keyContext) {
3131
}
3232

3333
public Step<V, V> thenLoad(Function<V, K> codeToRun) {
34-
return Step.thenLoadImpl(orchestrator, castAs(dl), codeToRun);
34+
return Step.thenLoadImpl(orchestrator, castAs(dl), codeToRun, false);
35+
}
36+
37+
public Step<V, V> thenLoadAsync(Function<V, K> codeToRun) {
38+
return Step.thenLoadImpl(orchestrator, castAs(dl), codeToRun, true);
3539
}
3640
}

src/test/java/org/dataloader/fixtures/TestKit.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ public static BatchLoader<String, String> reverseBatchLoader() {
7272
return keys -> CompletableFuture.completedFuture(keys.stream().map(TestKit::reverse).collect(toList()));
7373
}
7474

75+
public static BatchLoader<String, String> alternateCaseBatchLoader() {
76+
return keys -> CompletableFuture.completedFuture(keys.stream().map(TestKit::alternateCase).collect(toList()));
77+
}
78+
7579
public static String reverse(String s) {
7680
StringBuilder sb = new StringBuilder();
7781
for (int i = s.length() - 1; i >= 0; i--) {
@@ -80,6 +84,19 @@ public static String reverse(String s) {
8084
return sb.toString();
8185
}
8286

87+
public static String alternateCase(String s) {
88+
StringBuilder sb = new StringBuilder();
89+
for (int i = 0; i <= s.length()-1; i++) {
90+
char c = s.charAt(i);
91+
if (i % 2 == 0) {
92+
sb.append(Character.toLowerCase(c));
93+
} else {
94+
sb.append(Character.toUpperCase(c));
95+
}
96+
}
97+
return sb.toString();
98+
}
99+
83100
public static <K, V> DataLoader<K, V> idLoader() {
84101
return idLoader(null, new ArrayList<>());
85102
}

src/test/java/org/dataloader/orchestration/OrchestratorTest.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,18 @@
33
import org.dataloader.DataLoader;
44
import org.dataloader.DataLoaderOptions;
55
import org.dataloader.DataLoaderRegistry;
6+
import org.dataloader.fixtures.TestKit;
67
import org.junit.jupiter.api.Test;
78

89
import java.util.concurrent.CompletableFuture;
10+
import java.util.concurrent.CountDownLatch;
11+
import java.util.concurrent.Executor;
12+
import java.util.concurrent.ForkJoinPool;
13+
import java.util.concurrent.Semaphore;
914

1015
import static org.awaitility.Awaitility.await;
1116
import static org.dataloader.DataLoaderFactory.newDataLoader;
17+
import static org.dataloader.fixtures.TestKit.alternateCaseBatchLoader;
1218
import static org.dataloader.fixtures.TestKit.lowerCaseBatchLoader;
1319
import static org.dataloader.fixtures.TestKit.reverseBatchLoader;
1420
import static org.dataloader.fixtures.TestKit.upperCaseBatchLoader;
@@ -22,6 +28,7 @@ class OrchestratorTest {
2228
DataLoader<String, String> dlUpper = newDataLoader(upperCaseBatchLoader(), cachingAndBatchingOptions);
2329
DataLoader<String, String> dlLower = newDataLoader(lowerCaseBatchLoader(), cachingAndBatchingOptions);
2430
DataLoader<String, String> dlReverse = newDataLoader(reverseBatchLoader(), cachingAndBatchingOptions);
31+
DataLoader<String, String> dlAlternateCase = newDataLoader(alternateCaseBatchLoader(), cachingAndBatchingOptions);
2532

2633
@Test
2734
void canOrchestrate() {
@@ -30,6 +37,7 @@ void canOrchestrate() {
3037
.register("upper", dlUpper)
3138
.register("lower", dlLower)
3239
.register("reverse", dlReverse)
40+
.register("alternateCase", dlAlternateCase)
3341
.build();
3442

3543
Orchestrator<String, String> orchestrator = Orchestrator.orchestrate(dlUpper);
@@ -56,22 +64,25 @@ void canOrchestrateWhenNotInPerfectOrder() {
5664
.register("reverse", dlReverse)
5765
.register("lower", dlLower)
5866
.register("upper", dlUpper)
67+
.register("alternateCase", dlAlternateCase)
5968
.build();
6069

61-
Orchestrator<String, String> orchestrator = Orchestrator.orchestrate(dlUpper);
70+
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
71+
Orchestrator<String, String> orchestrator = Orchestrator.orchestrate(dlUpper, forkJoinPool);
6272
CompletableFuture<String> cf = orchestrator.load("aBc", null)
6373
.with(dlLower).thenLoad(key1 -> key1)
6474
.with(dlReverse).thenLoad(key -> key)
75+
.with(dlAlternateCase).thenLoadAsync(key -> key)
6576
.toCompletableFuture();
6677

67-
registry.dispatchAll();
68-
69-
assertThat(cf.isDone(), equalTo(false));
70-
71-
assertThat(orchestrator.getTracker().getOutstandingLoadCount(),equalTo(2));
78+
for (int i = 0; i < 10; i++) {
79+
TestKit.snooze(50); // TODO - hack or now
80+
registry.dispatchAll();
81+
System.out.println("Waiting for " + i + " to complete...");
82+
}
7283

7384
await().until(cf::isDone);
7485

75-
assertThat(cf.join(), equalTo("cba"));
86+
assertThat(cf.join(), equalTo("cBa"));
7687
}
7788
}

0 commit comments

Comments
 (0)