32
32
33
33
import org .springframework .core .io .buffer .DataBuffer ;
34
34
import org .springframework .core .io .buffer .DataBufferFactory ;
35
- import org .springframework .core .io .buffer .DataBufferUtils ;
36
35
37
36
import com .mongodb .reactivestreams .client .gridfs .AsyncInputStream ;
38
37
@@ -51,11 +50,13 @@ class DataBufferPublisherAdapter {
51
50
*
52
51
* @param inputStream must not be {@literal null}.
53
52
* @param dataBufferFactory must not be {@literal null}.
53
+ * @param bufferSize read {@code n} bytes per iteration.
54
54
* @return the resulting {@link Publisher}.
55
55
*/
56
- static Flux <DataBuffer > createBinaryStream (AsyncInputStream inputStream , DataBufferFactory dataBufferFactory ) {
56
+ static Flux <DataBuffer > createBinaryStream (AsyncInputStream inputStream , DataBufferFactory dataBufferFactory ,
57
+ int bufferSize ) {
57
58
58
- State state = new State (inputStream , dataBufferFactory );
59
+ State state = new State (inputStream , dataBufferFactory , bufferSize );
59
60
60
61
return Flux .usingWhen (Mono .just (inputStream ), it -> {
61
62
@@ -92,6 +93,7 @@ static class State {
92
93
93
94
final AsyncInputStream inputStream ;
94
95
final DataBufferFactory dataBufferFactory ;
96
+ final int bufferSize ;
95
97
96
98
// see DEMAND
97
99
volatile long demand ;
@@ -105,8 +107,16 @@ static class State {
105
107
void request (FluxSink <DataBuffer > sink , long n ) {
106
108
107
109
Operators .addCap (DEMAND , this , n );
110
+ drainLoop (sink );
111
+ }
108
112
109
- if (onShouldRead ()) {
113
+ /**
114
+ * Loops while we have demand and while no read is in progress.
115
+ *
116
+ * @param sink
117
+ */
118
+ void drainLoop (FluxSink <DataBuffer > sink ) {
119
+ while (onShouldRead ()) {
110
120
emitNext (sink );
111
121
}
112
122
}
@@ -119,16 +129,16 @@ boolean onWantRead() {
119
129
return READ .compareAndSet (this , READ_NONE , READ_IN_PROGRESS );
120
130
}
121
131
122
- boolean onReadDone () {
123
- return READ .compareAndSet (this , READ_IN_PROGRESS , READ_NONE );
132
+ void onReadDone () {
133
+ READ .compareAndSet (this , READ_IN_PROGRESS , READ_NONE );
124
134
}
125
135
126
136
long getDemand () {
127
137
return DEMAND .get (this );
128
138
}
129
139
130
- boolean decrementDemand () {
131
- return DEMAND .decrementAndGet (this ) > 0 ;
140
+ void decrementDemand () {
141
+ DEMAND .decrementAndGet (this );
132
142
}
133
143
134
144
void close () {
@@ -143,30 +153,32 @@ boolean isClosed() {
143
153
* Emit the next {@link DataBuffer}.
144
154
*
145
155
* @param sink
156
+ * @return
146
157
*/
147
- void emitNext (FluxSink <DataBuffer > sink ) {
148
-
149
- DataBuffer dataBuffer = dataBufferFactory .allocateBuffer ();
150
- ByteBuffer intermediate = ByteBuffer .allocate (dataBuffer .capacity ());
158
+ private void emitNext (FluxSink <DataBuffer > sink ) {
151
159
160
+ ByteBuffer transport = ByteBuffer .allocate (bufferSize );
161
+ BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber (sink , dataBufferFactory , transport );
152
162
try {
153
- Mono . from ( inputStream .read (intermediate )) .subscribe (new BufferCoreSubscriber ( sink , dataBuffer , intermediate ) );
154
- } catch (Exception e ) {
163
+ inputStream .read (transport ) .subscribe (bufferCoreSubscriber );
164
+ } catch (Throwable e ) {
155
165
sink .error (e );
156
166
}
157
167
}
158
168
159
169
private class BufferCoreSubscriber implements CoreSubscriber <Integer > {
160
170
161
171
private final FluxSink <DataBuffer > sink ;
162
- private final DataBuffer dataBuffer ;
163
- private final ByteBuffer intermediate ;
172
+ private final DataBufferFactory factory ;
173
+ private final ByteBuffer transport ;
174
+ private final Thread subscribeThread = Thread .currentThread ();
175
+ private volatile Subscription subscription ;
164
176
165
- BufferCoreSubscriber (FluxSink <DataBuffer > sink , DataBuffer dataBuffer , ByteBuffer intermediate ) {
177
+ BufferCoreSubscriber (FluxSink <DataBuffer > sink , DataBufferFactory factory , ByteBuffer transport ) {
166
178
167
179
this .sink = sink ;
168
- this .dataBuffer = dataBuffer ;
169
- this .intermediate = intermediate ;
180
+ this .factory = factory ;
181
+ this .transport = transport ;
170
182
}
171
183
172
184
@ Override
@@ -176,6 +188,7 @@ public Context currentContext() {
176
188
177
189
@ Override
178
190
public void onSubscribe (Subscription s ) {
191
+ this .subscription = s ;
179
192
s .request (1 );
180
193
}
181
194
@@ -185,24 +198,32 @@ public void onNext(Integer bytes) {
185
198
if (isClosed ()) {
186
199
187
200
onReadDone ();
188
- DataBufferUtils .release (dataBuffer );
189
- Operators .onNextDropped (dataBuffer , sink .currentContext ());
190
201
return ;
191
202
}
192
203
193
- intermediate .flip ();
194
- dataBuffer .write (intermediate );
204
+ if (bytes > 0 ) {
195
205
196
- sink .next (dataBuffer );
197
- decrementDemand ();
206
+ transport .flip ();
207
+
208
+ DataBuffer dataBuffer = factory .allocateBuffer (transport .remaining ());
209
+ dataBuffer .write (transport );
210
+
211
+ transport .clear ();
212
+ sink .next (dataBuffer );
213
+
214
+ decrementDemand ();
215
+ }
198
216
199
217
try {
200
218
if (bytes == -1 ) {
201
219
sink .complete ();
220
+ return ;
202
221
}
203
222
} finally {
204
223
onReadDone ();
205
224
}
225
+
226
+ subscription .request (1 );
206
227
}
207
228
208
229
@ Override
@@ -215,16 +236,14 @@ public void onError(Throwable t) {
215
236
}
216
237
217
238
onReadDone ();
218
- DataBufferUtils .release (dataBuffer );
219
- Operators .onNextDropped (dataBuffer , sink .currentContext ());
220
239
sink .error (t );
221
240
}
222
241
223
242
@ Override
224
243
public void onComplete () {
225
244
226
- if (onShouldRead ()) {
227
- emitNext (sink );
245
+ if (subscribeThread != Thread . currentThread ()) {
246
+ drainLoop (sink );
228
247
}
229
248
}
230
249
}
0 commit comments