diff --git a/pom.xml b/pom.xml
index 15b2d67f47..2f40db764d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
- <version>2.3.0.BUILD-SNAPSHOT</version>
+ <version>2.3.0.DATAMONGO-2393-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml
index c4766040c1..9aa26e3402 100644
--- a/spring-data-mongodb-benchmarks/pom.xml
+++ b/spring-data-mongodb-benchmarks/pom.xml
@@ -7,7 +7,7 @@
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
- <version>2.3.0.BUILD-SNAPSHOT</version>
+ <version>2.3.0.DATAMONGO-2393-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml
index ed39c63e76..bc2ff731e3 100644
--- a/spring-data-mongodb-distribution/pom.xml
+++ b/spring-data-mongodb-distribution/pom.xml
@@ -14,7 +14,7 @@
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
- <version>2.3.0.BUILD-SNAPSHOT</version>
+ <version>2.3.0.DATAMONGO-2393-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index 25cf02b5d5..bf31d9bc0d 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -11,7 +11,7 @@
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
- <version>2.3.0.BUILD-SNAPSHOT</version>
+ <version>2.3.0.DATAMONGO-2393-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java
index 51ea4a1dce..ff2a33bf57 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java
@@ -17,6 +17,8 @@
import lombok.RequiredArgsConstructor;
import reactor.core.CoreSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;
@@ -26,13 +28,12 @@
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
-import java.util.function.BiConsumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
+
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
-import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
@@ -66,14 +67,14 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
private final Publisher<? extends DataBuffer> buffers;
private final Context subscriberContext;
- private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
private volatile Subscription subscription;
private volatile boolean cancelled;
- private volatile boolean complete;
+ private volatile boolean allDataBuffersReceived;
private volatile Throwable error;
- private final Queue<BiConsumer<DataBuffer, Integer>> readRequests = Queues.<BiConsumer<DataBuffer, Integer>> small()
- .get();
+ private final Queue<ReadRequest> readRequests = Queues.<ReadRequest> small().get();
+
+ private final Queue<DataBuffer> bufferQueue = Queues.<DataBuffer> small().get();
// see DEMAND
volatile long demand;
@@ -88,39 +89,28 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
@Override
public Publisher<Integer> read(ByteBuffer dst) {
- return Mono.create(sink -> {
-
- readRequests.offer((db, bytecount) -> {
-
- try {
-
- if (error != null) {
- sink.error(error);
- return;
- }
-
- if (bytecount == -1) {
- sink.success(-1);
- return;
- }
-
- ByteBuffer byteBuffer = db.asByteBuffer();
- int toWrite = byteBuffer.remaining();
-
- dst.put(byteBuffer);
- sink.success(toWrite);
-
- } catch (Exception e) {
- sink.error(e);
- } finally {
- DataBufferUtils.release(db);
- }
- });
-
- request(1);
- });
+ return Flux.create(sink -> {
+
+ readRequests.offer(new ReadRequest(sink, dst));
+
+ sink.onCancel(this::terminatePendingReads);
+ sink.onDispose(this::terminatePendingReads);
+ sink.onRequest(this::request);
+ });
+ }
+
+ void onError(FluxSink<Integer> sink, Throwable e) {
+
+ readRequests.poll();
+ sink.error(e);
+ }
+
+ void onComplete(FluxSink<Integer> sink, int writtenBytes) {
+
+ readRequests.poll();
+ DEMAND.decrementAndGet(this);
+ sink.next(writtenBytes);
+ sink.complete();
}
/*
@@ -144,17 +134,19 @@ public Publisher<Success> close() {
cancelled = true;
if (error != null) {
+ terminatePendingReads();
sink.error(error);
return;
}
+ terminatePendingReads();
sink.success(Success.SUCCESS);
});
}
- protected void request(int n) {
+ protected void request(long n) {
- if (complete) {
+ if (allDataBuffersReceived && bufferQueue.isEmpty()) {
terminatePendingReads();
return;
@@ -176,18 +168,51 @@ protected void request(int n) {
requestFromSubscription(subscription);
}
}
+
}
void requestFromSubscription(Subscription subscription) {
- long demand = DEMAND.get(AsyncInputStreamAdapter.this);
-
if (cancelled) {
subscription.cancel();
}
- if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) {
- subscription.request(1);
+ drainLoop();
+ }
+
+ void drainLoop() {
+
+ while (DEMAND.get(AsyncInputStreamAdapter.this) > 0) {
+
+ DataBuffer wip = bufferQueue.peek();
+
+ if (wip == null) {
+ break;
+ }
+
+ if (wip.readableByteCount() == 0) {
+ bufferQueue.poll();
+ continue;
+ }
+
+ ReadRequest consumer = AsyncInputStreamAdapter.this.readRequests.peek();
+ if (consumer == null) {
+ break;
+ }
+
+ consumer.transferBytes(wip, wip.readableByteCount());
+ }
+
+ if (bufferQueue.isEmpty()) {
+
+ if (allDataBuffersReceived) {
+ terminatePendingReads();
+ return;
+ }
+
+ if (demand > 0) {
+ subscription.request(1);
+ }
}
}
@@ -196,10 +221,10 @@ void requestFromSubscription(Subscription subscription) {
*/
void terminatePendingReads() {
- BiConsumer<DataBuffer, Integer> readers;
+ ReadRequest readers;
while ((readers = readRequests.poll()) != null) {
- readers.accept(factory.wrap(new byte[0]), -1);
+ readers.onComplete();
}
}
@@ -214,23 +239,21 @@ public Context currentContext() {
public void onSubscribe(Subscription s) {
AsyncInputStreamAdapter.this.subscription = s;
-
- Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1);
s.request(1);
}
@Override
public void onNext(DataBuffer dataBuffer) {
- if (cancelled || complete) {
+ if (cancelled || allDataBuffersReceived) {
DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
return;
}
- BiConsumer<DataBuffer, Integer> poll = AsyncInputStreamAdapter.this.readRequests.poll();
+ ReadRequest readRequest = AsyncInputStreamAdapter.this.readRequests.peek();
- if (poll == null) {
+ if (readRequest == null) {
DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
@@ -238,29 +261,103 @@ public void onNext(DataBuffer dataBuffer) {
return;
}
- poll.accept(dataBuffer, dataBuffer.readableByteCount());
+ bufferQueue.offer(dataBuffer);
- requestFromSubscription(subscription);
+ drainLoop();
}
@Override
public void onError(Throwable t) {
- if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) {
+ if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.allDataBuffersReceived) {
Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext);
return;
}
AsyncInputStreamAdapter.this.error = t;
- AsyncInputStreamAdapter.this.complete = true;
+ AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
terminatePendingReads();
}
@Override
public void onComplete() {
- AsyncInputStreamAdapter.this.complete = true;
- terminatePendingReads();
+ AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
+ if (bufferQueue.isEmpty()) {
+ terminatePendingReads();
+ }
+ }
+ }
+
+ /**
+ * Request to read bytes and transfer these to the associated {@link ByteBuffer}.
+ */
+ class ReadRequest {
+
+ private final FluxSink<Integer> sink;
+ private final ByteBuffer dst;
+
+ private int writtenBytes;
+
+ ReadRequest(FluxSink<Integer> sink, ByteBuffer dst) {
+ this.sink = sink;
+ this.dst = dst;
+ this.writtenBytes = -1;
+ }
+
+ public void onComplete() {
+
+ if (error != null) {
+ AsyncInputStreamAdapter.this.onError(sink, error);
+ return;
+ }
+
+ AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
+ }
+
+ public void transferBytes(DataBuffer db, int bytes) {
+
+ try {
+
+ if (error != null) {
+ AsyncInputStreamAdapter.this.onError(sink, error);
+ return;
+ }
+
+ ByteBuffer byteBuffer = db.asByteBuffer();
+ int remaining = byteBuffer.remaining();
+ int writeCapacity = Math.min(dst.remaining(), remaining);
+ int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity());
+ int toWrite = limit - byteBuffer.position();
+
+ if (toWrite == 0) {
+
+ AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
+ return;
+ }
+
+ int oldPosition = byteBuffer.position();
+
+ byteBuffer.limit(toWrite);
+ dst.put(byteBuffer);
+ byteBuffer.limit(byteBuffer.capacity());
+ byteBuffer.position(oldPosition);
+ db.readPosition(db.readPosition() + toWrite);
+
+ if (writtenBytes == -1) {
+ writtenBytes = bytes;
+ } else {
+ writtenBytes += bytes;
+ }
+
+ } catch (Exception e) {
+ AsyncInputStreamAdapter.this.onError(sink, e);
+ } finally {
+
+ if (db.readableByteCount() == 0) {
+ DataBufferUtils.release(db);
+ }
+ }
}
}
}
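
For orientation, a hypothetical consumer of the reworked read() contract (not part of this PR). It relies only on the semantics visible above: each read(dst) emits the number of bytes transferred into dst and signals -1 once the stream is exhausted. Reads run strictly one after another, so reusing a single transport buffer is safe.

    import java.io.ByteArrayOutputStream;
    import java.nio.ByteBuffer;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;

    class AsyncInputStreamReader {

        // Drains the stream into a byte array, chunkSize bytes per read.
        Mono<byte[]> readFully(AsyncInputStream inputStream, int chunkSize) {

            ByteArrayOutputStream target = new ByteArrayOutputStream();
            ByteBuffer transport = ByteBuffer.allocate(chunkSize);

            return Flux.defer(() -> inputStream.read(transport)) //
                    .doOnNext(bytesRead -> {

                        transport.flip(); // expose what this read produced
                        byte[] chunk = new byte[transport.remaining()];
                        transport.get(chunk);
                        target.write(chunk, 0, chunk.length);
                        transport.clear(); // reuse the buffer for the next read
                    }) //
                    .repeat() // resubscribe, i.e. issue the next read, after each completion
                    .takeUntil(bytesRead -> bytesRead == -1) // -1 marks end of stream
                    .then(Mono.fromSupplier(target::toByteArray));
        }
    }
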
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java
index 0bacda78a2..0bb0b1fdde 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java
@@ -42,12 +42,14 @@ class BinaryStreamAdapters {
*
* @param inputStream must not be {@literal null}.
* @param dataBufferFactory must not be {@literal null}.
+ * @param bufferSize read {@code n} bytes per iteration.
* @return {@link Flux} emitting {@link DataBuffer}s.
* @see DataBufferFactory#allocateBuffer()
*/
- static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
+ static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
+ int bufferSize) {
- return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) //
+ return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) //
.filter(it -> {
if (it.readableByteCount() == 0) {
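
A minimal usage sketch of the widened adapter API, mirroring the unit test change further below. The classpath resource name is illustrative, and the sketch assumes it lives in the same package since toPublisher is package-private.

    import java.io.IOException;

    import org.springframework.core.io.ClassPathResource;
    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferUtils;
    import org.springframework.core.io.buffer.DefaultDataBufferFactory;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
    import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;

    class ToPublisherSketch {

        Mono<DataBuffer> readResource() throws IOException {

            AsyncInputStream inputStream = AsyncStreamHelper
                    .toAsyncInputStream(new ClassPathResource("gridfs/gridfs.xml").getInputStream());

            // read in 256-byte chunks; the adapter filters out empty buffers
            Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream,
                    new DefaultDataBufferFactory(), 256);

            // aggregate all chunks; the caller must release the joined buffer
            return DataBufferUtils.join(dataBuffers);
        }
    }
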
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java
index dfeb1411d5..180778c93b 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java
@@ -29,11 +29,10 @@
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
-
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
-import org.springframework.core.io.buffer.DataBufferUtils;
+import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
/**
@@ -51,38 +50,104 @@ class DataBufferPublisherAdapter {
*
* @param inputStream must not be {@literal null}.
* @param dataBufferFactory must not be {@literal null}.
+ * @param bufferSize read {@code n} bytes per iteration.
* @return the resulting {@link Publisher}.
*/
- static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
-
- State state = new State(inputStream, dataBufferFactory);
-
- return Flux.usingWhen(Mono.just(inputStream), it -> {
-
- return Flux.<DataBuffer> create((sink) -> {
-
- sink.onDispose(state::close);
- sink.onCancel(state::close);
-
- sink.onRequest(n -> {
- state.request(sink, n);
- });
- });
- }, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close) //
- .concatMap(Flux::just, 1);
+ static Flux<DataBuffer> createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
+ int bufferSize) {
+
+ return Flux.usingWhen(Mono.just(new DelegatingAsyncInputStream(inputStream, dataBufferFactory, bufferSize)),
+ DataBufferPublisherAdapter::doRead, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close);
+ }
+
+ /**
+ * Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}.
+ *
+ * @param inputStream the source stream.
+ * @return a {@link Flux} emitting data chunks one by one.
+ * @since 2.2.1
+ */
+ private static Flux<DataBuffer> doRead(DelegatingAsyncInputStream inputStream) {
+
+ AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory,
+ inputStream.bufferSize);
+
+ return Flux.create((sink) -> {
+
+ sink.onDispose(streamHandler::close);
+ sink.onCancel(streamHandler::close);
+
+ sink.onRequest(n -> {
+ streamHandler.request(sink, n);
+ });
+ });
}
+
+ /**
+ * An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading
+ * from it, delegating operations on the {@link AsyncInputStream} to the reference instance.
+ * Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas.
+ *
+ * @author Christoph Strobl
+ * @since 2.2.1
+ */
+ private static class DelegatingAsyncInputStream implements AsyncInputStream {
+
+ private final AsyncInputStream inputStream;
+ private final DataBufferFactory dataBufferFactory;
+ private int bufferSize;
+
+ /**
+ * @param inputStream the source input stream.
+ * @param dataBufferFactory
+ * @param bufferSize
+ */
+ DelegatingAsyncInputStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) {
+
+ this.inputStream = inputStream;
+ this.dataBufferFactory = dataBufferFactory;
+ this.bufferSize = bufferSize;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer)
+ */
+ @Override
+ public Publisher<Integer> read(ByteBuffer dst) {
+ return inputStream.read(dst);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long)
+ */
+ @Override
+ public Publisher<Long> skip(long bytesToSkip) {
+ return inputStream.skip(bytesToSkip);
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close()
+ */
+ @Override
+ public Publisher<Success> close() {
+ return inputStream.close();
+ }
}
@RequiredArgsConstructor
- static class State {
+ static class AsyncInputStreamHandler {
- private static final AtomicLongFieldUpdater<State> DEMAND = AtomicLongFieldUpdater.newUpdater(State.class,
- "demand");
+ private static final AtomicLongFieldUpdater<AsyncInputStreamHandler> DEMAND = AtomicLongFieldUpdater
+ .newUpdater(AsyncInputStreamHandler.class, "demand");
- private static final AtomicIntegerFieldUpdater<State> STATE = AtomicIntegerFieldUpdater.newUpdater(State.class,
- "state");
+ private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> STATE = AtomicIntegerFieldUpdater
+ .newUpdater(AsyncInputStreamHandler.class, "state");
- private static final AtomicIntegerFieldUpdater<State> READ = AtomicIntegerFieldUpdater.newUpdater(State.class,
- "read");
+ private static final AtomicIntegerFieldUpdater<AsyncInputStreamHandler> READ = AtomicIntegerFieldUpdater
+ .newUpdater(AsyncInputStreamHandler.class, "read");
private static final int STATE_OPEN = 0;
private static final int STATE_CLOSED = 1;
@@ -92,6 +157,7 @@ static class State {
final AsyncInputStream inputStream;
final DataBufferFactory dataBufferFactory;
+ final int bufferSize;
// see DEMAND
volatile long demand;
@@ -105,8 +171,16 @@ static class State {
void request(FluxSink<DataBuffer> sink, long n) {
Operators.addCap(DEMAND, this, n);
+ drainLoop(sink);
+ }
- if (onShouldRead()) {
+ /**
+ * Loops while we have demand and while no read is in progress.
+ *
+ * @param sink
+ */
+ void drainLoop(FluxSink<DataBuffer> sink) {
+ while (onShouldRead()) {
emitNext(sink);
}
}
@@ -119,16 +193,16 @@ boolean onWantRead() {
return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS);
}
- boolean onReadDone() {
- return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
+ void onReadDone() {
+ READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
}
long getDemand() {
return DEMAND.get(this);
}
- boolean decrementDemand() {
- return DEMAND.decrementAndGet(this) > 0;
+ void decrementDemand() {
+ DEMAND.decrementAndGet(this);
}
void close() {
@@ -143,15 +217,15 @@ boolean isClosed() {
* Emit the next {@link DataBuffer}.
*
* @param sink
*/
- void emitNext(FluxSink<DataBuffer> sink) {
-
- DataBuffer dataBuffer = dataBufferFactory.allocateBuffer();
- ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity());
+ private void emitNext(FluxSink<DataBuffer> sink) {
+ ByteBuffer transport = ByteBuffer.allocate(bufferSize);
+ BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, dataBufferFactory, transport);
try {
- Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate));
- } catch (Exception e) {
+ inputStream.read(transport).subscribe(bufferCoreSubscriber);
+ } catch (Throwable e) {
sink.error(e);
}
}
@@ -159,14 +233,16 @@ void emitNext(FluxSink<DataBuffer> sink) {
private class BufferCoreSubscriber implements CoreSubscriber<Integer> {
private final FluxSink<DataBuffer> sink;
- private final DataBuffer dataBuffer;
- private final ByteBuffer intermediate;
+ private final DataBufferFactory factory;
+ private final ByteBuffer transport;
+ private final Thread subscribeThread = Thread.currentThread();
+ private volatile Subscription subscription;
- BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBuffer dataBuffer, ByteBuffer intermediate) {
+ BufferCoreSubscriber(FluxSink<DataBuffer> sink, DataBufferFactory factory, ByteBuffer transport) {
this.sink = sink;
- this.dataBuffer = dataBuffer;
- this.intermediate = intermediate;
+ this.factory = factory;
+ this.transport = transport;
}
@Override
@@ -176,6 +252,8 @@ public Context currentContext() {
@Override
public void onSubscribe(Subscription s) {
+
+ this.subscription = s;
s.request(1);
}
@@ -185,24 +263,38 @@ public void onNext(Integer bytes) {
if (isClosed()) {
onReadDone();
- DataBufferUtils.release(dataBuffer);
- Operators.onNextDropped(dataBuffer, sink.currentContext());
return;
}
- intermediate.flip();
- dataBuffer.write(intermediate);
-
- sink.next(dataBuffer);
- decrementDemand();
+ if (bytes > 0) {
+
+ DataBuffer buffer = readNextChunk();
+ sink.next(buffer);
+ decrementDemand();
+ }
try {
if (bytes == -1) {
sink.complete();
+ return;
}
} finally {
onReadDone();
}
+
+ subscription.request(1);
+ }
+
+ private DataBuffer readNextChunk() {
+
+ transport.flip();
+
+ DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining());
+ dataBuffer.write(transport);
+
+ transport.clear();
+
+ return dataBuffer;
}
@Override
@@ -215,16 +307,14 @@ public void onError(Throwable t) {
}
onReadDone();
- DataBufferUtils.release(dataBuffer);
- Operators.onNextDropped(dataBuffer, sink.currentContext());
sink.error(t);
}
@Override
public void onComplete() {
- if (onShouldRead()) {
- emitNext(sink);
+ if (subscribeThread != Thread.currentThread()) {
+ drainLoop(sink);
}
}
}
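
The crux of the rework is the pairing of drainLoop with the thread check in BufferCoreSubscriber.onComplete: a completion arriving on the thread that started the read simply lets the caller's while loop continue, while a completion on another thread re-enters the loop itself. That keeps long synchronous read chains from recursing and overflowing the stack. A condensed, self-contained illustration of the pattern follows; it is a sketch under simplified assumptions (one read slot, demand decremented once per read), not the PR's class.

    import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
    import java.util.concurrent.atomic.AtomicLongFieldUpdater;

    abstract class DemandedReadLoop {

        private static final AtomicLongFieldUpdater<DemandedReadLoop> DEMAND = AtomicLongFieldUpdater
                .newUpdater(DemandedReadLoop.class, "demand");
        private static final AtomicIntegerFieldUpdater<DemandedReadLoop> READ = AtomicIntegerFieldUpdater
                .newUpdater(DemandedReadLoop.class, "read");

        volatile long demand;
        volatile int read; // 0 = idle, 1 = read in flight

        void request(long n) {
            DEMAND.addAndGet(this, n);
            drainLoop();
        }

        // Loops while demand remains and the single read slot can be claimed.
        void drainLoop() {
            while (DEMAND.get(this) > 0 && READ.compareAndSet(this, 0, 1)) {

                startAsyncRead(); // must eventually call onReadCompleted(...) exactly once

                if (READ.get(this) == 1) {
                    return; // still running; its completion re-enters the loop elsewhere
                }
                // completed synchronously on this thread; keep looping right here
            }
        }

        // Called by the read's subscriber once a chunk was emitted downstream.
        void onReadCompleted(boolean completedOnSubscribingThread) {

            DEMAND.decrementAndGet(this);
            READ.set(this, 0);

            if (!completedOnSubscribingThread) {
                drainLoop(); // trampoline only across threads to avoid stack growth
            }
        }

        abstract void startAsyncRead();
    }

In the PR itself that decision is made by comparing Thread.currentThread() against the subscribeThread captured when the BufferCoreSubscriber was created.
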
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java
index 4946d6d0d5..fb40dad10c 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java
@@ -20,9 +20,9 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
+import java.util.function.IntFunction;
import org.reactivestreams.Publisher;
-
import org.springframework.core.io.AbstractResource;
import org.springframework.core.io.Resource;
import org.springframework.core.io.buffer.DataBuffer;
@@ -35,26 +35,28 @@
* Reactive {@link GridFSFile} based {@link Resource} implementation.
*
* @author Mark Paluch
+ * @author Christoph Strobl
* @since 2.2
*/
public class ReactiveGridFsResource extends AbstractResource {
+ private static final Integer DEFAULT_CHUNK_SIZE = 256 * 1024;
+
private final @Nullable GridFSFile file;
private final String filename;
- private final Flux<DataBuffer> content;
+ private final IntFunction<Flux<DataBuffer>> contentFunction;
/**
* Creates a new, absent {@link ReactiveGridFsResource}.
*
* @param filename filename of the absent resource.
* @param content
- * @since 2.1
*/
private ReactiveGridFsResource(String filename, Publisher<DataBuffer> content) {
this.file = null;
this.filename = filename;
- this.content = Flux.from(content);
+ this.contentFunction = any -> Flux.from(content);
}
/**
@@ -64,10 +66,21 @@ private ReactiveGridFsResource(String filename, Publisher<DataBuffer> content) {
* @param content
*/
public ReactiveGridFsResource(GridFSFile file, Publisher<DataBuffer> content) {
+ this(file, (IntFunction<Flux<DataBuffer>>) any -> Flux.from(content));
+ }
+
+ /**
+ * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}.
+ *
+ * @param file must not be {@literal null}.
+ * @param contentFunction
+ * @since 2.2.1
+ */
+ ReactiveGridFsResource(GridFSFile file, IntFunction<Flux<DataBuffer>> contentFunction) {
this.file = file;
this.filename = file.getFilename();
- this.content = Flux.from(content);
+ this.contentFunction = contentFunction;
}
/**
@@ -165,16 +178,32 @@ public GridFSFile getGridFSFile() {
}
/**
- * Retrieve the download stream.
+ * Retrieve the download stream using the default chunk size of 256 kB.
*
- * @return
+ * @return a {@link Flux} emitting data chunks one by one. Please make sure to
+ * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all
+ * {@link DataBuffer buffers} when done.
*/
public Flux<DataBuffer> getDownloadStream() {
+ return getDownloadStream(DEFAULT_CHUNK_SIZE);
+ }
+
+ /**
+ * Retrieve the download stream.
+ *
+ * @param chunkSize chunk size in bytes to use.
+ * @return a {@link Flux} emitting data chunks one by one. Please make sure to
+ * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all
+ * {@link DataBuffer buffers} when done.
+ * @since 2.2.1
+ */
+ public Flux<DataBuffer> getDownloadStream(int chunkSize) {
if (!exists()) {
return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription())));
}
- return this.content;
+
+ return contentFunction.apply(chunkSize);
}
private void verifyExists() throws FileNotFoundException {
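
With that in place, callers pick a chunk size per download. A usage sketch against the API added above; the filename and the 8 KB chunk size are made up for illustration.

    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferUtils;
    import org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations;

    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    import static org.springframework.data.mongodb.core.query.Criteria.where;
    import static org.springframework.data.mongodb.core.query.Query.query;

    class DownloadSketch {

        Mono<DataBuffer> download(ReactiveGridFsOperations operations, String filename) {

            Flux<DataBuffer> content = operations.findOne(query(where("filename").is(filename)))
                    .flatMap(operations::getResource)
                    .flatMapMany(resource -> resource.getDownloadStream(8 * 1024)); // instead of the 256 kB default

            return DataBufferUtils.join(content); // remember to release the joined buffer
        }
    }
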
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java
index fcdf77df99..cec6937630 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java
@@ -223,9 +223,11 @@ public Mono<ReactiveGridFsResource> getResource(GridFSFile file) {
return Mono.fromSupplier(() -> {
- GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId());
-
- return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory));
+ return new ReactiveGridFsResource(file, chunkSize -> {
+
+ GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId());
+ return BinaryStreamAdapters.toPublisher(stream, dataBufferFactory, chunkSize);
+ });
});
}
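
Note the inversion here: the download stream is no longer opened when the resource is created but once per getDownloadStream(chunkSize) call, so an unused resource never opens a server-side stream. A standalone sketch of the pattern, assuming same-package visibility for BinaryStreamAdapters and the package-private resource constructor:

    import java.util.function.IntFunction;

    import org.springframework.core.io.buffer.DataBuffer;
    import org.springframework.core.io.buffer.DataBufferFactory;

    import reactor.core.publisher.Flux;

    import com.mongodb.client.gridfs.model.GridFSFile;
    import com.mongodb.reactivestreams.client.gridfs.GridFSBucket;
    import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadStream;

    class LazyResourceSketch {

        ReactiveGridFsResource toResource(GridFSBucket gridFs, GridFSFile file, DataBufferFactory bufferFactory) {

            IntFunction<Flux<DataBuffer>> contentFunction = chunkSize -> {

                // nothing touches the server until this function is applied
                GridFSDownloadStream stream = gridFs.openDownloadStream(file.getId());
                return BinaryStreamAdapters.toPublisher(stream, bufferFactory, chunkSize);
            };

            return new ReactiveGridFsResource(file, contentFunction);
        }
    }
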
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java
index 70bf70777c..8950571c0c 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java
@@ -60,6 +60,11 @@ public MongoClient mongoClient() {
protected String getDatabaseName() {
return "collation-tests";
}
+
+ @Override
+ protected boolean autoIndexCreation() {
+ return false;
+ }
}
@Autowired MongoTemplate template;
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java
index 92654379a4..da8ab558ee 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java
@@ -85,6 +85,11 @@ protected String getDatabaseName() {
return DB_NAME;
}
+ @Override
+ protected boolean autoIndexCreation() {
+ return false;
+ }
+
@Bean
MongoTransactionManager txManager(MongoDbFactory dbFactory) {
return new MongoTransactionManager(dbFactory);
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java
index d0b68acf2e..1983634d06 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java
@@ -74,6 +74,11 @@ public MongoClient mongoClient() {
protected String getDatabaseName() {
return "validation-tests";
}
+
+ @Override
+ protected boolean autoIndexCreation() {
+ return false;
+ }
}
@Autowired MongoTemplate template;
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java
index 19a373068e..5e74309c05 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java
@@ -59,6 +59,11 @@ protected String getDatabaseName() {
public MongoClient mongoClient() {
return MongoTestUtils.client();
}
+
+ @Override
+ protected boolean autoIndexCreation() {
+ return false;
+ }
}
@Autowired MongoOperations mongoOps;
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java
index 9c17d778f6..7d46d4c468 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java
@@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import org.junit.Test;
+
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
@@ -49,7 +50,7 @@ public void shouldAdaptAsyncInputStreamToDataBufferPublisher() throws IOException {
byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
- Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory());
+ Flux<DataBuffer> dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory(), 256);
DataBufferUtils.join(dataBuffers) //
.as(StepVerifier::create) //
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java
index e89d96676e..26eaebc6c2 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java
@@ -44,7 +44,7 @@ public void adapterShouldPropagateErrors() {
when(asyncInput.read(any())).thenReturn(Mono.just(1), Mono.error(new IllegalStateException()));
when(asyncInput.close()).thenReturn(Mono.empty());
- Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory);
+ Flux<DataBuffer> binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory, 256);
StepVerifier.create(binaryStream, 0) //
.thenRequest(1) //
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java
index cc808339fc..9914443c8d 100644
--- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java
@@ -24,6 +24,7 @@
import reactor.test.StepVerifier;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.UUID;
import org.bson.BsonObjectId;
@@ -32,7 +33,6 @@
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
-
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;
@@ -40,7 +40,9 @@
import org.springframework.core.io.buffer.DefaultDataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.dao.IncorrectResultSizeDataAccessException;
+import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
import org.springframework.data.mongodb.core.SimpleMongoDbFactory;
+import org.springframework.data.mongodb.core.convert.MongoConverter;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
@@ -48,6 +50,7 @@
import com.mongodb.gridfs.GridFS;
import com.mongodb.gridfs.GridFSInputFile;
+import com.mongodb.internal.HexUtils;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;
@@ -66,6 +69,8 @@ public class ReactiveGridFsTemplateTests {
@Autowired ReactiveGridFsOperations operations;
@Autowired SimpleMongoDbFactory mongoClient;
+ @Autowired ReactiveMongoDatabaseFactory dbFactory;
+ @Autowired MongoConverter mongoConverter;
@Before
public void setUp() {
@@ -92,6 +97,45 @@ public void storesAndFindsSimpleDocument() {
.verifyComplete();
}
+ @Test // DATAMONGO-1855
+ public void storesAndLoadsLargeFileCorrectly() {
+
+ ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000); // 1 MB
+ int i = 0;
+ while (buffer.remaining() != 0) {
+ buffer.put(HexUtils.toHex(new byte[] { (byte) (i++ % 16) }).getBytes());
+ }
+ buffer.flip();
+
+ DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
+
+ ObjectId reference = operations.store(Flux.just(factory.wrap(buffer)), "large.txt").block();
+
+ buffer.clear();
+
+ // default chunk size
+ operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
+ .flatMapMany(ReactiveGridFsResource::getDownloadStream) //
+ .transform(DataBufferUtils::join) //
+ .as(StepVerifier::create) //
+ .consumeNextWith(dataBuffer -> {
+
+ assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining());
+ assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer);
+ }).verifyComplete();
+
+ // small chunk size
+ operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
+ .flatMapMany(reactiveGridFsResource -> reactiveGridFsResource.getDownloadStream(256)) //
+ .transform(DataBufferUtils::join) //
+ .as(StepVerifier::create) //
+ .consumeNextWith(dataBuffer -> {
+
+ assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining());
+ assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer);
+ }).verifyComplete();
+ }
+
@Test // DATAMONGO-2392
public void storesAndFindsByUUID() throws IOException {