Skip to content

Commit 5c54491

Browse files
committed
DATAMONGO-2393 - Use drain loop for same-thread processing in GridFS download stream.
We now rely on an outer drain-loop when GridFS reads complete on the same thread instead of using recursive subscriptions to avoid StackOverflow. Previously, we recursively invoked subscriptions that lead to an increased stack size.
1 parent a601f6c commit 5c54491

File tree

1 file changed

+48
-29
lines changed

1 file changed

+48
-29
lines changed

spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java

Lines changed: 48 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232

3333
import org.springframework.core.io.buffer.DataBuffer;
3434
import org.springframework.core.io.buffer.DataBufferFactory;
35-
import org.springframework.core.io.buffer.DataBufferUtils;
3635

3736
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
3837

@@ -51,11 +50,13 @@ class DataBufferPublisherAdapter {
5150
*
5251
* @param inputStream must not be {@literal null}.
5352
* @param dataBufferFactory must not be {@literal null}.
53+
* @param bufferSize read {@code n} bytes per iteration.
5454
* @return the resulting {@link Publisher}.
5555
*/
56-
static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
56+
static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
57+
int bufferSize) {
5758

58-
State state = new State(inputStream, dataBufferFactory);
59+
State state = new State(inputStream, dataBufferFactory, bufferSize);
5960

6061
return Flux.usingWhen(Mono.just(inputStream), it -> {
6162

@@ -92,6 +93,7 @@ static class State {
9293

9394
final AsyncInputStream inputStream;
9495
final DataBufferFactory dataBufferFactory;
96+
final int bufferSize;
9597

9698
// see DEMAND
9799
volatile long demand;
@@ -105,8 +107,16 @@ static class State {
105107
void request(FluxSink<DataBuffer> sink, long n) {
106108

107109
Operators.addCap(DEMAND, this, n);
110+
drainLoop(sink);
111+
}
108112

109-
if (onShouldRead()) {
113+
/**
114+
* Loops while we have demand and while no read is in progress.
115+
*
116+
* @param sink
117+
*/
118+
void drainLoop(FluxSink<DataBuffer> sink) {
119+
while (onShouldRead()) {
110120
emitNext(sink);
111121
}
112122
}
@@ -119,16 +129,16 @@ boolean onWantRead() {
119129
return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS);
120130
}
121131

122-
boolean onReadDone() {
123-
return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
132+
void onReadDone() {
133+
READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
124134
}
125135

126136
long getDemand() {
127137
return DEMAND.get(this);
128138
}
129139

130-
boolean decrementDemand() {
131-
return DEMAND.decrementAndGet(this) > 0;
140+
void decrementDemand() {
141+
DEMAND.decrementAndGet(this);
132142
}
133143

134144
void close() {
@@ -143,30 +153,32 @@ boolean isClosed() {
143153
* Emit the next {@link DataBuffer}.
144154
*
145155
* @param sink
156+
* @return
146157
*/
147-
void emitNext(FluxSink<DataBuffer> sink) {
148-
149-
DataBuffer dataBuffer = dataBufferFactory.allocateBuffer();
150-
ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity());
158+
private void emitNext(FluxSink<DataBuffer> sink) {
151159

160+
ByteBuffer transport = ByteBuffer.allocate(bufferSize);
161+
BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, dataBufferFactory, transport);
152162
try {
153-
Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate));
154-
} catch (Exception e) {
163+
inputStream.read(transport).subscribe(bufferCoreSubscriber);
164+
} catch (Throwable e) {
155165
sink.error(e);
156166
}
157167
}
158168

159169
private class BufferCoreSubscriber implements CoreSubscriber<Integer> {
160170

161171
private final FluxSink<DataBuffer> sink;
162-
private final DataBuffer dataBuffer;
163-
private final ByteBuffer intermediate;
172+
private final DataBufferFactory factory;
173+
private final ByteBuffer transport;
174+
private final Thread subscribeThread = Thread.currentThread();
175+
private volatile Subscription subscription;
164176

165-
BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBuffer dataBuffer, ByteBuffer intermediate) {
177+
BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBufferFactory factory, ByteBuffer transport) {
166178

167179
this.sink = sink;
168-
this.dataBuffer = dataBuffer;
169-
this.intermediate = intermediate;
180+
this.factory = factory;
181+
this.transport = transport;
170182
}
171183

172184
@Override
@@ -176,6 +188,7 @@ public Context currentContext() {
176188

177189
@Override
178190
public void onSubscribe(Subscription s) {
191+
this.subscription = s;
179192
s.request(1);
180193
}
181194

@@ -185,24 +198,32 @@ public void onNext(Integer bytes) {
185198
if (isClosed()) {
186199

187200
onReadDone();
188-
DataBufferUtils.release(dataBuffer);
189-
Operators.onNextDropped(dataBuffer, sink.currentContext());
190201
return;
191202
}
192203

193-
intermediate.flip();
194-
dataBuffer.write(intermediate);
204+
if (bytes > 0) {
195205

196-
sink.next(dataBuffer);
197-
decrementDemand();
206+
transport.flip();
207+
208+
DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining());
209+
dataBuffer.write(transport);
210+
211+
transport.clear();
212+
sink.next(dataBuffer);
213+
214+
decrementDemand();
215+
}
198216

199217
try {
200218
if (bytes == -1) {
201219
sink.complete();
220+
return;
202221
}
203222
} finally {
204223
onReadDone();
205224
}
225+
226+
subscription.request(1);
206227
}
207228

208229
@Override
@@ -215,16 +236,14 @@ public void onError(Throwable t) {
215236
}
216237

217238
onReadDone();
218-
DataBufferUtils.release(dataBuffer);
219-
Operators.onNextDropped(dataBuffer, sink.currentContext());
220239
sink.error(t);
221240
}
222241

223242
@Override
224243
public void onComplete() {
225244

226-
if (onShouldRead()) {
227-
emitNext(sink);
245+
if (subscribeThread != Thread.currentThread()) {
246+
drainLoop(sink);
228247
}
229248
}
230249
}

0 commit comments

Comments
 (0)