Description
Firstly - thanks for a great library :)
I am in the process of getting RxJava to play nicely with Vert.x http://vertx.io
If you don't know Vert.x, it's (very roughly) a polyglot reactor implementation for the JVM. I guess you can think of it superficially as a "Node.js for the JVM".
Vert.x provides 100% asynchronous APIs for our users, and everything is executed on an event loop - the exact same thread is always used for any piece of user code.
Vert.x (like Node.js) APIs generally take the form of:
void doSomething(someParam, Handler handler);
where Handler is a callback handler that gets executed asynchronously on the event loop when the result is available.
The idea with the RxJava module I am developing for Vert.x is we can wrap these Vert.x APIs so instead of taking Handlers they instead return Observable.
This means they become composable with RxJava and we can get rid of "callback hell".
Since everything is executed on the event loop in Vert.x it's essential that the RxJava API is 100% non-blocking.
Most things do indeed seem to be non-blocking but I hit a snag with the concat operation, where the subscribe seems, unfortunately to be blocking, e.g.
Observable<Message> concatenated = Observable.concat(obs1, obs2, obs3);
concatenated.subscribe(action); // This blocks!!
Unfortunately this will cause Vert.x to hang since the thread that blocks is the same thread that the events will be delivered on so they will never arrive :(
Looking at the code in OperationConcat.java I can see that a CountDownLatch is being used.
I can't see a good reason why any of the main operations in RxJava (including concat) can't be implemented in a 100% non blocking way, and this would be essential for Vert.x to use them.
Do you have any plans to refactor concat to work in a non blocking way? We would love to use RxJava in Vert.x, and I think Vert.x would be a good vehicle to push RxJava to an even bigger audience :)
Thanks in advance.
[Also.. I noticed that last() is also blocking which makes it unusable in Vert.x (or any non blocking system) Again I can't see a reason why it has to be.]