Skip to content

Commit 4774a20

Browse files
committed
add CF class
1 parent a5b2bba commit 4774a20

File tree

1 file changed

+143
-0
lines changed
  • src/main/java/org/dataloader/orchestration

1 file changed

+143
-0
lines changed
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
package org.dataloader.orchestration;
2+
3+
import java.util.Iterator;
4+
import java.util.concurrent.Executor;
5+
import java.util.concurrent.LinkedBlockingDeque;
6+
import java.util.concurrent.atomic.AtomicReference;
7+
import java.util.function.BiFunction;
8+
9+
public class CF<T> {
10+
11+
private AtomicReference<Object> result = new AtomicReference<>();
12+
13+
private static final Object NULL = new Object();
14+
15+
private static class ExceptionWrapper {
16+
private final Throwable throwable;
17+
18+
private ExceptionWrapper(Throwable throwable) {
19+
this.throwable = throwable;
20+
}
21+
}
22+
23+
private final LinkedBlockingDeque<CompleteAction<?, ?>> dependedActions = new LinkedBlockingDeque<>();
24+
25+
private static class CompleteAction<T, V> implements Runnable {
26+
Executor executor;
27+
CF<V> toComplete;
28+
CF<T> src;
29+
BiFunction<? super T, Throwable, ? extends V> mapperFn;
30+
31+
public CompleteAction(CF<V> toComplete, CF<T> src, BiFunction<? super T, Throwable, ? extends V> mapperFn, Executor executor) {
32+
this.toComplete = toComplete;
33+
this.src = src;
34+
this.mapperFn = mapperFn;
35+
this.executor = executor;
36+
}
37+
38+
public void execute() {
39+
if (executor != null) {
40+
executor.execute(this);
41+
} else {
42+
toComplete.completeViaMapper(mapperFn, src.result.get());
43+
}
44+
45+
}
46+
47+
@Override
48+
public void run() {
49+
toComplete.completeViaMapper(mapperFn, src.result.get());
50+
src.dependedActions.remove(this);
51+
}
52+
}
53+
54+
private CF() {
55+
56+
}
57+
58+
public <U> CF<U> newIncomplete() {
59+
return new CF<U>();
60+
}
61+
62+
public static <T> CF<T> newComplete(T completed) {
63+
CF<T> result = new CF<>();
64+
result.encodeAndSetResult(completed);
65+
return result;
66+
}
67+
68+
public static <T> CF<T> newExceptionally(Throwable e) {
69+
CF<T> result = new CF<>();
70+
result.encodeAndSetResult(e);
71+
return result;
72+
}
73+
74+
75+
public <U> CF<U> map(BiFunction<? super T, Throwable, ? extends U> fn) {
76+
CF<U> newResult = new CF<>();
77+
dependedActions.push(new CompleteAction<>(newResult, this, fn, null));
78+
return newResult;
79+
}
80+
81+
public <U> CF<U> mapAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
82+
CF<U> newResult = new CF<>();
83+
dependedActions.push(new CompleteAction<>(newResult, this, fn, executor));
84+
return newResult;
85+
}
86+
87+
88+
public <U> CF<U> compose(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) {
89+
CF<U> newResult = new CF<>();
90+
dependedActions.push(new CompleteAction<>(newResult, this, fn, executor));
91+
return newResult;
92+
}
93+
94+
95+
public boolean complete(T value) {
96+
boolean success = result.compareAndSet(null, value);
97+
return success;
98+
}
99+
100+
private boolean encodeAndSetResult(Object rawValue) {
101+
if (rawValue == null) {
102+
return result.compareAndSet(null, NULL);
103+
} else if (rawValue instanceof Throwable) {
104+
return result.compareAndSet(null, new ExceptionWrapper((Throwable) rawValue));
105+
} else {
106+
return result.compareAndSet(null, rawValue);
107+
}
108+
}
109+
110+
private Object decodeResult(Object rawValue) {
111+
if (rawValue instanceof ExceptionWrapper) {
112+
return ((ExceptionWrapper) rawValue).throwable;
113+
} else if (rawValue == NULL) {
114+
return null;
115+
} else {
116+
return rawValue;
117+
}
118+
}
119+
120+
private void fireDependentActions() {
121+
Iterator<CompleteAction<?, ?>> iterator = dependedActions.iterator();
122+
while (iterator.hasNext()) {
123+
iterator.next().execute();
124+
}
125+
}
126+
127+
private <T, U> void completeViaMapper(BiFunction<? super T, Throwable, ? extends U> fn, Object encodedResult) {
128+
try {
129+
Object decodedResult = decodeResult(encodedResult);
130+
Object mappedResult = fn.apply(
131+
(T) decodedResult,
132+
decodedResult instanceof Throwable ? (Throwable) decodedResult : null
133+
);
134+
this.result.compareAndSet(null, mappedResult);
135+
} catch (Throwable t) {
136+
encodeAndSetResult(t);
137+
}
138+
139+
140+
}
141+
142+
143+
}

0 commit comments

Comments
 (0)