4
4
package com .microsoft .signalr ;
5
5
6
6
import java .nio .ByteBuffer ;
7
- import java .nio .charset .StandardCharsets ;
8
7
import java .util .Map ;
9
8
import java .util .concurrent .ExecutorService ;
10
9
import java .util .concurrent .Executors ;
15
14
16
15
import io .reactivex .rxjava3 .core .Completable ;
17
16
import io .reactivex .rxjava3 .core .Single ;
17
+ import io .reactivex .rxjava3 .schedulers .Schedulers ;
18
+ import io .reactivex .rxjava3 .subjects .BehaviorSubject ;
18
19
import io .reactivex .rxjava3 .subjects .CompletableSubject ;
19
20
20
21
class LongPollingTransport implements Transport {
@@ -29,7 +30,7 @@ class LongPollingTransport implements Transport {
29
30
private volatile Boolean active = false ;
30
31
private String pollUrl ;
31
32
private String closeError ;
32
- private CompletableSubject receiveLoop = CompletableSubject .create ();
33
+ private BehaviorSubject < String > receiveLoopSubject = BehaviorSubject .create ();
33
34
private CompletableSubject closeSubject = CompletableSubject .create ();
34
35
private ExecutorService threadPool ;
35
36
private ExecutorService onReceiveThread ;
@@ -78,27 +79,30 @@ public Completable start(String url) {
78
79
this .threadPool = Executors .newCachedThreadPool ();
79
80
threadPool .execute (() -> {
80
81
this .onReceiveThread = Executors .newSingleThreadExecutor ();
81
- receiveLoop . subscribe (() -> {
82
- this . stop (). onErrorComplete (). subscribe ( );
82
+ receiveLoopSubject . observeOn ( Schedulers . io ()). subscribe ( u -> {
83
+ poll ( u );
83
84
}, e -> {
84
85
this .stop ().onErrorComplete ().subscribe ();
86
+ }, () -> {
87
+ this .stop ().onErrorComplete ().subscribe ();
85
88
});
86
- poll (url ).subscribeWith (receiveLoop );
89
+ // start polling
90
+ receiveLoopSubject .onNext (url );
87
91
});
88
92
89
93
return Completable .complete ();
90
94
});
91
95
}));
92
96
}
93
97
94
- private Completable poll (String url ) {
98
+ private void poll (String url ) {
95
99
if (this .active ) {
96
100
pollUrl = url + "&_=" + System .currentTimeMillis ();
97
101
logger .debug ("Polling {}." , pollUrl );
98
- return this .updateHeaderToken ().andThen (Completable .defer (() -> {
102
+ this .updateHeaderToken ().andThen (Completable .defer (() -> {
99
103
HttpRequest request = new HttpRequest ();
100
104
request .addHeaders (headers );
101
- Completable pollingCompletable = this .pollingClient .get (pollUrl , request ).flatMapCompletable (response -> {
105
+ this .pollingClient .get (pollUrl , request ).subscribe (response -> {
102
106
if (response .getStatusCode () == 204 ) {
103
107
logger .info ("LongPolling transport terminated by server." );
104
108
this .active = false ;
@@ -107,22 +111,32 @@ private Completable poll(String url) {
107
111
this .active = false ;
108
112
this .closeError = "Unexpected response code " + response .getStatusCode () + "." ;
109
113
} else {
110
- if (response .getContent () != null ) {
114
+ if (response .getContent () != null && response . getContent (). hasRemaining () ) {
111
115
logger .debug ("Message received." );
112
- onReceiveThread .submit (() -> this .onReceive (response .getContent ()));
116
+ try {
117
+ onReceiveThread .submit (() -> this .onReceive (response .getContent ()));
118
+ // it's possible for stop to be called while a request is in progress, if stop throws it wont wait for
119
+ // an in-progress poll to complete and will shutdown the thread. We should ignore the exception so we don't
120
+ // get an unhandled RX error
121
+ } catch (Exception e ) {}
113
122
} else {
114
123
logger .debug ("Poll timed out, reissuing." );
115
124
}
116
125
}
117
- return poll (url );
126
+ receiveLoopSubject .onNext (url );
127
+ }, e -> {
128
+ receiveLoopSubject .onError (e );
118
129
});
119
130
120
- return pollingCompletable ;
121
- }));
131
+ return Completable .complete ();
132
+ }))
133
+ .subscribe (() -> {},
134
+ e -> {
135
+ receiveLoopSubject .onError (e );
136
+ });
122
137
} else {
123
138
logger .debug ("Long Polling transport polling complete." );
124
- receiveLoop .onComplete ();
125
- return Completable .complete ();
139
+ receiveLoopSubject .onComplete ();
126
140
}
127
141
}
128
142
@@ -162,7 +176,7 @@ public Completable stop() {
162
176
HttpRequest request = new HttpRequest ();
163
177
request .addHeaders (headers );
164
178
return this .pollingClient .delete (this .url , request ).ignoreElement ()
165
- .andThen (receiveLoop )
179
+ .andThen (receiveLoopSubject . ignoreElements () )
166
180
.doOnComplete (() -> {
167
181
cleanup (this .closeError );
168
182
});
0 commit comments