diff --git a/pom.xml b/pom.xml index 15b2d67f47..2f40db764d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 2.3.0.BUILD-SNAPSHOT + 2.3.0.DATAMONGO-2393-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index c4766040c1..9aa26e3402 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 2.3.0.BUILD-SNAPSHOT + 2.3.0.DATAMONGO-2393-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index ed39c63e76..bc2ff731e3 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -14,7 +14,7 @@ org.springframework.data spring-data-mongodb-parent - 2.3.0.BUILD-SNAPSHOT + 2.3.0.DATAMONGO-2393-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index 25cf02b5d5..bf31d9bc0d 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -11,7 +11,7 @@ org.springframework.data spring-data-mongodb-parent - 2.3.0.BUILD-SNAPSHOT + 2.3.0.DATAMONGO-2393-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java index 51ea4a1dce..ff2a33bf57 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java @@ -17,6 +17,8 @@ import lombok.RequiredArgsConstructor; import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; import reactor.core.publisher.Operators; import reactor.util.concurrent.Queues; @@ -26,13 +28,12 @@ import java.util.Queue; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.function.BiConsumer; import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; + import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; -import org.springframework.core.io.buffer.DefaultDataBufferFactory; import com.mongodb.reactivestreams.client.Success; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; @@ -66,14 +67,14 @@ class AsyncInputStreamAdapter implements AsyncInputStream { private final Publisher buffers; private final Context subscriberContext; - private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); private volatile Subscription subscription; private volatile boolean cancelled; - private volatile boolean complete; + private volatile boolean allDataBuffersReceived; private volatile Throwable error; - private final Queue> readRequests = Queues.> small() - .get(); + private final Queue readRequests = Queues. small().get(); + + private final Queue bufferQueue = Queues. small().get(); // see DEMAND volatile long demand; @@ -88,39 +89,28 @@ class AsyncInputStreamAdapter implements AsyncInputStream { @Override public Publisher read(ByteBuffer dst) { - return Mono.create(sink -> { - - readRequests.offer((db, bytecount) -> { - - try { + return Flux.create(sink -> { - if (error != null) { + readRequests.offer(new ReadRequest(sink, dst)); - sink.error(error); - return; - } - - if (bytecount == -1) { - - sink.success(-1); - return; - } + sink.onCancel(this::terminatePendingReads); + sink.onDispose(this::terminatePendingReads); + sink.onRequest(this::request); + }); + } - ByteBuffer byteBuffer = db.asByteBuffer(); - int toWrite = byteBuffer.remaining(); + void onError(FluxSink sink, Throwable e) { - dst.put(byteBuffer); - sink.success(toWrite); + readRequests.poll(); + sink.error(e); + } - } catch (Exception e) { - sink.error(e); - } finally { - DataBufferUtils.release(db); - } - }); + void onComplete(FluxSink sink, int writtenBytes) { - request(1); - }); + readRequests.poll(); + DEMAND.decrementAndGet(this); + sink.next(writtenBytes); + sink.complete(); } /* @@ -144,17 +134,19 @@ public Publisher close() { cancelled = true; if (error != null) { + terminatePendingReads(); sink.error(error); return; } + terminatePendingReads(); sink.success(Success.SUCCESS); }); } - protected void request(int n) { + protected void request(long n) { - if (complete) { + if (allDataBuffersReceived && bufferQueue.isEmpty()) { terminatePendingReads(); return; @@ -176,18 +168,51 @@ protected void request(int n) { requestFromSubscription(subscription); } } + } void requestFromSubscription(Subscription subscription) { - long demand = DEMAND.get(AsyncInputStreamAdapter.this); - if (cancelled) { subscription.cancel(); } - if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) { - subscription.request(1); + drainLoop(); + } + + void drainLoop() { + + while (DEMAND.get(AsyncInputStreamAdapter.this) > 0) { + + DataBuffer wip = bufferQueue.peek(); + + if (wip == null) { + break; + } + + if (wip.readableByteCount() == 0) { + bufferQueue.poll(); + continue; + } + + ReadRequest consumer = AsyncInputStreamAdapter.this.readRequests.peek(); + if (consumer == null) { + break; + } + + consumer.transferBytes(wip, wip.readableByteCount()); + } + + if (bufferQueue.isEmpty()) { + + if (allDataBuffersReceived) { + terminatePendingReads(); + return; + } + + if (demand > 0) { + subscription.request(1); + } } } @@ -196,10 +221,10 @@ void requestFromSubscription(Subscription subscription) { */ void terminatePendingReads() { - BiConsumer readers; + ReadRequest readers; while ((readers = readRequests.poll()) != null) { - readers.accept(factory.wrap(new byte[0]), -1); + readers.onComplete(); } } @@ -214,23 +239,21 @@ public Context currentContext() { public void onSubscribe(Subscription s) { AsyncInputStreamAdapter.this.subscription = s; - - Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1); s.request(1); } @Override public void onNext(DataBuffer dataBuffer) { - if (cancelled || complete) { + if (cancelled || allDataBuffersReceived) { DataBufferUtils.release(dataBuffer); Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); return; } - BiConsumer poll = AsyncInputStreamAdapter.this.readRequests.poll(); + ReadRequest readRequest = AsyncInputStreamAdapter.this.readRequests.peek(); - if (poll == null) { + if (readRequest == null) { DataBufferUtils.release(dataBuffer); Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); @@ -238,29 +261,103 @@ public void onNext(DataBuffer dataBuffer) { return; } - poll.accept(dataBuffer, dataBuffer.readableByteCount()); + bufferQueue.offer(dataBuffer); - requestFromSubscription(subscription); + drainLoop(); } @Override public void onError(Throwable t) { - if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) { + if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.allDataBuffersReceived) { Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext); return; } AsyncInputStreamAdapter.this.error = t; - AsyncInputStreamAdapter.this.complete = true; + AsyncInputStreamAdapter.this.allDataBuffersReceived = true; terminatePendingReads(); } @Override public void onComplete() { - AsyncInputStreamAdapter.this.complete = true; - terminatePendingReads(); + AsyncInputStreamAdapter.this.allDataBuffersReceived = true; + if (bufferQueue.isEmpty()) { + terminatePendingReads(); + } + } + } + + /** + * Request to read bytes and transfer these to the associated {@link ByteBuffer}. + */ + class ReadRequest { + + private final FluxSink sink; + private final ByteBuffer dst; + + private int writtenBytes; + + ReadRequest(FluxSink sink, ByteBuffer dst) { + this.sink = sink; + this.dst = dst; + this.writtenBytes = -1; + } + + public void onComplete() { + + if (error != null) { + AsyncInputStreamAdapter.this.onError(sink, error); + return; + } + + AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes); + } + + public void transferBytes(DataBuffer db, int bytes) { + + try { + + if (error != null) { + AsyncInputStreamAdapter.this.onError(sink, error); + return; + } + + ByteBuffer byteBuffer = db.asByteBuffer(); + int remaining = byteBuffer.remaining(); + int writeCapacity = Math.min(dst.remaining(), remaining); + int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity()); + int toWrite = limit - byteBuffer.position(); + + if (toWrite == 0) { + + AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes); + return; + } + + int oldPosition = byteBuffer.position(); + + byteBuffer.limit(toWrite); + dst.put(byteBuffer); + byteBuffer.limit(byteBuffer.capacity()); + byteBuffer.position(oldPosition); + db.readPosition(db.readPosition() + toWrite); + + if (writtenBytes == -1) { + writtenBytes = bytes; + } else { + writtenBytes += bytes; + } + + } catch (Exception e) { + AsyncInputStreamAdapter.this.onError(sink, e); + } finally { + + if (db.readableByteCount() == 0) { + DataBufferUtils.release(db); + } + } } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java index 0bacda78a2..0bb0b1fdde 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java @@ -42,12 +42,14 @@ class BinaryStreamAdapters { * * @param inputStream must not be {@literal null}. * @param dataBufferFactory must not be {@literal null}. + * @param bufferSize read {@code n} bytes per iteration. * @return {@link Flux} emitting {@link DataBuffer}s. * @see DataBufferFactory#allocateBuffer() */ - static Flux toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + static Flux toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, + int bufferSize) { - return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) // + return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) // .filter(it -> { if (it.readableByteCount() == 0) { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java index dfeb1411d5..180778c93b 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -29,11 +29,10 @@ import org.reactivestreams.Publisher; import org.reactivestreams.Subscription; - import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; +import com.mongodb.reactivestreams.client.Success; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; /** @@ -51,38 +50,104 @@ class DataBufferPublisherAdapter { * * @param inputStream must not be {@literal null}. * @param dataBufferFactory must not be {@literal null}. + * @param bufferSize read {@code n} bytes per iteration. * @return the resulting {@link Publisher}. */ - static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, + int bufferSize) { + + return Flux.usingWhen(Mono.just(new DelegatingAsyncInputStream(inputStream, dataBufferFactory, bufferSize)), + DataBufferPublisherAdapter::doRead, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close); + } - State state = new State(inputStream, dataBufferFactory); + /** + * Use an {@link AsyncInputStreamHandler} to read data from the given {@link AsyncInputStream}. + * + * @param inputStream the source stream. + * @return a {@link Flux} emitting data chunks one by one. + * @since 2.2.1 + */ + private static Flux doRead(DelegatingAsyncInputStream inputStream) { - return Flux.usingWhen(Mono.just(inputStream), it -> { + AsyncInputStreamHandler streamHandler = new AsyncInputStreamHandler(inputStream, inputStream.dataBufferFactory, + inputStream.bufferSize); - return Flux. create((sink) -> { + return Flux.create((sink) -> { - sink.onDispose(state::close); - sink.onCancel(state::close); + sink.onDispose(streamHandler::close); + sink.onCancel(streamHandler::close); - sink.onRequest(n -> { - state.request(sink, n); - }); + sink.onRequest(n -> { + streamHandler.request(sink, n); }); - }, AsyncInputStream::close, (it, err) -> it.close(), AsyncInputStream::close) // - .concatMap(Flux::just, 1); + }); + } + + /** + * An {@link AsyncInputStream} also holding a {@link DataBufferFactory} and default {@literal bufferSize} for reading + * from it, delegating operations on the {@link AsyncInputStream} to the reference instance.
+ * Used to pass on the {@link AsyncInputStream} and parameters to avoid capturing lambdas. + * + * @author Christoph Strobl + * @since 2.2.1 + */ + private static class DelegatingAsyncInputStream implements AsyncInputStream { + + private final AsyncInputStream inputStream; + private final DataBufferFactory dataBufferFactory; + private int bufferSize; + + /** + * @param inputStream the source input stream. + * @param dataBufferFactory + * @param bufferSize + */ + DelegatingAsyncInputStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory, int bufferSize) { + + this.inputStream = inputStream; + this.dataBufferFactory = dataBufferFactory; + this.bufferSize = bufferSize; + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer) + */ + @Override + public Publisher read(ByteBuffer dst) { + return inputStream.read(dst); + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#skip(long) + */ + @Override + public Publisher skip(long bytesToSkip) { + return inputStream.skip(bytesToSkip); + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close() + */ + @Override + public Publisher close() { + return inputStream.close(); + } } @RequiredArgsConstructor - static class State { + static class AsyncInputStreamHandler { - private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, - "demand"); + private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "demand"); - private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, - "state"); + private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "state"); - private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, - "read"); + private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamHandler.class, "read"); private static final int STATE_OPEN = 0; private static final int STATE_CLOSED = 1; @@ -92,6 +157,7 @@ static class State { final AsyncInputStream inputStream; final DataBufferFactory dataBufferFactory; + final int bufferSize; // see DEMAND volatile long demand; @@ -105,8 +171,16 @@ static class State { void request(FluxSink sink, long n) { Operators.addCap(DEMAND, this, n); + drainLoop(sink); + } - if (onShouldRead()) { + /** + * Loops while we have demand and while no read is in progress. + * + * @param sink + */ + void drainLoop(FluxSink sink) { + while (onShouldRead()) { emitNext(sink); } } @@ -119,16 +193,16 @@ boolean onWantRead() { return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS); } - boolean onReadDone() { - return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); + void onReadDone() { + READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); } long getDemand() { return DEMAND.get(this); } - boolean decrementDemand() { - return DEMAND.decrementAndGet(this) > 0; + void decrementDemand() { + DEMAND.decrementAndGet(this); } void close() { @@ -143,15 +217,15 @@ boolean isClosed() { * Emit the next {@link DataBuffer}. * * @param sink + * @return */ - void emitNext(FluxSink sink) { - - DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); - ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); + private void emitNext(FluxSink sink) { + ByteBuffer transport = ByteBuffer.allocate(bufferSize); + BufferCoreSubscriber bufferCoreSubscriber = new BufferCoreSubscriber(sink, dataBufferFactory, transport); try { - Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate)); - } catch (Exception e) { + inputStream.read(transport).subscribe(bufferCoreSubscriber); + } catch (Throwable e) { sink.error(e); } } @@ -159,14 +233,16 @@ void emitNext(FluxSink sink) { private class BufferCoreSubscriber implements CoreSubscriber { private final FluxSink sink; - private final DataBuffer dataBuffer; - private final ByteBuffer intermediate; + private final DataBufferFactory factory; + private final ByteBuffer transport; + private final Thread subscribeThread = Thread.currentThread(); + private volatile Subscription subscription; - BufferCoreSubscriber(FluxSink sink, DataBuffer dataBuffer, ByteBuffer intermediate) { + BufferCoreSubscriber(FluxSink sink, DataBufferFactory factory, ByteBuffer transport) { this.sink = sink; - this.dataBuffer = dataBuffer; - this.intermediate = intermediate; + this.factory = factory; + this.transport = transport; } @Override @@ -176,6 +252,8 @@ public Context currentContext() { @Override public void onSubscribe(Subscription s) { + + this.subscription = s; s.request(1); } @@ -185,24 +263,38 @@ public void onNext(Integer bytes) { if (isClosed()) { onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); return; } - intermediate.flip(); - dataBuffer.write(intermediate); + if (bytes > 0) { - sink.next(dataBuffer); - decrementDemand(); + DataBuffer buffer = readNextChunk(); + sink.next(buffer); + decrementDemand(); + } try { if (bytes == -1) { sink.complete(); + return; } } finally { onReadDone(); } + + subscription.request(1); + } + + private DataBuffer readNextChunk() { + + transport.flip(); + + DataBuffer dataBuffer = factory.allocateBuffer(transport.remaining()); + dataBuffer.write(transport); + + transport.clear(); + + return dataBuffer; } @Override @@ -215,16 +307,14 @@ public void onError(Throwable t) { } onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); sink.error(t); } @Override public void onComplete() { - if (onShouldRead()) { - emitNext(sink); + if (subscribeThread != Thread.currentThread()) { + drainLoop(sink); } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java index 4946d6d0d5..fb40dad10c 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java @@ -20,9 +20,9 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; +import java.util.function.IntFunction; import org.reactivestreams.Publisher; - import org.springframework.core.io.AbstractResource; import org.springframework.core.io.Resource; import org.springframework.core.io.buffer.DataBuffer; @@ -35,26 +35,28 @@ * Reactive {@link GridFSFile} based {@link Resource} implementation. * * @author Mark Paluch + * @author Christoph Strobl * @since 2.2 */ public class ReactiveGridFsResource extends AbstractResource { + private static final Integer DEFAULT_CHUNK_SIZE = 256 * 1024; + private final @Nullable GridFSFile file; private final String filename; - private final Flux content; + private final IntFunction> contentFunction; /** * Creates a new, absent {@link ReactiveGridFsResource}. * * @param filename filename of the absent resource. * @param content - * @since 2.1 */ private ReactiveGridFsResource(String filename, Publisher content) { this.file = null; this.filename = filename; - this.content = Flux.from(content); + this.contentFunction = any -> Flux.from(content); } /** @@ -64,10 +66,21 @@ private ReactiveGridFsResource(String filename, Publisher content) { * @param content */ public ReactiveGridFsResource(GridFSFile file, Publisher content) { + this(file, (IntFunction>) any -> Flux.from(content)); + } + + /** + * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}. + * + * @param file must not be {@literal null}. + * @param contentFunction + * @since 2.2.1 + */ + ReactiveGridFsResource(GridFSFile file, IntFunction> contentFunction) { this.file = file; this.filename = file.getFilename(); - this.content = Flux.from(content); + this.contentFunction = contentFunction; } /** @@ -165,16 +178,32 @@ public GridFSFile getGridFSFile() { } /** - * Retrieve the download stream. + * Retrieve the download stream using the default chunk size of 256 kB. * - * @return + * @return a {@link Flux} emitting data chunks one by one. Please make sure to + * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all + * {@link DataBuffer buffers} when done. */ public Flux getDownloadStream() { + return getDownloadStream(DEFAULT_CHUNK_SIZE); + } + + /** + * Retrieve the download stream. + * + * @param chunkSize chunk size in bytes to use. + * @return a {@link Flux} emitting data chunks one by one. Please make sure to + * {@link org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer) release} all + * {@link DataBuffer buffers} when done. + * @since 2.2.1 + */ + public Flux getDownloadStream(int chunkSize) { if (!exists()) { return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription()))); } - return this.content; + + return contentFunction.apply(chunkSize); } private void verifyExists() throws FileNotFoundException { diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java index fcdf77df99..cec6937630 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java @@ -223,9 +223,11 @@ public Mono getResource(GridFSFile file) { return Mono.fromSupplier(() -> { - GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId()); + return new ReactiveGridFsResource(file, chunkSize -> { - return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory)); + GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getId()); + return BinaryStreamAdapters.toPublisher(stream, dataBufferFactory, chunkSize); + }); }); } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java index 70bf70777c..8950571c0c 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateCollationTests.java @@ -60,6 +60,11 @@ public MongoClient mongoClient() { protected String getDatabaseName() { return "collation-tests"; } + + @Override + protected boolean autoIndexCreation() { + return false; + } } @Autowired MongoTemplate template; diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java index 92654379a4..da8ab558ee 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateTransactionTests.java @@ -85,6 +85,11 @@ protected String getDatabaseName() { return DB_NAME; } + @Override + protected boolean autoIndexCreation() { + return false; + } + @Bean MongoTransactionManager txManager(MongoDbFactory dbFactory) { return new MongoTransactionManager(dbFactory); diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java index d0b68acf2e..1983634d06 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/MongoTemplateValidationTests.java @@ -74,6 +74,11 @@ public MongoClient mongoClient() { protected String getDatabaseName() { return "validation-tests"; } + + @Override + protected boolean autoIndexCreation() { + return false; + } } @Autowired MongoTemplate template; diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java index 19a373068e..5e74309c05 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/core/NoExplicitIdTests.java @@ -59,6 +59,11 @@ protected String getDatabaseName() { public MongoClient mongoClient() { return MongoTestUtils.client(); } + + @Override + protected boolean autoIndexCreation() { + return false; + } } @Autowired MongoOperations mongoOps; diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java index 9c17d778f6..7d46d4c468 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java @@ -25,6 +25,7 @@ import java.nio.ByteBuffer; import org.junit.Test; + import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; @@ -49,7 +50,7 @@ public void shouldAdaptAsyncInputStreamToDataBufferPublisher() throws IOExceptio byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); - Flux dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory()); + Flux dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory(), 256); DataBufferUtils.join(dataBuffers) // .as(StepVerifier::create) // diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java index e89d96676e..26eaebc6c2 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapterUnitTests.java @@ -44,7 +44,7 @@ public void adapterShouldPropagateErrors() { when(asyncInput.read(any())).thenReturn(Mono.just(1), Mono.error(new IllegalStateException())); when(asyncInput.close()).thenReturn(Mono.empty()); - Flux binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory); + Flux binaryStream = DataBufferPublisherAdapter.createBinaryStream(asyncInput, factory, 256); StepVerifier.create(binaryStream, 0) // .thenRequest(1) // diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java index cc808339fc..9914443c8d 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java @@ -24,6 +24,7 @@ import reactor.test.StepVerifier; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.UUID; import org.bson.BsonObjectId; @@ -32,7 +33,6 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.core.io.ClassPathResource; import org.springframework.core.io.Resource; @@ -40,7 +40,9 @@ import org.springframework.core.io.buffer.DefaultDataBuffer; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.dao.IncorrectResultSizeDataAccessException; +import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.core.SimpleMongoDbFactory; +import org.springframework.data.mongodb.core.convert.MongoConverter; import org.springframework.data.mongodb.core.query.Query; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringRunner; @@ -48,6 +50,7 @@ import com.mongodb.gridfs.GridFS; import com.mongodb.gridfs.GridFSInputFile; +import com.mongodb.internal.HexUtils; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; @@ -66,6 +69,8 @@ public class ReactiveGridFsTemplateTests { @Autowired ReactiveGridFsOperations operations; @Autowired SimpleMongoDbFactory mongoClient; + @Autowired ReactiveMongoDatabaseFactory dbFactory; + @Autowired MongoConverter mongoConverter; @Before public void setUp() { @@ -92,6 +97,45 @@ public void storesAndFindsSimpleDocument() { .verifyComplete(); } + @Test // DATAMONGO-1855 + public void storesAndLoadsLargeFileCorrectly() { + + ByteBuffer buffer = ByteBuffer.allocate(1000 * 1000); // 1 mb + int i = 0; + while (buffer.remaining() != 0) { + buffer.put(HexUtils.toHex(new byte[] { (byte) (i++ % 16) }).getBytes()); + } + buffer.flip(); + + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + + ObjectId reference = operations.store(Flux.just(factory.wrap(buffer)), "large.txt").block(); + + buffer.clear(); + + // default chunk size + operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) + .flatMapMany(ReactiveGridFsResource::getDownloadStream) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining()); + assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer); + }).verifyComplete(); + + // small chunk size + operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) + .flatMapMany(reactiveGridFsResource -> reactiveGridFsResource.getDownloadStream(256)) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + assertThat(dataBuffer.readableByteCount()).isEqualTo(buffer.remaining()); + assertThat(dataBuffer.asByteBuffer()).isEqualTo(buffer); + }).verifyComplete(); + } + @Test // DATAMONGO-2392 public void storesAndFindsByUUID() throws IOException {