
Blocking behaviour in RxJava #270

Closed
@purplefox


Firstly - thanks for a great library :)

I am in the process of getting RxJava to play nicely with Vert.x http://vertx.io

If you don't know Vert.x, it's (very roughly) a polyglot reactor implementation for the JVM. I guess you can think of it superficially as a "Node.js for the JVM".

Vert.x provides 100% asynchronous APIs for our users, and everything is executed on an event loop - the exact same thread is always used for any piece of user code.

Vert.x (like Node.js) APIs generally take the form of:

void doSomething(someParam, Handler handler);

where Handler is a callback handler that gets executed asynchronously on the event loop when the result is available.

The idea with the RxJava module I am developing for Vert.x is to wrap these Vert.x APIs so that, instead of taking Handlers, they return an Observable.

This means they become composable with RxJava and we can get rid of "callback hell".
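To make the wrapping idea concrete, here is a minimal sketch. Everything in it is a hypothetical stand-in written for illustration: `Handler`, `Observer`, `Observable`, `EventLoop` and `doSomethingObservable` are not the real Vert.x or RxJava types, and the queue-based `EventLoop` only imitates "run this later on the same thread".

```java
import java.util.ArrayDeque;

// Hypothetical stand-ins for illustration; not the Vert.x or RxJava types.
final class HandlerWrapSketch {
    /** Callback in the shape described above. */
    interface Handler<T> { void handle(T result); }

    /** Minimal observer: values, then completion. */
    interface Observer<T> { void onNext(T value); void onCompleted(); }

    /** Minimal observable: subscribe() must return without waiting. */
    interface Observable<T> { void subscribe(Observer<T> observer); }

    /** Fake single-threaded event loop: queued tasks run later on the caller's thread. */
    static final class EventLoop {
        private final ArrayDeque<Runnable> tasks = new ArrayDeque<>();
        void post(Runnable task) { tasks.add(task); }
        void runAll() {
            Runnable task;
            while ((task = tasks.poll()) != null) task.run();
        }
    }

    /** Handler-style API: the result is delivered later, on the loop. */
    static void doSomething(EventLoop loop, String param, Handler<String> handler) {
        loop.post(() -> handler.handle("result:" + param));
    }

    /** Same operation, but returning an Observable instead of taking a Handler. */
    static Observable<String> doSomethingObservable(EventLoop loop, String param) {
        return observer -> doSomething(loop, param, result -> {
            observer.onNext(result);   // forward the single result
            observer.onCompleted();    // then signal completion
        });
    }
}
```

The point of the sketch is that `subscribe` only registers the callback and returns; the value arrives later, when the loop runs the queued task.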

Since everything in Vert.x is executed on the event loop, it's essential that the RxJava API is 100% non-blocking.

Most things do indeed seem to be non-blocking, but I hit a snag with the concat operation, where the subscribe unfortunately seems to block, e.g.

Observable<Message> concatenated = Observable.concat(obs1, obs2, obs3);
concatenated.subscribe(action); // This blocks!!

Unfortunately this will cause Vert.x to hang: the thread that blocks is the same thread that the events would be delivered on, so they will never arrive :(

Looking at the code in OperationConcat.java I can see that a CountDownLatch is being used.

I can't see a good reason why any of the main operations in RxJava (including concat) can't be implemented in a 100% non-blocking way, and this would be essential for Vert.x to use them.
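As an illustration of what a latch-free concat could look like, here is a minimal sketch: it subscribes to the next source from inside the previous source's `onCompleted`, so nothing ever waits. The `Observer` and `Observable` interfaces and the `concat` helper below are hypothetical stand-ins, not the RxJava classes, and this is not a claim about how OperationConcat should actually be rewritten.

```java
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for illustration; not the RxJava types.
final class NonBlockingConcatSketch {
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onCompleted();
    }

    interface Observable<T> { void subscribe(Observer<T> observer); }

    /** Emits every source in order; subscribe() returns without waiting. */
    static <T> Observable<T> concat(List<Observable<T>> sources) {
        return downstream -> subscribeNext(sources.iterator(), downstream);
    }

    private static <T> void subscribeNext(Iterator<Observable<T>> remaining,
                                          Observer<T> downstream) {
        if (!remaining.hasNext()) {
            downstream.onCompleted();  // all sources are done
            return;
        }
        remaining.next().subscribe(new Observer<T>() {
            public void onNext(T value) { downstream.onNext(value); }
            public void onError(Throwable e) { downstream.onError(e); }
            // Chain to the next source from the callback instead of blocking on a latch.
            public void onCompleted() { subscribeNext(remaining, downstream); }
        });
    }
}
```

One caveat of this shape: if many sources complete synchronously inside `subscribe`, the chaining recurses once per source, so a production version would probably need a trampoline or similar.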

Do you have any plans to refactor concat to work in a non-blocking way? We would love to use RxJava in Vert.x, and I think Vert.x would be a good vehicle to push RxJava to an even bigger audience :)

Thanks in advance.

[Also, I noticed that last() is blocking too, which makes it unusable in Vert.x (or any non-blocking system). Again, I can't see a reason why it has to be.]
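For comparison, a non-blocking "last" could return an Observable that remembers the most recent value and emits it only when the source completes. The sketch below uses hypothetical stand-in interfaces rather than the RxJava types, and it assumes that an empty source should simply complete without emitting anything.

```java
// Hypothetical stand-ins for illustration; not the RxJava types.
final class NonBlockingLastSketch {
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable e);
        void onCompleted();
    }

    interface Observable<T> { void subscribe(Observer<T> observer); }

    /** Emits only the final value of the source, once the source completes. */
    static <T> Observable<T> last(Observable<T> source) {
        return downstream -> source.subscribe(new Observer<T>() {
            private T latest;          // most recent value seen so far
            private boolean hasValue;  // false while the source has emitted nothing

            public void onNext(T value) { latest = value; hasValue = true; }
            public void onError(Throwable e) { downstream.onError(e); }
            public void onCompleted() {
                if (hasValue) downstream.onNext(latest);
                downstream.onCompleted();
            }
        });
    }
}
```

The caller gets the value in a callback instead of as a return value, which is what lets the event loop thread stay free.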
