Skip to content

Commit 87cfa7a

Browse files
Initial commit
1 parent 6d0e7fa commit 87cfa7a

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

41 files changed

+9785
-0
lines changed

rxjava-core/build.gradle

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
apply plugin: 'java'
2+
apply plugin: 'eclipse'
3+
4+
dependencies {
5+
compile 'org.slf4j:slf4j-api:1.7.0'
6+
compile 'com.google.code.findbugs:jsr305:2.0.0'
7+
provided 'junit:junit:4.10'
8+
provided 'org.mockito:mockito-core:1.9.5'
9+
compile 'org.codehaus.groovy:groovy:1.8.8'
10+
compile 'org.jruby:jruby:1.7.2'
11+
}
12+
13+
eclipse {
14+
classpath {
15+
//you can tweak the classpath of the Eclipse project by adding extra configurations:
16+
plusConfigurations += configurations.provided
17+
18+
downloadSources = true
19+
downloadJavadoc = true
20+
}
21+
}
22+
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.rx.functions;
2+
3+
public interface Func0<R> {
4+
public R call();
5+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.rx.functions;
2+
3+
public interface Func1<R, T1> {
4+
public R call(T1 t1);
5+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.rx.functions;
2+
3+
public interface Func2<R, T1, T2> {
4+
public R call(T1 t1, T2 t2);
5+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.rx.functions;
2+
3+
public interface Func3<R, T1, T2, T3> {
4+
public R call(T1 t1, T2 t2, T3 t3);
5+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.rx.functions;
2+
3+
public interface Func4<R, T1, T2, T3, T4> {
4+
public R call(T1 t1, T2 t2, T3 t3, T4 t4);
5+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
package org.rx.functions;
2+
3+
public interface FuncN<R> {
4+
public R call(Object... args);
5+
}
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
package org.rx.functions;
2+
3+
import groovy.lang.Closure;
4+
5+
import org.jruby.Ruby;
6+
import org.jruby.RubyProc;
7+
import org.jruby.javasupport.JavaEmbedUtils;
8+
import org.jruby.runtime.builtin.IRubyObject;
9+
import org.slf4j.Logger;
10+
import org.slf4j.LoggerFactory;
11+
12+
public class Functions {
13+
14+
private static final Logger logger = LoggerFactory.getLogger(Functions.class);
15+
16+
/**
17+
* Utility method for determining the type of closure/function and executing it.
18+
*
19+
* @param closure
20+
* @param args
21+
*/
22+
@SuppressWarnings("unchecked")
23+
public static <R> R execute(Object closure, Object... args) {
24+
// if we have a tracer then log the start
25+
long startTime = -1;
26+
if (tracer != null && tracer.isTraceEnabled()) {
27+
try {
28+
startTime = System.nanoTime();
29+
tracer.traceStart(closure, args);
30+
} catch (Exception e) {
31+
logger.warn("Failed to trace log.", e);
32+
}
33+
}
34+
// perform controller logic to determine what type of function we received and execute it
35+
try {
36+
if (closure == null) {
37+
throw new RuntimeException("closure is null. Can't send arguments to null closure.");
38+
}
39+
if (closure instanceof Closure) {
40+
/* handle Groovy */
41+
return (R) ((Closure<?>) closure).call(args);
42+
} else if (closure instanceof RubyProc) {
43+
// handle JRuby
44+
RubyProc rubyProc = ((RubyProc) closure);
45+
Ruby ruby = rubyProc.getRuntime();
46+
IRubyObject rubyArgs[] = new IRubyObject[args.length];
47+
for (int i = 0; i < args.length; i++) {
48+
rubyArgs[i] = JavaEmbedUtils.javaToRuby(ruby, args[i]);
49+
}
50+
return (R) rubyProc.getBlock().call(ruby.getCurrentContext(), rubyArgs);
51+
} else if (closure instanceof Func0) {
52+
Func0<R> f = (Func0<R>) closure;
53+
if (args.length != 0) {
54+
throw new RuntimeException("The closure was Func0 and expected no arguments, but we received: " + args.length);
55+
}
56+
return (R) f.call();
57+
} else if (closure instanceof Func1) {
58+
Func1<R, Object> f = (Func1<R, Object>) closure;
59+
if (args.length != 1) {
60+
throw new RuntimeException("The closure was Func1 and expected 1 argument, but we received: " + args.length);
61+
}
62+
return f.call(args[0]);
63+
} else if (closure instanceof Func2) {
64+
Func2<R, Object, Object> f = (Func2<R, Object, Object>) closure;
65+
if (args.length != 2) {
66+
throw new RuntimeException("The closure was Func2 and expected 2 arguments, but we received: " + args.length);
67+
}
68+
return f.call(args[0], args[1]);
69+
} else if (closure instanceof Func3) {
70+
Func3<R, Object, Object, Object> f = (Func3<R, Object, Object, Object>) closure;
71+
if (args.length != 3) {
72+
throw new RuntimeException("The closure was Func3 and expected 3 arguments, but we received: " + args.length);
73+
}
74+
return (R) f.call(args[0], args[1], args[2]);
75+
} else if (closure instanceof Func4) {
76+
Func4<R, Object, Object, Object, Object> f = (Func4<R, Object, Object, Object, Object>) closure;
77+
if (args.length != 1) {
78+
throw new RuntimeException("The closure was Func4 and expected 4 arguments, but we received: " + args.length);
79+
}
80+
return f.call(args[0], args[1], args[2], args[3]);
81+
} else if (closure instanceof FuncN) {
82+
FuncN<R> f = (FuncN<R>) closure;
83+
return f.call(args);
84+
} else {
85+
throw new RuntimeException("Unsupported closure type: " + closure.getClass().getSimpleName());
86+
}
87+
} finally {
88+
// if we have a tracer then log the end
89+
if (tracer != null && tracer.isTraceEnabled()) {
90+
try {
91+
tracer.traceEnd(startTime, System.nanoTime(), closure, args);
92+
} catch (Exception e) {
93+
logger.warn("Failed to trace log.", e);
94+
}
95+
}
96+
}
97+
}
98+
99+
public static <R, T0> FuncN<R> fromFunc(final Func1<R, T0> f) {
100+
return new FuncN<R>() {
101+
102+
/**
103+
* If it can't cast to this it should throw an exception as that means code is using this wrong.
104+
* <p>
105+
* We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked
106+
*/
107+
@SuppressWarnings("unchecked")
108+
@Override
109+
public R call(Object... args) {
110+
if (args.length == 0) {
111+
return f.call(null);
112+
} else {
113+
return f.call((T0) args[0]);
114+
}
115+
}
116+
117+
};
118+
}
119+
120+
public static <R, T0, T1> FuncN<R> fromFunc(final Func2<R, T0, T1> f) {
121+
return new FuncN<R>() {
122+
123+
/**
124+
* If it can't cast to this it should throw an exception as that means code is using this wrong.
125+
* <p>
126+
* We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked
127+
*/
128+
@SuppressWarnings("unchecked")
129+
@Override
130+
public R call(Object... args) {
131+
if (args.length < 2) {
132+
throw new RuntimeException("Func2 expecting 2 arguments.");
133+
}
134+
return f.call((T0) args[0], (T1) args[1]);
135+
}
136+
137+
};
138+
}
139+
140+
public static <R, T0, T1, T2> FuncN<R> fromFunc(final Func3<R, T0, T1, T2> f) {
141+
return new FuncN<R>() {
142+
143+
/**
144+
* If it can't cast to this it should throw an exception as that means code is using this wrong.
145+
* <p>
146+
* We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked
147+
*/
148+
@SuppressWarnings("unchecked")
149+
@Override
150+
public R call(Object... args) {
151+
if (args.length < 3) {
152+
throw new RuntimeException("Func3 expecting 3 arguments.");
153+
}
154+
return f.call((T0) args[0], (T1) args[1], (T2) args[2]);
155+
}
156+
157+
};
158+
}
159+
160+
public static <R, T0, T1, T2, T3> FuncN<R> fromFunc(final Func4<R, T0, T1, T2, T3> f) {
161+
return new FuncN<R>() {
162+
163+
/**
164+
* If it can't cast to this it should throw an exception as that means code is using this wrong.
165+
* <p>
166+
* We unfortunately need FuncN to be Object and this is a bridge between typed and non-typed hence this being unchecked
167+
*/
168+
@SuppressWarnings("unchecked")
169+
@Override
170+
public R call(Object... args) {
171+
if (args.length < 4) {
172+
throw new RuntimeException("Func4 expecting 4 arguments.");
173+
}
174+
return f.call((T0) args[0], (T1) args[1], (T2) args[2], (T3) args[3]);
175+
}
176+
177+
};
178+
}
179+
180+
private static volatile FunctionTraceLogger tracer = null;
181+
182+
public static interface FunctionTraceLogger {
183+
public boolean isTraceEnabled();
184+
185+
public void traceStart(Object closure, Object... args);
186+
187+
/**
188+
*
189+
* @param start
190+
* nanoTime
191+
* @param end
192+
* nanoTime
193+
* @param closure
194+
* @param args
195+
*/
196+
public void traceEnd(long start, long end, Object closure, Object... args);
197+
}
198+
199+
public static void registerTraceLogger(FunctionTraceLogger tracer) {
200+
Functions.tracer = tracer;
201+
}
202+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
package org.rx.operations;
2+
3+
import java.util.concurrent.atomic.AtomicBoolean;
4+
import java.util.concurrent.atomic.AtomicReference;
5+
6+
import javax.annotation.concurrent.ThreadSafe;
7+
8+
import org.rx.reactive.IDisposable;
9+
10+
11+
/**
12+
* Thread-safe wrapper around WatchableSubscription that ensures unsubscribe can be called only once.
13+
*/
14+
@ThreadSafe
15+
/* package */class AtomicWatchableSubscription implements IDisposable {
16+
17+
private AtomicReference<IDisposable> actualSubscription = new AtomicReference<IDisposable>();
18+
private AtomicBoolean unsubscribed = new AtomicBoolean(false);
19+
20+
public AtomicWatchableSubscription() {
21+
22+
}
23+
24+
public AtomicWatchableSubscription(IDisposable actualSubscription) {
25+
this.actualSubscription.set(actualSubscription);
26+
}
27+
28+
/**
29+
* Set the actual subscription once it exists (if it wasn't available when constructed)
30+
*
31+
* @param actualSubscription
32+
* @throws IllegalStateException
33+
* if trying to set more than once (or use this method after setting via constructor)
34+
*/
35+
public AtomicWatchableSubscription setActual(IDisposable actualSubscription) {
36+
if (!this.actualSubscription.compareAndSet(null, actualSubscription)) {
37+
throw new IllegalStateException("Can not set subscription more than once.");
38+
}
39+
return this;
40+
}
41+
42+
@Override
43+
public void unsubscribe() {
44+
// get the real thing and set to null in an atomic operation so we will only ever call unsubscribe once
45+
IDisposable actual = actualSubscription.getAndSet(null);
46+
// if it's not null we will unsubscribe
47+
if (actual != null) {
48+
actual.unsubscribe();
49+
unsubscribed.set(true);
50+
}
51+
}
52+
53+
public boolean isUnsubscribed() {
54+
return unsubscribed.get();
55+
}
56+
}
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package org.rx.operations;
2+
3+
import javax.annotation.concurrent.ThreadSafe;
4+
5+
import org.rx.reactive.IObserver;
6+
7+
/**
8+
* A thread-safe Watcher for transitioning states in operators.
9+
* <p>
10+
* Allows both single-threaded and multi-threaded execution controlled by the following FastProperty:
11+
* <li>reactive.watcher.multithreaded.enabled [Default: false]</li>
12+
* <p>
13+
* Single-threaded Execution rules are:
14+
* <ul>
15+
* <li>Allow only single-threaded, synchronous, ordered execution of onNext, onCompleted, onError</li>
16+
* <li>Once an onComplete or onError are performed, no further calls can be executed</li>
17+
* <li>If unsubscribe is called, this means we call completed() and don't allow any further onNext calls.</li>
18+
* </ul>
19+
* <p>
20+
* Multi-threaded Execution rules are:
21+
* <ul>
22+
* <li>Allows multiple threads to perform onNext concurrently</li>
23+
* <li>When an onComplete, onError or unsubscribe request is received, block until all current onNext calls are completed</li>
24+
* <li>When an unsubscribe is received, block until all current onNext are completed</li>
25+
* <li>Once an onComplete or onError are performed, no further calls can be executed</li>
26+
* <li>If unsubscribe is called, this means we call completed() and don't allow any further onNext calls.</li>
27+
* </ul>
28+
*
29+
* @param <T>
30+
*/
31+
@ThreadSafe
32+
/* package */class AtomicWatcher<T> implements IObserver<T> {
33+
34+
/** Allow changing between forcing single or allowing multi-threaded execution of onNext */
35+
private static boolean allowMultiThreaded = true;
36+
static {
37+
String v = System.getProperty("rx.onNext.multithreaded.enabled");
38+
if (v != null) {
39+
// if we have a property set then we'll use it
40+
allowMultiThreaded = Boolean.parseBoolean(v);
41+
}
42+
}
43+
44+
private final IObserver<T> watcher;
45+
46+
public AtomicWatcher(IObserver<T> watcher, AtomicWatchableSubscription subscription) {
47+
if (allowMultiThreaded) {
48+
this.watcher = new AtomicWatcherMultiThreaded<T>(watcher, subscription);
49+
} else {
50+
this.watcher = new AtomicWatcherSingleThreaded<T>(watcher, subscription);
51+
}
52+
}
53+
54+
@Override
55+
public void onCompleted() {
56+
watcher.onCompleted();
57+
}
58+
59+
@Override
60+
public void onError(Exception e) {
61+
watcher.onError(e);
62+
}
63+
64+
@Override
65+
public void onNext(T args) {
66+
watcher.onNext(args);
67+
}
68+
69+
}

0 commit comments

Comments
 (0)