diff --git a/pom.xml b/pom.xml
index 657607148e..378af07e5d 100644
--- a/pom.xml
+++ b/pom.xml
@@ -5,7 +5,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-1855-SNAPSHOT
pom
Spring Data MongoDB
diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml
index c2ff37b35c..cd2ab5857a 100644
--- a/spring-data-mongodb-benchmarks/pom.xml
+++ b/spring-data-mongodb-benchmarks/pom.xml
@@ -7,7 +7,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-1855-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb-cross-store/pom.xml b/spring-data-mongodb-cross-store/pom.xml
index fd36f227c0..3c604ed0d9 100644
--- a/spring-data-mongodb-cross-store/pom.xml
+++ b/spring-data-mongodb-cross-store/pom.xml
@@ -6,7 +6,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-1855-SNAPSHOT
../pom.xml
@@ -50,7 +50,7 @@
org.springframework.data
spring-data-mongodb
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-1855-SNAPSHOT
diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml
index fc8d28a2b6..40cdba2c1a 100644
--- a/spring-data-mongodb-distribution/pom.xml
+++ b/spring-data-mongodb-distribution/pom.xml
@@ -14,7 +14,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-1855-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml
index f3c85a046a..80578399a4 100644
--- a/spring-data-mongodb/pom.xml
+++ b/spring-data-mongodb/pom.xml
@@ -11,7 +11,7 @@
org.springframework.data
spring-data-mongodb-parent
- 2.2.0.BUILD-SNAPSHOT
+ 2.2.0.DATAMONGO-1855-SNAPSHOT
../pom.xml
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java
new file mode 100644
index 0000000000..d57268388a
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java
@@ -0,0 +1,257 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.gridfs;
+
+import lombok.RequiredArgsConstructor;
+import reactor.core.CoreSubscriber;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Operators;
+import reactor.util.concurrent.Queues;
+import reactor.util.context.Context;
+
+import java.nio.ByteBuffer;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.function.BiConsumer;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscription;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+
+import com.mongodb.reactivestreams.client.Success;
+import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
+
+/**
+ * Adapter accepting a binary stream {@link Publisher} and emitting its through {@link AsyncInputStream}.
+ *
+ * This adapter subscribes to the binary {@link Publisher} as soon as the first chunk gets {@link #read(ByteBuffer)
+ * requested}. Requests are queued and binary chunks are requested from the {@link Publisher}. As soon as the
+ * {@link Publisher} emits items, chunks are provided to the read request which completes the number-of-written-bytes
+ * {@link Publisher}.
+ *
+ * {@link AsyncInputStream} is supposed to work as sequential callback API that is called until reaching EOF.
+ * {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}.
+ *
+ * @author Mark Paluch
+ * @author Christoph Strobl
+ * @since 2.2
+ */
+@RequiredArgsConstructor
+class AsyncInputStreamAdapter implements AsyncInputStream {
+
+ private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater
+ .newUpdater(AsyncInputStreamAdapter.class, "demand");
+
+ private static final AtomicIntegerFieldUpdater SUBSCRIBED = AtomicIntegerFieldUpdater
+ .newUpdater(AsyncInputStreamAdapter.class, "subscribed");
+
+ private static final int SUBSCRIPTION_NOT_SUBSCRIBED = 0;
+ private static final int SUBSCRIPTION_SUBSCRIBED = 1;
+
+ private final Publisher extends DataBuffer> buffers;
+ private final Context subscriberContext;
+ private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
+
+ private volatile Subscription subscription;
+ private volatile boolean cancelled;
+ private volatile boolean complete;
+ private volatile Throwable error;
+ private final Queue> readRequests = Queues.> small()
+ .get();
+
+ // see DEMAND
+ volatile long demand;
+
+ // see SUBSCRIBED
+ volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED;
+
+ /*
+ * (non-Javadoc)
+ * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer)
+ */
+ @Override
+ public Publisher read(ByteBuffer dst) {
+
+ return Mono.create(sink -> {
+
+ readRequests.offer((db, bytecount) -> {
+
+ try {
+
+ if (error != null) {
+
+ sink.error(error);
+ return;
+ }
+
+ if (bytecount == -1) {
+
+ sink.success(-1);
+ return;
+ }
+
+ ByteBuffer byteBuffer = db.asByteBuffer();
+ int toWrite = byteBuffer.remaining();
+
+ dst.put(byteBuffer);
+ sink.success(toWrite);
+
+ } catch (Exception e) {
+ sink.error(e);
+ } finally {
+ DataBufferUtils.release(db);
+ }
+ });
+
+ request(1);
+ });
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close()
+ */
+ @Override
+ public Publisher close() {
+
+ return Mono.create(sink -> {
+
+ cancelled = true;
+
+ if (error != null) {
+ sink.error(error);
+ return;
+ }
+
+ sink.success(Success.SUCCESS);
+ });
+ }
+
+ protected void request(int n) {
+
+ if (complete) {
+
+ terminatePendingReads();
+ return;
+ }
+
+ Operators.addCap(DEMAND, this, n);
+
+ if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) {
+
+ if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) {
+ buffers.subscribe(new DataBufferCoreSubscriber());
+ }
+
+ } else {
+
+ Subscription subscription = this.subscription;
+
+ if (subscription != null) {
+ requestFromSubscription(subscription);
+ }
+ }
+ }
+
+ void requestFromSubscription(Subscription subscription) {
+
+ long demand = DEMAND.get(AsyncInputStreamAdapter.this);
+
+ if (cancelled) {
+ subscription.cancel();
+ }
+
+ if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) {
+ subscription.request(1);
+ }
+ }
+
+ /**
+ * Terminates pending reads with empty buffers.
+ */
+ void terminatePendingReads() {
+
+ BiConsumer readers;
+
+ while ((readers = readRequests.poll()) != null) {
+ readers.accept(factory.wrap(new byte[0]), -1);
+ }
+ }
+
+ private class DataBufferCoreSubscriber implements CoreSubscriber {
+
+ @Override
+ public Context currentContext() {
+ return AsyncInputStreamAdapter.this.subscriberContext;
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+
+ AsyncInputStreamAdapter.this.subscription = s;
+
+ Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1);
+ s.request(1);
+ }
+
+ @Override
+ public void onNext(DataBuffer dataBuffer) {
+
+ if (cancelled || complete) {
+ DataBufferUtils.release(dataBuffer);
+ Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
+ return;
+ }
+
+ BiConsumer poll = AsyncInputStreamAdapter.this.readRequests.poll();
+
+ if (poll == null) {
+
+ DataBufferUtils.release(dataBuffer);
+ Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
+ subscription.cancel();
+ return;
+ }
+
+ poll.accept(dataBuffer, dataBuffer.readableByteCount());
+
+ requestFromSubscription(subscription);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+
+ if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) {
+ Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext);
+ return;
+ }
+
+ AsyncInputStreamAdapter.this.error = t;
+ AsyncInputStreamAdapter.this.complete = true;
+ terminatePendingReads();
+ }
+
+ @Override
+ public void onComplete() {
+
+ AsyncInputStreamAdapter.this.complete = true;
+ terminatePendingReads();
+ }
+ }
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java
new file mode 100644
index 0000000000..ae30151bd2
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.gridfs;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.reactivestreams.Publisher;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DataBufferUtils;
+
+import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
+
+/**
+ * Utility methods to create adapters from between {@link Publisher} of {@link DataBuffer} and {@link AsyncInputStream}.
+ *
+ * @author Mark Paluch
+ * @since 2.2
+ */
+class BinaryStreamAdapters {
+
+ /**
+ * Creates a {@link Flux} emitting {@link DataBuffer} by reading binary chunks from {@link AsyncInputStream}.
+ * Publisher termination (completion, error, cancellation) closes the {@link AsyncInputStream}.
+ *
+ * The resulting {@link org.reactivestreams.Publisher} filters empty binary chunks and uses {@link DataBufferFactory}
+ * settings to determine the chunk size.
+ *
+ * @param inputStream must not be {@literal null}.
+ * @param dataBufferFactory must not be {@literal null}.
+ * @return {@link Flux} emitting {@link DataBuffer}s.
+ * @see DataBufferFactory#allocateBuffer()
+ */
+ static Flux toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
+
+ return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) //
+ .filter(it -> {
+
+ if (it.readableByteCount() == 0) {
+ DataBufferUtils.release(it);
+ return false;
+ }
+ return true;
+ });
+ }
+
+ /**
+ * Creates a {@link Mono} emitting a {@link AsyncInputStream} to consume a {@link Publisher} emitting
+ * {@link DataBuffer} and exposing the binary stream through {@link AsyncInputStream}. {@link DataBuffer}s are
+ * released by the adapter during consumption.
+ *
+ * This method returns a {@link Mono} to retain the {@link reactor.util.context.Context subscriber context}.
+ *
+ * @param dataBuffers must not be {@literal null}.
+ * @return {@link Mono} emitting {@link AsyncInputStream}.
+ * @see DataBufferUtils#release(DataBuffer)
+ */
+ static Mono toAsyncInputStream(Publisher extends DataBuffer> dataBuffers) {
+
+ return Mono.create(sink -> {
+ sink.success(new AsyncInputStreamAdapter(dataBuffers, sink.currentContext()));
+ });
+ }
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java
new file mode 100644
index 0000000000..df98508fe9
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.gridfs;
+
+import lombok.RequiredArgsConstructor;
+import reactor.core.CoreSubscriber;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.FluxSink;
+import reactor.core.publisher.Mono;
+import reactor.core.publisher.Operators;
+import reactor.util.context.Context;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+
+import org.reactivestreams.Publisher;
+import org.reactivestreams.Subscription;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DataBufferUtils;
+
+import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
+
+/**
+ * Utility to adapt a {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}.
+ *
+ * @author Mark Paluch
+ * @author Christoph Strobl
+ * @since 2.2
+ */
+class DataBufferPublisherAdapter {
+
+ /**
+ * Creates a {@link Publisher} emitting {@link DataBuffer}s by reading binary chunks from {@link AsyncInputStream}.
+ * Closes the {@link AsyncInputStream} once the {@link Publisher} terminates.
+ *
+ * @param inputStream must not be {@literal null}.
+ * @param dataBufferFactory must not be {@literal null}.
+ * @return the resulting {@link Publisher}.
+ */
+ static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
+
+ State state = new State(inputStream, dataBufferFactory);
+
+ return Flux.usingWhen(Mono.just(inputStream), it -> {
+
+ return Flux. create((sink) -> {
+
+ sink.onDispose(state::close);
+ sink.onCancel(state::close);
+
+ sink.onRequest(n -> {
+ state.request(sink, n);
+ });
+ });
+ }, AsyncInputStream::close, AsyncInputStream::close, AsyncInputStream::close) //
+ .concatMap(Flux::just, 1);
+ }
+
+ @RequiredArgsConstructor
+ static class State {
+
+ private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand");
+
+ private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state");
+
+ private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read");
+
+ private static final int STATE_OPEN = 0;
+ private static final int STATE_CLOSED = 1;
+
+ private static final int READ_NONE = 0;
+ private static final int READ_IN_PROGRESS = 1;
+
+ final AsyncInputStream inputStream;
+ final DataBufferFactory dataBufferFactory;
+
+ // see DEMAND
+ volatile long demand;
+
+ // see STATE
+ volatile int state = STATE_OPEN;
+
+ // see READ_IN_PROGRESS
+ volatile int read = READ_NONE;
+
+ void request(FluxSink sink, long n) {
+
+ Operators.addCap(DEMAND, this, n);
+
+ if (onShouldRead()) {
+ emitNext(sink);
+ }
+ }
+
+ boolean onShouldRead() {
+ return !isClosed() && getDemand() > 0 && onWantRead();
+ }
+
+ boolean onWantRead() {
+ return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS);
+ }
+
+ boolean onReadDone() {
+ return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE);
+ }
+
+ long getDemand() {
+ return DEMAND.get(this);
+ }
+
+ void close() {
+ STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED);
+ }
+
+ boolean isClosed() {
+ return STATE.get(this) == STATE_CLOSED;
+ }
+
+ /**
+ * Emit the next {@link DataBuffer}.
+ *
+ * @param sink
+ */
+ void emitNext(FluxSink sink) {
+
+ DataBuffer dataBuffer = dataBufferFactory.allocateBuffer();
+ ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity());
+
+ Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate));
+ }
+
+ private class BufferCoreSubscriber implements CoreSubscriber {
+
+ private final FluxSink sink;
+ private final DataBuffer dataBuffer;
+ private final ByteBuffer intermediate;
+
+ BufferCoreSubscriber(FluxSink sink, DataBuffer dataBuffer, ByteBuffer intermediate) {
+
+ this.sink = sink;
+ this.dataBuffer = dataBuffer;
+ this.intermediate = intermediate;
+ }
+
+ @Override
+ public Context currentContext() {
+ return sink.currentContext();
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ s.request(1);
+ }
+
+ @Override
+ public void onNext(Integer bytes) {
+
+ if (isClosed()) {
+
+ onReadDone();
+ DataBufferUtils.release(dataBuffer);
+ Operators.onNextDropped(dataBuffer, sink.currentContext());
+ return;
+ }
+
+ intermediate.flip();
+ dataBuffer.write(intermediate);
+
+ sink.next(dataBuffer);
+
+ try {
+ if (bytes == -1) {
+ sink.complete();
+ }
+ } finally {
+ onReadDone();
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+
+ if (isClosed()) {
+
+ Operators.onErrorDropped(t, sink.currentContext());
+ return;
+ }
+
+ onReadDone();
+ DataBufferUtils.release(dataBuffer);
+ Operators.onNextDropped(dataBuffer, sink.currentContext());
+ sink.error(t);
+ }
+
+ @Override
+ public void onComplete() {
+
+ if (onShouldRead()) {
+ emitNext(sink);
+ }
+ }
+ }
+ }
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java
new file mode 100644
index 0000000000..8d4bd9f5a3
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.gridfs;
+
+import java.util.Optional;
+
+import org.bson.Document;
+import org.springframework.data.mongodb.core.convert.MongoConverter;
+import org.springframework.data.mongodb.core.convert.QueryMapper;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+
+import com.mongodb.client.gridfs.model.GridFSUploadOptions;
+
+/**
+ * Base class offering common tasks like query mapping and {@link GridFSUploadOptions} computation to be shared across
+ * imperative and reactive implementations.
+ *
+ * @author Christoph Strobl
+ * @since 2.2
+ */
+class GridFsOperationsSupport {
+
+ private final QueryMapper queryMapper;
+ private final MongoConverter converter;
+
+ /**
+ * @param converter must not be {@literal null}.
+ */
+ GridFsOperationsSupport(MongoConverter converter) {
+
+ Assert.notNull(converter, "MongoConverter must not be null!");
+
+ this.converter = converter;
+ this.queryMapper = new QueryMapper(converter);
+ }
+
+ /**
+ * @param query pass the given query though a {@link QueryMapper} to apply type conversion.
+ * @return never {@literal null}.
+ */
+ protected Document getMappedQuery(Document query) {
+ return queryMapper.getMappedObject(query, Optional.empty());
+ }
+
+ /**
+ * Compute the {@link GridFSUploadOptions} to be used from the given {@literal contentType} and {@literal metadata}
+ * {@link Document}.
+ *
+ * @param contentType can be {@literal null}.
+ * @param metadata can be {@literal null}
+ * @return never {@literal null}.
+ */
+ protected GridFSUploadOptions computeUploadOptionsFor(@Nullable String contentType, @Nullable Document metadata) {
+
+ Document targetMetadata = new Document();
+
+ if (StringUtils.hasText(contentType)) {
+ targetMetadata.put(GridFsResource.CONTENT_TYPE_FIELD, contentType);
+ }
+
+ if (metadata != null) {
+ targetMetadata.putAll(metadata);
+ }
+
+ GridFSUploadOptions options = new GridFSUploadOptions();
+ options.metadata(targetMetadata);
+
+ return options;
+ }
+
+ /**
+ * Convert a given {@literal value} into a {@link Document}.
+ *
+ * @param value can be {@literal null}.
+ * @return an empty {@link Document} if the source value is {@literal null}.
+ */
+ protected Document toDocument(@Nullable Object value) {
+
+ if (value instanceof Document) {
+ return (Document) value;
+ }
+
+ Document document = new Document();
+ if (value != null) {
+ converter.write(value, document);
+ }
+ return document;
+ }
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java
index 3b87d2a78b..6c278ac7c3 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java
@@ -29,7 +29,6 @@
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.data.mongodb.MongoDbFactory;
import org.springframework.data.mongodb.core.convert.MongoConverter;
-import org.springframework.data.mongodb.core.convert.QueryMapper;
import org.springframework.data.mongodb.core.query.Query;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
@@ -40,7 +39,6 @@
import com.mongodb.client.gridfs.GridFSBuckets;
import com.mongodb.client.gridfs.GridFSFindIterable;
import com.mongodb.client.gridfs.model.GridFSFile;
-import com.mongodb.client.gridfs.model.GridFSUploadOptions;
/**
* {@link GridFsOperations} implementation to store content into MongoDB GridFS.
@@ -54,13 +52,11 @@
* @author Hartmut Lang
* @author Niklas Helge Hanft
*/
-public class GridFsTemplate implements GridFsOperations, ResourcePatternResolver {
+public class GridFsTemplate extends GridFsOperationsSupport implements GridFsOperations, ResourcePatternResolver {
private final MongoDbFactory dbFactory;
private final @Nullable String bucket;
- private final MongoConverter converter;
- private final QueryMapper queryMapper;
/**
* Creates a new {@link GridFsTemplate} using the given {@link MongoDbFactory} and {@link MongoConverter}.
@@ -81,14 +77,12 @@ public GridFsTemplate(MongoDbFactory dbFactory, MongoConverter converter) {
*/
public GridFsTemplate(MongoDbFactory dbFactory, MongoConverter converter, @Nullable String bucket) {
+ super(converter);
+
Assert.notNull(dbFactory, "MongoDbFactory must not be null!");
- Assert.notNull(converter, "MongoConverter must not be null!");
this.dbFactory = dbFactory;
- this.converter = converter;
this.bucket = bucket;
-
- this.queryMapper = new QueryMapper(converter);
}
/*
@@ -137,16 +131,9 @@ public ObjectId store(InputStream content, @Nullable String filename, @Nullable
* (non-Javadoc)
* @see org.springframework.data.mongodb.gridfs.GridFsOperations#store(java.io.InputStream, java.lang.String, java.lang.String, java.lang.Object)
*/
- public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Object metadata) {
-
- Document document = null;
-
- if (metadata != null) {
- document = new Document();
- converter.write(metadata, document);
- }
-
- return store(content, filename, contentType, document);
+ public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType,
+ @Nullable Object metadata) {
+ return store(content, filename, contentType, toDocument(metadata));
}
/*
@@ -161,25 +148,11 @@ public ObjectId store(InputStream content, @Nullable String filename, @Nullable
* (non-Javadoc)
* @see org.springframework.data.mongodb.gridfs.GridFsOperations#store(java.io.InputStream, java.lang.String, com.mongodb.Document)
*/
- public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Document metadata) {
+ public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType,
+ @Nullable Document metadata) {
Assert.notNull(content, "InputStream must not be null!");
-
- GridFSUploadOptions options = new GridFSUploadOptions();
-
- Document mData = new Document();
-
- if (StringUtils.hasText(contentType)) {
- mData.put(GridFsResource.CONTENT_TYPE_FIELD, contentType);
- }
-
- if (metadata != null) {
- mData.putAll(metadata);
- }
-
- options.metadata(mData);
-
- return getGridFs().uploadFromStream(filename, content, options);
+ return getGridFs().uploadFromStream(filename, content, computeUploadOptionsFor(contentType, metadata));
}
/*
@@ -210,8 +183,8 @@ public GridFSFile findOne(Query query) {
*/
public void delete(Query query) {
- for (GridFSFile x : find(query)) {
- getGridFs().delete(((BsonObjectId) x.getId()).getValue());
+ for (GridFSFile gridFSFile : find(query)) {
+ getGridFs().delete(((BsonObjectId) gridFSFile.getId()).getValue());
}
}
@@ -246,9 +219,9 @@ public GridFsResource getResource(GridFSFile file) {
}
/*
- * (non-Javadoc)
- * @see org.springframework.core.io.support.ResourcePatternResolver#getResources(java.lang.String)
- */
+ * (non-Javadoc)
+ * @see org.springframework.core.io.support.ResourcePatternResolver#getResources(java.lang.String)
+ */
public GridFsResource[] getResources(String locationPattern) {
if (!StringUtils.hasText(locationPattern)) {
@@ -272,10 +245,6 @@ public GridFsResource[] getResources(String locationPattern) {
return new GridFsResource[] { getResource(locationPattern) };
}
- private Document getMappedQuery(Document query) {
- return queryMapper.getMappedObject(query, Optional.empty());
- }
-
private GridFSBucket getGridFs() {
MongoDatabase db = dbFactory.getDb();
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java
new file mode 100644
index 0000000000..c7f3a578db
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java
@@ -0,0 +1,233 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.gridfs;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.bson.Document;
+import org.bson.types.ObjectId;
+import org.reactivestreams.Publisher;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.data.domain.Sort;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.lang.Nullable;
+
+import com.mongodb.client.gridfs.model.GridFSFile;
+import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
+
+/**
+ * Collection of operations to store and read files from MongoDB GridFS using reactive infrastructure.
+ *
+ * @author Mark Paluch
+ * @author Christoph Strobl
+ * @since 2.2
+ */
+public interface ReactiveGridFsOperations {
+
+ /**
+ * Stores the given content into a file with the given name.
+ *
+ * @param content must not be {@literal null}.
+ * @param filename must not be {@literal null} or empty.
+ * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+ * created.
+ */
+ default Mono store(Publisher content, String filename) {
+ return store(content, filename, (Object) null);
+ }
+
+ /**
+ * Stores the given content into a file applying the given metadata.
+ *
+ * @param content must not be {@literal null}.
+ * @param metadata can be {@literal null}.
+ * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+ * created.
+ */
+ default Mono store(Publisher content, @Nullable Object metadata) {
+ return store(content, null, metadata);
+ }
+
+ /**
+ * Stores the given content into a file applying the given metadata.
+ *
+ * @param content must not be {@literal null}.
+ * @param metadata can be {@literal null}.
+ * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+ * created.
+ */
+ default Mono store(Publisher content, @Nullable Document metadata) {
+ return store(content, null, metadata);
+ }
+
+ /**
+ * Stores the given content into a file with the given name and content type.
+ *
+ * @param content must not be {@literal null}.
+ * @param filename must not be {@literal null} or empty.
+ * @param contentType can be {@literal null}.
+ * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+ * created.
+ */
+ default Mono store(Publisher content, @Nullable String filename, @Nullable String contentType) {
+ return store(content, filename, contentType, (Object) null);
+ }
+
+ /**
+ * Stores the given content into a file with the given name using the given metadata. The metadata object will be
+ * marshalled before writing.
+ *
+ * @param content must not be {@literal null}.
+ * @param filename can be {@literal null} or empty.
+ * @param metadata can be {@literal null}.
+ * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+ * created.
+ */
+ default Mono store(Publisher content, @Nullable String filename, @Nullable Object metadata) {
+ return store(content, filename, null, metadata);
+ }
+
+ /**
+ * Stores the given content into a file with the given name and content type using the given metadata. The metadata
+ * object will be marshalled before writing.
+ *
+ * @param content must not be {@literal null}.
+ * @param filename must not be {@literal null} or empty.
+ * @param contentType can be {@literal null}.
+ * @param metadata can be {@literal null}
+ * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+ * created.
+ */
+ Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
+ @Nullable Object metadata);
+
+ /**
+ * Stores the given content into a file with the given name and content type using the given metadata. The metadata
+ * object will be marshalled before writing.
+ *
+ * @param content must not be {@literal null}.
+ * @param filename must not be {@literal null} or empty.
+ * @param contentType can be {@literal null}.
+ * @param metadata can be {@literal null}
+ * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+ * created.
+ */
+ Mono store(Publisher content, @Nullable String filename, @Nullable String contentType,
+ @Nullable Object metadata);
+
+ /**
+ * Stores the given content into a file with the given name using the given metadata.
+ *
+ * @param content must not be {@literal null}.
+ * @param filename must not be {@literal null} or empty.
+ * @param metadata can be {@literal null}.
+ * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+ * created.
+ */
+ default Mono store(Publisher content, @Nullable String filename, @Nullable Document metadata) {
+ return store(content, filename, null, metadata);
+ }
+
+ /**
+ * Stores the given content into a file with the given name and content type using the given metadata.
+ *
+ * @param content must not be {@literal null}.
+ * @param filename must not be {@literal null} or empty.
+ * @param contentType can be {@literal null}.
+ * @param metadata can be {@literal null}.
+ * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+ * created.
+ */
+ Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
+ @Nullable Document metadata);
+
+ /**
+ * Stores the given content into a file with the given name and content type using the given metadata.
+ *
+ * @param content must not be {@literal null}.
+ * @param filename must not be {@literal null} or empty.
+ * @param contentType can be {@literal null}.
+ * @param metadata can be {@literal null}.
+ * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+ * created.
+ */
+ Mono store(Publisher content, @Nullable String filename, @Nullable String contentType,
+ @Nullable Document metadata);
+
+ /**
+ * Returns a {@link Flux} emitting all files matching the given query.
+ * Note: Currently {@link Sort} criteria defined at the {@link Query} will not be regarded as MongoDB
+ * does not support ordering for GridFS file access.
+ *
+ * @see MongoDB Jira: JAVA-431
+ * @param query must not be {@literal null}.
+ * @return {@link Flux#empty()} if no mach found.
+ */
+ Flux find(Query query);
+
+ /**
+ * Returns a {@link Mono} emitting a single {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given
+ * query or {@link Mono#empty()} in case no file matches.
+ * NOTE If more than one file matches the given query the resulting {@link Mono} emits an error. If
+ * you want to obtain the first found file use {@link #findFirst(Query)}.
+ *
+ * @param query must not be {@literal null}.
+ * @return {@link Mono#empty()} if not match found.
+ */
+ Mono findOne(Query query);
+
+ /**
+ * Returns a {@link Mono} emitting the frist {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given
+ * query or {@link Mono#empty()} in case no file matches.
+ *
+ * @param query must not be {@literal null}.
+ * @return {@link Mono#empty()} if not match found.
+ */
+ Mono findFirst(Query query);
+
+ /**
+ * Deletes all files matching the given {@link Query}.
+ *
+ * @param query must not be {@literal null}.
+ * @return a {@link Mono} signalling operation completion.
+ */
+ Mono delete(Query query);
+
+ /**
+ * Returns a {@link Mono} emitting the {@link ReactiveGridFsResource} with the given file name.
+ *
+ * @param filename must not be {@literal null}.
+ * @return {@link Mono#empty()} if no match found.
+ */
+ Mono getResource(String filename);
+
+ /**
+ * Returns a {@link Mono} emitting the {@link ReactiveGridFsResource} for a {@link GridFSFile}.
+ *
+ * @param file must not be {@literal null}.
+ * @return {@link Mono#empty()} if no match found.
+ */
+ Mono getResource(GridFSFile file);
+
+ /**
+ * Returns a {@link Flux} emitting all {@link ReactiveGridFsResource}s matching the given file name pattern.
+ *
+ * @param filenamePattern must not be {@literal null}.
+ * @return {@link Flux#empty()} if no match found.
+ */
+ Flux getResources(String filenamePattern);
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java
new file mode 100644
index 0000000000..eb787cbd50
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java
@@ -0,0 +1,180 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.gridfs;
+
+import reactor.core.publisher.Flux;
+
+import java.io.ByteArrayInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.reactivestreams.Publisher;
+import org.springframework.core.io.AbstractResource;
+import org.springframework.core.io.Resource;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+
+import com.mongodb.client.gridfs.model.GridFSFile;
+
+/**
+ * Reactive {@link GridFSFile} based {@link Resource} implementation.
+ *
+ * @author Mark Paluch
+ * @since 2.2
+ */
+public class ReactiveGridFsResource extends AbstractResource {
+
+ static final String CONTENT_TYPE_FIELD = "_contentType";
+ private static final ByteArrayInputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
+
+ private final @Nullable GridFSFile file;
+ private final String filename;
+ private final Flux content;
+
+ /**
+ * Creates a new, absent {@link ReactiveGridFsResource}.
+ *
+ * @param filename filename of the absent resource.
+ * @param content
+ * @since 2.1
+ */
+ private ReactiveGridFsResource(String filename, Publisher content) {
+
+ this.file = null;
+ this.filename = filename;
+ this.content = Flux.from(content);
+ }
+
+ /**
+ * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}.
+ *
+ * @param file must not be {@literal null}.
+ * @param content
+ */
+ public ReactiveGridFsResource(GridFSFile file, Publisher content) {
+
+ this.file = file;
+ this.filename = file.getFilename();
+ this.content = Flux.from(content);
+ }
+
+ /**
+ * Obtain an absent {@link ReactiveGridFsResource}.
+ *
+ * @param filename filename of the absent resource, must not be {@literal null}.
+ * @return never {@literal null}.
+ * @since 2.1
+ */
+ public static ReactiveGridFsResource absent(String filename) {
+
+ Assert.notNull(filename, "Filename must not be null");
+
+ return new ReactiveGridFsResource(filename, Flux.empty());
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.core.io.InputStreamResource#getInputStream()
+ */
+ @Override
+ public InputStream getInputStream() throws IllegalStateException {
+ throw new UnsupportedOperationException();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.core.io.AbstractResource#contentLength()
+ */
+ @Override
+ public long contentLength() throws IOException {
+
+ verifyExists();
+ return file.getLength();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.core.io.AbstractResource#getFilename()
+ */
+ @Override
+ public String getFilename() throws IllegalStateException {
+ return filename;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.core.io.AbstractResource#exists()
+ */
+ @Override
+ public boolean exists() {
+ return file != null;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.core.io.AbstractResource#lastModified()
+ */
+ @Override
+ public long lastModified() throws IOException {
+
+ verifyExists();
+ return file.getUploadDate().getTime();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.core.io.AbstractResource#getDescription()
+ */
+ @Override
+ public String getDescription() {
+ return String.format("GridFs resource [%s]", this.getFilename());
+ }
+
+ /**
+ * Returns the {@link Resource}'s id.
+ *
+ * @return never {@literal null}.
+ * @throws IllegalStateException if the file does not {@link #exists()}.
+ */
+ public Object getId() {
+
+ Assert.state(exists(), () -> String.format("%s does not exist.", getDescription()));
+
+ return file.getId();
+ }
+
+ /**
+ * Retrieve the download stream.
+ *
+ * @return
+ */
+ public Flux getDownloadStream() {
+
+ if (!exists()) {
+ return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription())));
+ }
+ return content;
+ }
+
+ private void verifyExists() throws FileNotFoundException {
+
+ if (!exists()) {
+ throw new FileNotFoundException(String.format("%s does not exist.", getDescription()));
+ }
+ }
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java
new file mode 100644
index 0000000000..2c182c6871
--- /dev/null
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java
@@ -0,0 +1,275 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.gridfs;
+
+import static org.springframework.data.mongodb.core.query.Query.*;
+import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import org.bson.Document;
+import org.bson.types.ObjectId;
+import org.reactivestreams.Publisher;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferFactory;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.dao.IncorrectResultSizeDataAccessException;
+import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory;
+import org.springframework.data.mongodb.core.convert.MongoConverter;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.data.mongodb.core.query.SerializationUtils;
+import org.springframework.lang.Nullable;
+import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
+
+import com.mongodb.client.gridfs.model.GridFSFile;
+import com.mongodb.reactivestreams.client.MongoDatabase;
+import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
+import com.mongodb.reactivestreams.client.gridfs.GridFSBucket;
+import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets;
+import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadStream;
+import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher;
+
+/**
+ * {@link ReactiveGridFsOperations} implementation to store content into MongoDB GridFS. Uses by default
+ * {@link DefaultDataBufferFactory} to create {@link DataBuffer buffers}.
+ *
+ * @author Mark Paluch
+ * @since 2.2
+ */
+public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements ReactiveGridFsOperations {
+
+ private final ReactiveMongoDatabaseFactory dbFactory;
+ private final DataBufferFactory dataBufferFactory;
+ private final @Nullable String bucket;
+
+ /**
+ * Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and
+ * {@link MongoConverter}.
+ *
+ * @param dbFactory must not be {@literal null}.
+ * @param converter must not be {@literal null}.
+ */
+ public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter) {
+ this(dbFactory, converter, null);
+ }
+
+ /**
+ * Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and
+ * {@link MongoConverter}.
+ *
+ * @param dbFactory must not be {@literal null}.
+ * @param converter must not be {@literal null}.
+ * @param bucket
+ */
+ public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter,
+ @Nullable String bucket) {
+ this(new DefaultDataBufferFactory(), dbFactory, converter, bucket);
+ }
+
+ /**
+ * Creates a new {@link ReactiveGridFsTemplate} using the given {@link DataBufferFactory},
+ * {@link ReactiveMongoDatabaseFactory} and {@link MongoConverter}.
+ *
+ * @param dataBufferFactory must not be {@literal null}.
+ * @param dbFactory must not be {@literal null}.
+ * @param converter must not be {@literal null}.
+ * @param bucket
+ */
+ public ReactiveGridFsTemplate(DataBufferFactory dataBufferFactory, ReactiveMongoDatabaseFactory dbFactory,
+ MongoConverter converter, @Nullable String bucket) {
+
+ super(converter);
+
+ Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null!");
+ Assert.notNull(dbFactory, "ReactiveMongoDatabaseFactory must not be null!");
+
+ this.dataBufferFactory = dataBufferFactory;
+ this.dbFactory = dbFactory;
+ this.bucket = bucket;
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(com.mongodb.reactivestreams.client.gridfs.AsyncInputStream, java.lang.String, java.lang.String, java.lang.Object)
+ */
+ @Override
+ public Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
+ @Nullable Object metadata) {
+ return store(content, filename, contentType, toDocument(metadata));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(org.reactivestreams.Publisher, java.lang.String, java.lang.String, java.lang.Object)
+ */
+ @Override
+ public Mono store(Publisher content, @Nullable String filename, @Nullable String contentType,
+ @Nullable Object metadata) {
+ return store(content, filename, contentType, toDocument(metadata));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(com.mongodb.reactivestreams.client.gridfs.AsyncInputStream, java.lang.String, java.lang.String, org.bson.Document)
+ */
+ @Override
+ public Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
+ @Nullable Document metadata) {
+
+ Assert.notNull(content, "InputStream must not be null!");
+ return Mono.from(getGridFs().uploadFromStream(filename, content, computeUploadOptionsFor(contentType, metadata)));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(org.reactivestreams.Publisher, java.lang.String, java.lang.String, org.bson.Document)
+ */
+ @Override
+ public Mono store(Publisher content, @Nullable String filename, @Nullable String contentType,
+ @Nullable Document metadata) {
+
+ Assert.notNull(content, "Content must not be null!");
+
+ return BinaryStreamAdapters.toAsyncInputStream(content).flatMap(it -> store(it, filename, contentType, metadata));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#find(org.springframework.data.mongodb.core.query.Query)
+ */
+ @Override
+ public Flux find(Query query) {
+ return Flux.from(prepareQuery(query));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#findOne(org.springframework.data.mongodb.core.query.Query)
+ */
+ @Override
+ public Mono findOne(Query query) {
+
+ return Flux.from(prepareQuery(query).limit(2)) //
+ .collectList() //
+ .flatMap(it -> {
+ if (it.isEmpty()) {
+ return Mono.empty();
+ }
+
+ if (it.size() > 1) {
+ return Mono.error(new IncorrectResultSizeDataAccessException(
+ "Query " + SerializationUtils.serializeToJsonSafely(query) + " returned non unique result.", 1));
+ }
+
+ return Mono.just(it.get(0));
+ });
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#findFirst(org.springframework.data.mongodb.core.query.Query)
+ */
+ @Override
+ public Mono findFirst(Query query) {
+ return Flux.from(prepareQuery(query).limit(1)).next();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#delete(org.springframework.data.mongodb.core.query.Query)
+ */
+ @Override
+ public Mono delete(Query query) {
+ return find(query).flatMap(it -> getGridFs().delete(it.getId())).then();
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResource(java.lang.String)
+ */
+ @Override
+ public Mono getResource(String location) {
+
+ Assert.notNull(location, "Filename must not be null!");
+
+ return findOne(query(whereFilename().is(location))).flatMap(this::getResource)
+ .defaultIfEmpty(ReactiveGridFsResource.absent(location));
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResource(com.mongodb.client.gridfs.model.GridFSFile)
+ */
+ @Override
+ public Mono getResource(GridFSFile file) {
+
+ Assert.notNull(file, "GridFSFile must not be null!");
+
+ return Mono.fromSupplier(() -> {
+
+ GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getObjectId());
+
+ return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory));
+ });
+ }
+
+ /*
+ * (non-Javadoc)
+ * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResources(java.lang.String)
+ */
+ @Override
+ public Flux getResources(String locationPattern) {
+
+ if (!StringUtils.hasText(locationPattern)) {
+ return Flux.empty();
+ }
+
+ AntPath path = new AntPath(locationPattern);
+
+ if (path.isPattern()) {
+
+ Flux files = find(query(whereFilename().regex(path.toRegex())));
+ return files.flatMap(this::getResource);
+ }
+
+ return getResource(locationPattern).flux();
+ }
+
+ protected GridFSFindPublisher prepareQuery(Query query) {
+
+ Assert.notNull(query, "Query must not be null!");
+
+ Document queryObject = getMappedQuery(query.getQueryObject());
+ Document sortObject = getMappedQuery(query.getSortObject());
+
+ GridFSFindPublisher publisherToUse = getGridFs().find(queryObject).sort(sortObject);
+
+ Integer cursorBatchSize = query.getMeta().getCursorBatchSize();
+ if (cursorBatchSize != null) {
+ publisherToUse = publisherToUse.batchSize(cursorBatchSize);
+ }
+
+ return publisherToUse;
+ }
+
+ protected GridFSBucket getGridFs() {
+
+ MongoDatabase db = dbFactory.getMongoDatabase();
+ return bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket);
+ }
+}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java
new file mode 100644
index 0000000000..b94de93afb
--- /dev/null
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.gridfs;
+
+import static org.assertj.core.api.Assertions.*;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.util.StreamUtils;
+
+import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
+import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;
+
+/**
+ * Unit tests for {@link BinaryStreamAdapters}.
+ *
+ * @author Mark Paluch
+ */
+public class BinaryStreamAdaptersUnitTests {
+
+ @Test // DATAMONGO-1855
+ public void shouldAdaptAsyncInputStreamToDataBufferPublisher() throws IOException {
+
+ ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml");
+
+ byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
+ AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
+
+ Flux dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory());
+
+ DataBufferUtils.join(dataBuffers) //
+ .as(StepVerifier::create) //
+ .consumeNextWith(actual -> {
+
+ byte[] actualContent = new byte[actual.readableByteCount()];
+ actual.read(actualContent);
+ assertThat(actualContent).isEqualTo(content);
+ }) //
+ .verifyComplete();
+ }
+
+ @Test // DATAMONGO-1855
+ public void shouldAdaptBinaryPublisherToAsyncInputStream() throws IOException {
+
+ ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml");
+
+ byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
+
+ Flux dataBuffers = DataBufferUtils.readInputStream(resource::getInputStream,
+ new DefaultDataBufferFactory(), 10);
+
+ AsyncInputStream inputStream = BinaryStreamAdapters.toAsyncInputStream(dataBuffers).block();
+ ByteBuffer complete = readBuffer(inputStream);
+
+ assertThat(complete).isEqualTo(ByteBuffer.wrap(content));
+ }
+
+ static ByteBuffer readBuffer(AsyncInputStream inputStream) {
+
+ ByteBuffer complete = ByteBuffer.allocate(1024);
+
+ boolean hasData = true;
+ while (hasData) {
+
+ ByteBuffer chunk = ByteBuffer.allocate(100);
+
+ Integer bytesRead = Mono.from(inputStream.read(chunk)).block();
+
+ chunk.flip();
+ complete.put(chunk);
+
+ hasData = bytesRead > -1;
+ }
+
+ complete.flip();
+
+ return complete;
+ }
+}
diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java
new file mode 100644
index 0000000000..879bbdf1a7
--- /dev/null
+++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2019 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.springframework.data.mongodb.gridfs;
+
+import static org.assertj.core.api.Assertions.*;
+import static org.springframework.data.mongodb.core.query.Criteria.where;
+import static org.springframework.data.mongodb.core.query.Query.*;
+import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*;
+
+import org.springframework.dao.IncorrectResultSizeDataAccessException;
+import reactor.core.publisher.Flux;
+import reactor.test.StepVerifier;
+
+import java.io.IOException;
+
+import org.bson.BsonObjectId;
+import org.bson.Document;
+import org.bson.types.ObjectId;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.core.io.buffer.DefaultDataBuffer;
+import org.springframework.core.io.buffer.DefaultDataBufferFactory;
+import org.springframework.data.mongodb.core.query.Query;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit4.SpringRunner;
+import org.springframework.util.StreamUtils;
+
+import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
+import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper;
+
+/**
+ * Integration tests for {@link ReactiveGridFsTemplate}.
+ *
+ * @author Mark Paluch
+ * @author Christoph Strobl
+ */
+@RunWith(SpringRunner.class)
+@ContextConfiguration("classpath:gridfs/reactive-gridfs.xml")
+public class ReactiveGridFsTemplateTests {
+
+ Resource resource = new ClassPathResource("gridfs/gridfs.xml");
+
+ @Autowired ReactiveGridFsOperations operations;
+
+ @Before
+ public void setUp() {
+
+ operations.delete(new Query()) //
+ .as(StepVerifier::create) //
+ .verifyComplete();
+ }
+
+ @Test // DATAMONGO-1855
+ public void storesAndFindsSimpleDocument() {
+
+ DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
+ DefaultDataBuffer first = factory.wrap("first".getBytes());
+ DefaultDataBuffer second = factory.wrap("second".getBytes());
+
+ ObjectId reference = operations.store(Flux.just(first, second), "foo.xml").block();
+
+ operations.find(query(where("_id").is(reference))) //
+ .as(StepVerifier::create) //
+ .assertNext(actual -> {
+ assertThat(((BsonObjectId) actual.getId()).getValue()).isEqualTo(reference);
+ }) //
+ .verifyComplete();
+ }
+
+ @Test // DATAMONGO-1855
+ public void writesMetadataCorrectly() throws IOException {
+
+ Document metadata = new Document("key", "value");
+
+ AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
+
+ ObjectId reference = operations.store(stream, "foo.xml", "binary/octet-stream", metadata).block();
+
+ operations.find(query(whereMetaData("key").is("value"))) //
+ .as(StepVerifier::create) //
+ .consumeNextWith(actual -> {
+ assertThat(actual.getObjectId()).isEqualTo(reference);
+ }) //
+ .verifyComplete();
+ }
+
+ @Test // DATAMONGO-1855
+ public void marshalsComplexMetadata() throws IOException {
+
+ Metadata metadata = new Metadata();
+ metadata.version = "1.0";
+
+ AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
+
+ ObjectId reference = operations.store(stream, "foo.xml", "binary/octet-stream", metadata).block();
+
+ operations.find(query(whereMetaData("version").is("1.0"))) //
+ .as(StepVerifier::create) //
+ .consumeNextWith(actual -> {
+ assertThat(actual.getObjectId()).isEqualTo(reference);
+ assertThat(actual.getMetadata()).containsEntry("version", "1.0");
+ }) //
+ .verifyComplete();
+ }
+
+ @Test // DATAMONGO-1855
+ public void getResourceShouldRetrieveContentByIdentity() throws IOException {
+
+ byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
+ AsyncInputStream upload = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
+
+ ObjectId reference = operations.store(upload, "foo.xml", null, null).block();
+
+ operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource)
+ .flatMapMany(ReactiveGridFsResource::getDownloadStream) //
+ .transform(DataBufferUtils::join) //
+ .as(StepVerifier::create) //
+ .consumeNextWith(dataBuffer -> {
+
+ byte[] actual = new byte[dataBuffer.readableByteCount()];
+ dataBuffer.read(actual);
+
+ assertThat(actual).isEqualTo(content);
+ }) //
+ .verifyComplete();
+ }
+
+ @Test // DATAMONGO-1855
+ public void shouldEmitFirstEntryWhenFindFirstRetrievesMoreThanOneResult() throws IOException {
+
+ AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
+ AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());
+
+ operations.store(upload1, "foo.xml", null, null).block();
+ operations.store(upload2, "foo2.xml", null, null).block();
+
+ operations.findFirst(query(where("filename").regex("foo*"))) //
+ .flatMap(operations::getResource) //
+ .as(StepVerifier::create) //
+ .expectNextCount(1) //
+ .verifyComplete();
+ }
+
+ @Test // DATAMONGO-1855
+ public void shouldEmitErrorWhenFindOneRetrievesMoreThanOneResult() throws IOException {
+
+ AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
+ AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream());
+
+ operations.store(upload1, "foo.xml", null, null).block();
+ operations.store(upload2, "foo2.xml", null, null).block();
+
+ operations.findOne(query(where("filename").regex("foo*"))) //
+ .as(StepVerifier::create) //
+ .expectError(IncorrectResultSizeDataAccessException.class) //
+ .verify();
+ }
+
+ @Test // DATAMONGO-1855
+ public void getResourcesByPattern() throws IOException {
+
+ byte[] content = StreamUtils.copyToByteArray(resource.getInputStream());
+ AsyncInputStream upload = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream());
+
+ operations.store(upload, "foo.xml", null, null).block();
+
+ operations.getResources("foo*") //
+ .flatMap(ReactiveGridFsResource::getDownloadStream) //
+ .transform(DataBufferUtils::join) //
+ .as(StepVerifier::create) //
+ .consumeNextWith(dataBuffer -> {
+
+ byte[] actual = new byte[dataBuffer.readableByteCount()];
+ dataBuffer.read(actual);
+
+ assertThat(actual).isEqualTo(content);
+ }) //
+ .verifyComplete();
+ }
+
+ static class Metadata {
+ String version;
+ }
+}
diff --git a/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml b/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml
new file mode 100644
index 0000000000..f07ee8b08d
--- /dev/null
+++ b/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml
@@ -0,0 +1,30 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/main/asciidoc/new-features.adoc b/src/main/asciidoc/new-features.adoc
index 8135ad698f..80ec95981e 100644
--- a/src/main/asciidoc/new-features.adoc
+++ b/src/main/asciidoc/new-features.adoc
@@ -5,6 +5,7 @@
== What's New in Spring Data MongoDB 2.2
* <>
* <> via `ReactiveQuerydslPredicateExecutor`.
+* <>.
[[new-features.2-1-0]]
== What's New in Spring Data MongoDB 2.1
diff --git a/src/main/asciidoc/reference/reactive-mongodb.adoc b/src/main/asciidoc/reference/reactive-mongodb.adoc
index bb9ac80f3e..eaa4a51f2c 100644
--- a/src/main/asciidoc/reference/reactive-mongodb.adoc
+++ b/src/main/asciidoc/reference/reactive-mongodb.adoc
@@ -482,3 +482,96 @@ Flux hasIndex = operations.execute("geolocation",
.flatMap(document -> Mono.just(true))
.defaultIfEmpty(false));
----
+
+[[reactive.gridfs]]
+== GridFS Support
+
+MongoDB supports storing binary files inside its filesystem, GridFS.
+Spring Data MongoDB provides a `ReactiveGridFsOperations` interface as well as the corresponding implementation, `ReactiveGridFsTemplate`, to let you interact with the filesystem.
+You can set up a `ReactiveGridFsTemplate` instance by handing it a `ReactiveMongoDatabaseFactory` as well as a `MongoConverter`, as the following example shows:
+
+.JavaConfig setup for a ReactiveGridFsTemplate
+====
+[source,java]
+----
+class GridFsConfiguration extends AbstractReactiveMongoConfiguration {
+
+ // … further configuration omitted
+
+ @Bean
+ public ReactiveGridFsTemplate reactiveGridFsTemplate() {
+ return new ReactiveGridFsTemplate(reactiveMongoDbFactory(), mappingMongoConverter());
+ }
+}
+----
+====
+
+The template can now be injected and used to perform storage and retrieval operations, as the following example shows:
+
+.Using ReactiveGridFsTemplate to store files
+====
+[source,java]
+----
+class ReactiveGridFsClient {
+
+ @Autowired
+ ReactiveGridFsTemplate operations;
+
+ @Test
+ public Mono storeFileToGridFs() {
+
+ FileMetadata metadata = new FileMetadata();
+ // populate metadata
+ Publisher file = … // lookup File or Resource
+
+ return operations.store(file, "filename.txt", metadata);
+ }
+}
+----
+====
+
+The `store(…)` operations take an `Publisher`, a filename, and (optionally) metadata information about the file to store. The metadata can be an arbitrary object, which will be marshaled by the `MongoConverter` configured with the `ReactiveGridFsTemplate`. Alternatively, you can also provide a `Document`.
+
+NOTE: MongoDB's driver uses `AsyncInputStream` and `AsyncOutputStream` interfaces to exchange binary streams. Spring Data MongoDB adapts these interfaces to `Publisher`. Read more about `DataBuffer` in http://docs.spring.io/spring/docs/{springVersion}/spring-framework-reference/core.html#databuffers[Spring's reference documentation].
+
+You can read files from the filesystem through either the `find(…)` or the `getResources(…)` methods. Let's have a look at the `find(…)` methods first. You can either find a single file or multiple files that match a `Query`. You can use the `GridFsCriteria` helper class to define queries. It provides static factory methods to encapsulate default metadata fields (such as `whereFilename()` and `whereContentType()`) or a custom one through `whereMetaData()`. The following example shows how to use `ReactiveGridFsTemplate` to query for files:
+
+.Using ReactiveGridFsTemplate to query for files
+====
+[source,java]
+----
+class ReactiveGridFsClient {
+
+ @Autowired
+ ReactiveGridFsTemplate operations;
+
+ @Test
+ public Flux findFilesInGridFs() {
+ return operations.find(query(whereFilename().is("filename.txt")))
+ }
+}
+----
+====
+
+NOTE: Currently, MongoDB does not support defining sort criteria when retrieving files from GridFS. For this reason, any sort criteria defined on the `Query` instance handed into the `find(…)` method are disregarded.
+
+The other option to read files from the GridFs is to use the methods modeled along the lines of `ResourcePatternResolver`.
+`ReactiveGridFsOperations` uses reactive types to defer execution while `ResourcePatternResolver` uses a synchronous interface.
+These methods allow handing an Ant path into the method and can thus retrieve files matching the given pattern. The following example shows how to use `ReactiveGridFsTemplate` to read files:
+
+.Using ReactiveGridFsTemplate to read files
+====
+[source,java]
+----
+class ReactiveGridFsClient {
+
+ @Autowired
+ ReactiveGridFsOperations operations;
+
+ @Test
+ public void readFilesFromGridFs() {
+ Flux txtFiles = operations.getResources("*.txt");
+ }
+}
+----
+====