Description
Considering a simple example:
public class UnsafeSubscriber implements Subscriber<String> {
private boolean duplicateOnSubscribe = false;
@Override
public void onSubscribe(final Subscription s) {
if (duplicateOnSubscribe) {
throw new IllegalStateException("Duplicate onSubscribe() calls.");
}
duplicateOnSubscribe = true;
}
@Override
public void onNext(final String s) {
}
@Override
public void onError(final Throwable t) {
}
@Override
public void onComplete() {
}
}
If an UnsafeSubscriber
instance is created in a different thread than the one that invokes onSubscribe()
(true for an asynchronous Publisher
), according to the java memory model, this statement inside onSubscribe()
:
if (duplicateOnSubscribe) {
is guaranteed to compute to false
if and only if the instance is published safely between these threads. None of the rules in the specifications establish a happens-before relationship between Publisher#subscribe()
and Subscriber#onSubscribe()
. So, the usage above can be categorized as unsafe. In a more convoluted form, the assignment:
private boolean duplicateOnSubscribe = false;
can be interleaved with
duplicateOnSubscribe = true;
such that duplicateOnSubscribe
is set to false
later.
Has this been considered before or am I missing something?