Skip to content

Commit 31d7468

Browse files
committed
Merge remote-tracking branch 'origin/dataloader-orchestration' into dataloader-orchestration
2 parents 6210fb5 + 98711db commit 31d7468

File tree

1 file changed

+157
-0
lines changed
  • src/main/java/org/dataloader/orchestration

1 file changed

+157
-0
lines changed
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package org.dataloader.orchestration;
2+
3+
import java.util.Iterator;
4+
import java.util.concurrent.Executor;
5+
import java.util.concurrent.LinkedBlockingDeque;
6+
import java.util.concurrent.atomic.AtomicReference;
7+
import java.util.function.BiFunction;
8+
import java.util.function.Supplier;
9+
10+
public class CF<T> {
11+
12+
private AtomicReference<Object> result = new AtomicReference<>();
13+
14+
private static final Object NULL = new Object();
15+
16+
private static class ExceptionWrapper {
17+
private final Throwable throwable;
18+
19+
private ExceptionWrapper(Throwable throwable) {
20+
this.throwable = throwable;
21+
}
22+
}
23+
24+
private final LinkedBlockingDeque<CompleteAction<?, ?>> dependedActions = new LinkedBlockingDeque<>();
25+
26+
private static class CompleteAction<T, V> implements Runnable {
27+
Executor executor;
28+
CF<V> toComplete;
29+
CF<T> src;
30+
BiFunction<? super T, Throwable, ? extends V> mapperFn;
31+
32+
public CompleteAction(CF<V> toComplete, CF<T> src, BiFunction<? super T, Throwable, ? extends V> mapperFn, Executor executor) {
33+
this.toComplete = toComplete;
34+
this.src = src;
35+
this.mapperFn = mapperFn;
36+
this.executor = executor;
37+
}
38+
39+
public void execute() {
40+
if (executor != null) {
41+
executor.execute(this);
42+
} else {
43+
toComplete.completeViaMapper(mapperFn, src.result.get());
44+
}
45+
46+
}
47+
48+
@Override
49+
public void run() {
50+
toComplete.completeViaMapper(mapperFn, src.result.get());
51+
src.dependedActions.remove(this);
52+
}
53+
}
54+
55+
private CF() {
56+
57+
}
58+
59+
public <U> CF<U> newIncomplete() {
60+
return new CF<U>();
61+
}
62+
63+
public static <T> CF<T> newComplete(T completed) {
64+
CF<T> result = new CF<>();
65+
result.encodeAndSetResult(completed);
66+
return result;
67+
}
68+
69+
public static <T> CF<T> newExceptionally(Throwable e) {
70+
CF<T> result = new CF<>();
71+
result.encodeAndSetResult(e);
72+
return result;
73+
}
74+
75+
public static <T> CF<T> supplyAsync(Supplier<T> supplier,
76+
Executor executor) {
77+
78+
CF<T> result = new CF<>();
79+
executor.execute(() -> {
80+
try {
81+
result.encodeAndSetResult(supplier.get());
82+
} catch (Throwable ex) {
83+
result.encodeAndSetResult(ex);
84+
}
85+
});
86+
return result;
87+
}
88+
89+
90+
public <U> CF<U> map(BiFunction<? super T, Throwable, ? extends U> fn) {
91+
CF<U> newResult = new CF<>();
92+
dependedActions.push(new CompleteAction<>(newResult, this, fn, null));
93+
return newResult;
94+
}
95+
96+
public <U> CF<U> mapAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
97+
CF<U> newResult = new CF<>();
98+
dependedActions.push(new CompleteAction<>(newResult, this, fn, executor));
99+
return newResult;
100+
}
101+
102+
103+
public <U> CF<U> compose(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
104+
CF<U> newResult = new CF<>();
105+
dependedActions.push(new CompleteAction<>(newResult, this, fn, executor));
106+
return newResult;
107+
}
108+
109+
110+
public boolean complete(T value) {
111+
boolean success = result.compareAndSet(null, value);
112+
fireDependentActions();
113+
return success;
114+
}
115+
116+
private boolean encodeAndSetResult(Object rawValue) {
117+
if (rawValue == null) {
118+
return result.compareAndSet(null, NULL);
119+
} else if (rawValue instanceof Throwable) {
120+
return result.compareAndSet(null, new ExceptionWrapper((Throwable) rawValue));
121+
} else {
122+
return result.compareAndSet(null, rawValue);
123+
}
124+
}
125+
126+
private Object decodeResult(Object rawValue) {
127+
if (rawValue instanceof ExceptionWrapper) {
128+
return ((ExceptionWrapper) rawValue).throwable;
129+
} else if (rawValue == NULL) {
130+
return null;
131+
} else {
132+
return rawValue;
133+
}
134+
}
135+
136+
private void fireDependentActions() {
137+
Iterator<CompleteAction<?, ?>> iterator = dependedActions.iterator();
138+
while (iterator.hasNext()) {
139+
iterator.next().execute();
140+
}
141+
}
142+
143+
private <T, U> void completeViaMapper(BiFunction<? super T, Throwable, ? extends U> fn, Object encodedResult) {
144+
try {
145+
Object decodedResult = decodeResult(encodedResult);
146+
Object mappedResult = fn.apply(
147+
(T) decodedResult,
148+
decodedResult instanceof Throwable ? (Throwable) decodedResult : null
149+
);
150+
this.result.compareAndSet(null, mappedResult);
151+
} catch (Throwable t) {
152+
encodeAndSetResult(t);
153+
}
154+
}
155+
156+
157+
}

0 commit comments

Comments
 (0)