Skip to content

Commit 3843282

Browse files
committed
POC- Orchestration of DLs - Added a tracker that can observe via a callback
1 parent 31d7468 commit 3843282

File tree

8 files changed

+156
-42
lines changed

8 files changed

+156
-42
lines changed

src/main/java/org/dataloader/orchestration/ImmediateExecutor.java

Lines changed: 0 additions & 12 deletions
This file was deleted.

src/main/java/org/dataloader/orchestration/Orchestrator.java

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22

33
import org.dataloader.DataLoader;
44
import org.dataloader.impl.Assertions;
5+
import org.dataloader.orchestration.executors.ImmediateExecutor;
6+
import org.dataloader.orchestration.observation.Tracker;
7+
import org.dataloader.orchestration.observation.TrackingObserver;
58

69
import java.util.ArrayList;
710
import java.util.List;
@@ -25,19 +28,14 @@ public class Orchestrator<K, V> {
2528
* @param <V> the value type
2629
* @return a new {@link Orchestrator}
2730
*/
28-
public static <K, V> Orchestrator<K, V> orchestrate(DataLoader<K, V> dataLoader) {
29-
return new Orchestrator<>(new Tracker(), dataLoader, ImmediateExecutor.INSTANCE);
31+
public static <K, V> Builder<K, V> orchestrate(DataLoader<K, V> dataLoader) {
32+
return new Builder<>(dataLoader);
3033
}
3134

32-
// TODO - make this a builder
33-
public static <K, V> Orchestrator<K, V> orchestrate(DataLoader<K, V> dataLoader, Executor executor) {
34-
return new Orchestrator<>(new Tracker(), dataLoader, executor);
35-
}
36-
37-
private Orchestrator(Tracker tracker, DataLoader<K, V> dataLoader, Executor executor) {
38-
this.tracker = tracker;
39-
this.startingDL = dataLoader;
40-
this.executor = executor;
35+
public Orchestrator(Builder<K, V> builder) {
36+
this.tracker = new Tracker(builder.trackingObserver);
37+
this.executor = builder.executor;
38+
this.startingDL = builder.dataLoader;
4139
}
4240

4341
public Tracker getTracker() {
@@ -79,6 +77,10 @@ <KT, VT> void record(Step<KT, VT> step) {
7977
*/
8078
<VT> CompletableFuture<VT> execute() {
8179
Assertions.assertState(!steps.isEmpty(), () -> "How can the steps to run be empty??");
80+
81+
// tell the tracker we are under way
82+
getTracker().startingExecution();
83+
8284
int index = 0;
8385
Step<?, ?> firstStep = steps.get(index);
8486

@@ -94,23 +96,47 @@ <VT> CompletableFuture<VT> execute() {
9496
// side effect when this step is complete
9597
whenComplete(index, nextStep, nextCF);
9698
}
99+
97100
return castAs(currentCF);
98101

99102
}
100103

101-
private void whenComplete(int index, Step<?, ?> step, CompletableFuture<Object> cf) {
104+
private void whenComplete(int stepIndex, Step<?, ?> step, CompletableFuture<Object> cf) {
102105
cf.whenComplete((v, throwable) -> {
103-
getTracker().loadCallComplete(step.dataLoader());
104-
// replace with instrumentation code
105106
if (throwable != null) {
106107
// TODO - should we be cancelling future steps here - no need for dispatch tracking if they will never run
107-
System.out.println("A throwable has been thrown on step " + index + ": " + throwable.getMessage());
108+
System.out.println("A throwable has been thrown on step " + stepIndex + ": " + throwable.getMessage());
108109
throwable.printStackTrace(System.out);
109110
} else {
110-
System.out.println("step " + index + " returned : " + v);
111+
System.out.println("step " + stepIndex + " returned : " + v);
111112
}
113+
getTracker().loadCallComplete(stepIndex, step.dataLoader(), throwable);
112114
});
113115
}
114116

115117

118+
public static class Builder<K, V> {
119+
private Executor executor = ImmediateExecutor.INSTANCE;
120+
private DataLoader<K, V> dataLoader;
121+
private TrackingObserver trackingObserver;
122+
123+
Builder(DataLoader<K, V> dataLoader) {
124+
this.dataLoader = dataLoader;
125+
}
126+
127+
public Builder<K, V> executor(Executor executor) {
128+
this.executor = executor;
129+
return this;
130+
}
131+
132+
public Builder<K, V> observer(TrackingObserver trackingObserver) {
133+
this.trackingObserver = trackingObserver;
134+
return this;
135+
}
136+
137+
public Orchestrator<K, V> build() {
138+
return new Orchestrator<>(this);
139+
}
140+
}
141+
116142
}

src/main/java/org/dataloader/orchestration/Step.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package org.dataloader.orchestration;
22

33
import org.dataloader.DataLoader;
4+
import org.dataloader.orchestration.executors.ObservingExecutor;
5+
import org.dataloader.orchestration.observation.Tracker;
46

57
import java.util.concurrent.CompletableFuture;
68
import java.util.concurrent.Executor;
@@ -46,9 +48,11 @@ public Step<V, V> thenLoadAsync(Function<V, K> codeToRun) {
4648
}
4749

4850
static <K, V> Step<K, V> loadImpl(Orchestrator<?, ?> orchestrator, DataLoader<Object, Object> dl, K key, Object keyContext) {
51+
Tracker tracker = orchestrator.getTracker();
52+
int stepIndex = tracker.getStepCount();
4953
Function<K, CompletableFuture<V>> codeToRun = k -> {
5054
CompletableFuture<V> cf = castAs(dl.load(key, keyContext));
51-
orchestrator.getTracker().loadCall(dl);
55+
orchestrator.getTracker().loadCall(stepIndex, dl);
5256
return cf;
5357
};
5458
Step<K, V> step = new Step<>(orchestrator, dl, codeToRun);
@@ -70,19 +74,21 @@ static <K, V> Step<V, V> thenLoadImpl(Orchestrator<?, ?> orchestrator, DataLoade
7074
}
7175

7276
private static <K, V> Function<V, CompletableFuture<V>> mkSyncLoadLambda(DataLoader<Object, Object> dl, Function<V, K> codeToRun, Tracker tracker) {
77+
int stepIndex = tracker.getStepCount();
7378
return v -> {
7479
K key = codeToRun.apply(v);
7580
CompletableFuture<V> cf = castAs(dl.load(key));
76-
tracker.loadCall(dl);
81+
tracker.loadCall(stepIndex, dl);
7782
return cf;
7883
};
7984
}
8085

8186
private static <K, V> Function<V, CompletableFuture<V>> mkAsyncLoadLambda(Orchestrator<?, ?> orchestrator, DataLoader<Object, Object> dl, Function<V, K> codeToRun, Tracker tracker) {
87+
int stepIndex = tracker.getStepCount();
8288
return v -> {
8389
Executor executor = orchestrator.getExecutor();
8490
Consumer<String> callback = atSomePointWeNeedMoreStateButUsingStringForNowToMakeItCompile -> {
85-
tracker.loadCall(dl);
91+
tracker.loadCall(stepIndex, dl);
8692
};
8793
ObservingExecutor<String> observingExecutor = new ObservingExecutor<>(executor, "state", callback);
8894
Supplier<CompletableFuture<V>> dataLoaderCall = () -> {
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
package org.dataloader.orchestration.executors;
2+
3+
import org.dataloader.annotations.Internal;
4+
5+
import java.util.concurrent.Executor;
6+
7+
@Internal
8+
public class ImmediateExecutor implements Executor {
9+
public static final ImmediateExecutor INSTANCE = new ImmediateExecutor();
10+
11+
@Override
12+
public void execute(Runnable command) {
13+
command.run();
14+
}
15+
}

src/main/java/org/dataloader/orchestration/ObservingExecutor.java renamed to src/main/java/org/dataloader/orchestration/executors/ObservingExecutor.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
1-
package org.dataloader.orchestration;
1+
package org.dataloader.orchestration.executors;
2+
3+
import org.dataloader.annotations.Internal;
24

35
import java.util.concurrent.Executor;
46
import java.util.function.Consumer;
57

6-
class ObservingExecutor<T> implements Executor {
8+
@Internal
9+
public class ObservingExecutor<T> implements Executor {
710

811
private final Executor delegate;
912
private final T state;

src/main/java/org/dataloader/orchestration/Tracker.java renamed to src/main/java/org/dataloader/orchestration/observation/Tracker.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package org.dataloader.orchestration;
1+
package org.dataloader.orchestration.observation;
22

33
import org.dataloader.DataLoader;
44

@@ -14,6 +14,11 @@
1414
public class Tracker {
1515
private final AtomicInteger stepCount = new AtomicInteger();
1616
private final Map<DataLoader<?,?>, AtomicInteger> counters = new HashMap<>();
17+
private final TrackingObserver trackingObserver;
18+
19+
public Tracker(TrackingObserver trackingObserver) {
20+
this.trackingObserver = trackingObserver;
21+
}
1722

1823
public int getOutstandingLoadCount(DataLoader<?,?> dl) {
1924
synchronized (this) {
@@ -35,19 +40,25 @@ public int getStepCount() {
3540
return stepCount.get();
3641
}
3742

38-
void incrementStepCount() {
43+
public void incrementStepCount() {
3944
this.stepCount.incrementAndGet();
4045
}
4146

42-
void loadCall(DataLoader<?,?> dl) {
47+
public void startingExecution() {
48+
trackingObserver.onStart(this);
49+
}
50+
51+
public void loadCall(int stepIndex, DataLoader<?,?> dl) {
4352
synchronized (this) {
4453
getDLCounter(dl).incrementAndGet();
54+
trackingObserver.onLoad(this, stepIndex, dl);
4555
}
4656
}
4757

48-
void loadCallComplete(DataLoader<?,?> dl) {
58+
public void loadCallComplete(int stepIndex, DataLoader<?,?> dl, Throwable throwable) {
4959
synchronized (this) {
5060
getDLCounter(dl).decrementAndGet();
61+
trackingObserver.onLoadComplete(this,stepIndex,dl, throwable);
5162
}
5263
}
5364

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package org.dataloader.orchestration.observation;
2+
3+
import org.dataloader.DataLoader;
4+
import org.dataloader.annotations.PublicSpi;
5+
6+
/**
7+
* This callback is invoked when the {@link org.dataloader.orchestration.Orchestrator} starts execution and then
8+
* as each {@link DataLoader} is invoked and then again when it completes
9+
*/
10+
@PublicSpi
11+
public interface TrackingObserver {
12+
void onStart(Tracker tracker);
13+
14+
void onLoad(Tracker tracker, int stepIndex, DataLoader<?, ?> dl);
15+
16+
// TODO - should this have an exception should it fail in CF terms ???
17+
void onLoadComplete(Tracker tracker, int stepIndex, DataLoader<?, ?> dl, Throwable throwable);
18+
}

src/test/java/org/dataloader/orchestration/OrchestratorTest.java

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,13 @@
44
import org.dataloader.DataLoaderOptions;
55
import org.dataloader.DataLoaderRegistry;
66
import org.dataloader.fixtures.TestKit;
7+
import org.dataloader.orchestration.observation.Tracker;
8+
import org.dataloader.orchestration.observation.TrackingObserver;
79
import org.junit.jupiter.api.Test;
810

911
import java.util.concurrent.CompletableFuture;
10-
import java.util.concurrent.CountDownLatch;
11-
import java.util.concurrent.Executor;
1212
import java.util.concurrent.ForkJoinPool;
13-
import java.util.concurrent.Semaphore;
13+
import java.util.concurrent.atomic.AtomicInteger;
1414

1515
import static org.awaitility.Awaitility.await;
1616
import static org.dataloader.DataLoaderFactory.newDataLoader;
@@ -40,7 +40,7 @@ void canOrchestrate() {
4040
.register("alternateCase", dlAlternateCase)
4141
.build();
4242

43-
Orchestrator<String, String> orchestrator = Orchestrator.orchestrate(dlUpper);
43+
Orchestrator<String, String> orchestrator = Orchestrator.orchestrate(dlUpper).build();
4444
Step<String, String> step1 = orchestrator.load("aBc", null);
4545
With<String, String> with1 = step1.with(dlLower);
4646
Step<String, String> step2 = with1.thenLoad(key -> key);
@@ -68,7 +68,7 @@ void canOrchestrateWhenNotInPerfectOrder() {
6868
.build();
6969

7070
ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();
71-
Orchestrator<String, String> orchestrator = Orchestrator.orchestrate(dlUpper, forkJoinPool);
71+
Orchestrator<String, String> orchestrator = Orchestrator.orchestrate(dlUpper).executor(forkJoinPool).build();
7272
CompletableFuture<String> cf = orchestrator.load("aBc", null)
7373
.with(dlLower).thenLoad(key1 -> key1)
7474
.with(dlReverse).thenLoad(key -> key)
@@ -85,4 +85,51 @@ void canOrchestrateWhenNotInPerfectOrder() {
8585

8686
assertThat(cf.join(), equalTo("cBa"));
8787
}
88+
89+
@Test
90+
void can_observe_orchestration_happening() {
91+
92+
DataLoaderRegistry registry = DataLoaderRegistry.newRegistry()
93+
.register("upper", dlUpper)
94+
.register("lower", dlLower)
95+
.register("reverse", dlReverse)
96+
.register("alternateCase", dlAlternateCase)
97+
.build();
98+
99+
AtomicInteger stepCount = new AtomicInteger();
100+
TrackingObserver observer = new TrackingObserver() {
101+
@Override
102+
public void onStart(Tracker tracker) {
103+
System.out.println("starting - step count : " + tracker.getStepCount());
104+
stepCount.set(tracker.getStepCount());
105+
}
106+
107+
@Override
108+
public void onLoad(Tracker tracker, int stepIndex, DataLoader<?, ?> dl) {
109+
System.out.println("onLoad : " + stepIndex);
110+
}
111+
112+
@Override
113+
public void onLoadComplete(Tracker tracker, int stepIndex, DataLoader<?, ?> dl, Throwable throwable) {
114+
System.out.println("onLoadComplete : " + stepIndex);
115+
}
116+
};
117+
118+
Orchestrator<String, String> orchestrator = Orchestrator.orchestrate(dlUpper).observer(observer).build();
119+
Step<String, String> step1 = orchestrator.load("aBc", null);
120+
With<String, String> with1 = step1.with(dlLower);
121+
Step<String, String> step2 = with1.thenLoad(key -> key);
122+
With<String, String> with2 = step2.with(dlReverse);
123+
Step<String, String> step3 = with2.thenLoad(key -> key);
124+
CompletableFuture<String> cf = step3.toCompletableFuture();
125+
126+
// because all the dls are dispatched in "perfect order" here they all end up dispatching
127+
// at JUST the right time. A change in order would be different
128+
registry.dispatchAll();
129+
130+
await().until(cf::isDone);
131+
132+
assertThat(cf.join(), equalTo("cba"));
133+
assertThat(stepCount.get(), equalTo(3));
134+
}
88135
}

0 commit comments

Comments
 (0)