49
49
* {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}.
50
50
*
51
51
* @author Mark Paluch
52
+ * @author Christoph Strobl
52
53
* @since 2.2
53
54
*/
54
55
@ RequiredArgsConstructor
@@ -94,20 +95,23 @@ public Publisher<Integer> read(ByteBuffer dst) {
94
95
try {
95
96
96
97
if (error != null ) {
98
+
97
99
sink .error (error );
98
100
return ;
99
101
}
100
102
101
103
if (bytecount == -1 ) {
104
+
102
105
sink .success (-1 );
103
106
return ;
104
107
}
105
108
106
109
ByteBuffer byteBuffer = db .asByteBuffer ();
107
110
int toWrite = byteBuffer .remaining ();
108
- dst .put (byteBuffer );
109
111
112
+ dst .put (byteBuffer );
110
113
sink .success (toWrite );
114
+
111
115
} catch (Exception e ) {
112
116
sink .error (e );
113
117
} finally {
@@ -127,6 +131,7 @@ public Publisher<Integer> read(ByteBuffer dst) {
127
131
public Publisher <Success > close () {
128
132
129
133
return Mono .create (sink -> {
134
+
130
135
cancelled = true ;
131
136
132
137
if (error != null ) {
@@ -141,6 +146,7 @@ public Publisher<Success> close() {
141
146
protected void request (int n ) {
142
147
143
148
if (complete ) {
149
+
144
150
terminatePendingReads ();
145
151
return ;
146
152
}
@@ -150,67 +156,9 @@ protected void request(int n) {
150
156
if (SUBSCRIBED .get (this ) == SUBSCRIPTION_NOT_SUBSCRIBED ) {
151
157
152
158
if (SUBSCRIBED .compareAndSet (this , SUBSCRIPTION_NOT_SUBSCRIBED , SUBSCRIPTION_SUBSCRIBED )) {
153
-
154
- buffers .subscribe (new CoreSubscriber <DataBuffer >() {
155
-
156
- @ Override
157
- public Context currentContext () {
158
- return subscriberContext ;
159
- }
160
-
161
- @ Override
162
- public void onSubscribe (Subscription s ) {
163
- subscription = s ;
164
-
165
- Operators .addCap (DEMAND , AsyncInputStreamAdapter .this , -1 );
166
- s .request (1 );
167
- }
168
-
169
- @ Override
170
- public void onNext (DataBuffer dataBuffer ) {
171
-
172
- if (cancelled || complete ) {
173
- DataBufferUtils .release (dataBuffer );
174
- Operators .onNextDropped (dataBuffer , subscriberContext );
175
- return ;
176
- }
177
-
178
- BiConsumer <DataBuffer , Integer > poll = readRequests .poll ();
179
-
180
- if (poll == null ) {
181
-
182
- DataBufferUtils .release (dataBuffer );
183
- Operators .onNextDropped (dataBuffer , subscriberContext );
184
- subscription .cancel ();
185
- return ;
186
- }
187
-
188
- poll .accept (dataBuffer , dataBuffer .readableByteCount ());
189
-
190
- requestFromSubscription (subscription );
191
- }
192
-
193
- @ Override
194
- public void onError (Throwable t ) {
195
-
196
- if (cancelled || complete ) {
197
- Operators .onErrorDropped (t , subscriberContext );
198
- return ;
199
- }
200
-
201
- error = t ;
202
- complete = true ;
203
- terminatePendingReads ();
204
- }
205
-
206
- @ Override
207
- public void onComplete () {
208
-
209
- complete = true ;
210
- terminatePendingReads ();
211
- }
212
- });
159
+ buffers .subscribe (new DataBufferCoreSubscriber ());
213
160
}
161
+
214
162
} else {
215
163
216
164
Subscription subscription = this .subscription ;
@@ -245,4 +193,65 @@ void terminatePendingReads() {
245
193
readers .accept (factory .wrap (new byte [0 ]), -1 );
246
194
}
247
195
}
196
+
197
+ private class DataBufferCoreSubscriber implements CoreSubscriber <DataBuffer > {
198
+
199
+ @ Override
200
+ public Context currentContext () {
201
+ return AsyncInputStreamAdapter .this .subscriberContext ;
202
+ }
203
+
204
+ @ Override
205
+ public void onSubscribe (Subscription s ) {
206
+
207
+ AsyncInputStreamAdapter .this .subscription = s ;
208
+
209
+ Operators .addCap (DEMAND , AsyncInputStreamAdapter .this , -1 );
210
+ s .request (1 );
211
+ }
212
+
213
+ @ Override
214
+ public void onNext (DataBuffer dataBuffer ) {
215
+
216
+ if (cancelled || complete ) {
217
+ DataBufferUtils .release (dataBuffer );
218
+ Operators .onNextDropped (dataBuffer , AsyncInputStreamAdapter .this .subscriberContext );
219
+ return ;
220
+ }
221
+
222
+ BiConsumer <DataBuffer , Integer > poll = AsyncInputStreamAdapter .this .readRequests .poll ();
223
+
224
+ if (poll == null ) {
225
+
226
+ DataBufferUtils .release (dataBuffer );
227
+ Operators .onNextDropped (dataBuffer , AsyncInputStreamAdapter .this .subscriberContext );
228
+ subscription .cancel ();
229
+ return ;
230
+ }
231
+
232
+ poll .accept (dataBuffer , dataBuffer .readableByteCount ());
233
+
234
+ requestFromSubscription (subscription );
235
+ }
236
+
237
+ @ Override
238
+ public void onError (Throwable t ) {
239
+
240
+ if (AsyncInputStreamAdapter .this .cancelled || AsyncInputStreamAdapter .this .complete ) {
241
+ Operators .onErrorDropped (t , AsyncInputStreamAdapter .this .subscriberContext );
242
+ return ;
243
+ }
244
+
245
+ AsyncInputStreamAdapter .this .error = t ;
246
+ AsyncInputStreamAdapter .this .complete = true ;
247
+ terminatePendingReads ();
248
+ }
249
+
250
+ @ Override
251
+ public void onComplete () {
252
+
253
+ AsyncInputStreamAdapter .this .complete = true ;
254
+ terminatePendingReads ();
255
+ }
256
+ }
248
257
}
0 commit comments