From f5bb056da2279df894b7e2209eaf8ec9b33a0bb6 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 17 Jan 2019 09:47:15 +0100 Subject: [PATCH 1/5] DATAMONGO-1855 - Prepare issue branch. --- pom.xml | 2 +- spring-data-mongodb-benchmarks/pom.xml | 2 +- spring-data-mongodb-cross-store/pom.xml | 4 ++-- spring-data-mongodb-distribution/pom.xml | 2 +- spring-data-mongodb/pom.xml | 2 +- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/pom.xml b/pom.xml index 657607148e..378af07e5d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index c2ff37b35c..cd2ab5857a 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-cross-store/pom.xml b/spring-data-mongodb-cross-store/pom.xml index fd36f227c0..3c604ed0d9 100644 --- a/spring-data-mongodb-cross-store/pom.xml +++ b/spring-data-mongodb-cross-store/pom.xml @@ -6,7 +6,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT ../pom.xml @@ -50,7 +50,7 @@ org.springframework.data spring-data-mongodb - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index fc8d28a2b6..40cdba2c1a 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -14,7 +14,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index f3c85a046a..80578399a4 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -11,7 +11,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT ../pom.xml From ebc282185e1125936eae610b5a61160c17be969b Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Thu, 17 Jan 2019 15:38:29 +0100 Subject: [PATCH 2/5] DATAMONGO-1855 - Initial reactive GridFS support. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit We now support reactive GridFS using MongoDB's reactive GridFS API. Files can be consumed and provided as binary stream. 
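For example (a sketch: the ReactiveMongoDatabaseFactory and MongoConverter wiring is assumed to exist, and flatMapMany is used to flatten the Mono<ReactiveGridFsResource> returned by getResource(…) into the download Flux, as the integration tests below do):

ReactiveMongoDatabaseFactory factory = …;
MongoConverter converter = …;
ReactiveGridFsOperations operations = new ReactiveGridFsTemplate(factory, converter);

Publisher<DataBuffer> buffers = …;
Mono<ObjectId> id = operations.store(buffers, "foo.xml");
Flux<DataBuffer> download = operations.getResource("foo.xml")
		.flatMapMany(ReactiveGridFsResource::getDownloadStream);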
ReactiveGridFsOperations operations = …; Publisher buffers = … Mono id = operations.store(buffers, "foo.xml"); Flux download = operations.getResource("foo.xml").flatMap(ReactiveGridFsResource::getDownloadStream); --- .../gridfs/ReactiveGridFsOperations.java | 204 ++++++ .../gridfs/ReactiveGridFsResource.java | 180 ++++++ .../gridfs/ReactiveGridFsTemplate.java | 585 ++++++++++++++++++ ...eactiveGridFsTemplateIntegrationTests.java | 140 +++++ .../test/resources/gridfs/reactive-gridfs.xml | 30 + 5 files changed, 1139 insertions(+) create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java create mode 100644 spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java new file mode 100644 index 0000000000..c4fc184e60 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java @@ -0,0 +1,204 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.bson.Document; +import org.bson.types.ObjectId; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.support.ResourcePatternResolver; +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.lang.Nullable; + +import com.mongodb.client.gridfs.GridFSFindIterable; +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Collection of operations to store and read files from MongoDB GridFS using reactive infrastructure. + * + * @author Mark Paluch + * @since 2.2 + */ +public interface ReactiveGridFsOperations { + + /** + * Stores the given content into a file with the given name. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, String filename) { + return store(content, filename, (Object) null); + } + + /** + * Stores the given content into a file with the given name. 
+ * + * @param content must not be {@literal null}. + * @param metadata can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, @Nullable Object metadata) { + return store(content, null, metadata); + } + + /** + * Stores the given content into a file with the given name. + * + * @param content must not be {@literal null}. + * @param metadata can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, @Nullable Document metadata) { + return store(content, null, metadata); + } + + /** + * Stores the given content into a file with the given name and content type. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, @Nullable String filename, @Nullable String contentType) { + return store(content, filename, contentType, (Object) null); + } + + /** + * Stores the given content into a file with the given name using the given metadata. The metadata object will be + * marshalled before writing. + * + * @param content must not be {@literal null}. + * @param filename can be {@literal null} or empty. + * @param metadata can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + default Mono store(Publisher content, @Nullable String filename, @Nullable Object metadata) { + return store(content, filename, null, metadata); + } + + /** + * Stores the given content into a file with the given name and content type using the given metadata. The metadata + * object will be marshalled before writing. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null} + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata); + + /** + * Stores the given content into a file with the given name and content type using the given metadata. The metadata + * object will be marshalled before writing. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null} + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata); + + /** + * Stores the given content into a file with the given name using the given metadata. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param metadata can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. 
+ */ + default Mono store(Publisher content, @Nullable String filename, @Nullable Document metadata) { + return store(content, filename, null, metadata); + } + + /** + * Stores the given content into a file with the given name and content type using the given metadata. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null}. + * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + */ + Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata); + + Mono store(Publisher content, String filename, String contentType, Document metadata); + + /** + * Returns all files matching the given query. Note, that currently {@link Sort} criterias defined at the + * {@link Query} will not be regarded as MongoDB does not support ordering for GridFS file access. + * + * @see MongoDB Jira: JAVA-431 + * @param query must not be {@literal null}. + * @return {@link GridFSFindIterable} to obtain results from. Eg. by calling + * {@link GridFSFindIterable#into(java.util.Collection)}. + */ + Flux find(Query query); + + /** + * Returns a single {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given query or {@literal null} in + * case no file matches. + * + * @param query must not be {@literal null}. + * @return + */ + Mono findOne(Query query); + + /** + * Deletes all files matching the given {@link Query}. + * + * @param query must not be {@literal null}. + */ + Mono delete(Query query); + + /** + * Returns the {@link GridFsResource} with the given file name. + * + * @param filename must not be {@literal null}. + * @return the resource. Use {@link org.springframework.core.io.Resource#exists()} to check if the returned + * {@link GridFsResource} is actually present. + * @see ResourcePatternResolver#getResource(String) + */ + Mono getResource(String filename); + + /** + * Returns the {@link GridFsResource} for a {@link GridFSFile}. + * + * @param file must not be {@literal null}. + * @return the resource for the file. + */ + Mono getResource(GridFSFile file); + + /** + * Returns all {@link GridFsResource}s matching the given file name pattern. + * + * @param filenamePattern must not be {@literal null}. + * @return + */ + Flux getResources(String filenamePattern); +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java new file mode 100644 index 0000000000..eb787cbd50 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java @@ -0,0 +1,180 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb.gridfs; + +import reactor.core.publisher.Flux; + +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; + +import org.reactivestreams.Publisher; +import org.springframework.core.io.AbstractResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import com.mongodb.client.gridfs.model.GridFSFile; + +/** + * Reactive {@link GridFSFile} based {@link Resource} implementation. + * + * @author Mark Paluch + * @since 2.2 + */ +public class ReactiveGridFsResource extends AbstractResource { + + static final String CONTENT_TYPE_FIELD = "_contentType"; + private static final ByteArrayInputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]); + + private final @Nullable GridFSFile file; + private final String filename; + private final Flux content; + + /** + * Creates a new, absent {@link ReactiveGridFsResource}. + * + * @param filename filename of the absent resource. + * @param content + * @since 2.1 + */ + private ReactiveGridFsResource(String filename, Publisher content) { + + this.file = null; + this.filename = filename; + this.content = Flux.from(content); + } + + /** + * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}. + * + * @param file must not be {@literal null}. + * @param content + */ + public ReactiveGridFsResource(GridFSFile file, Publisher content) { + + this.file = file; + this.filename = file.getFilename(); + this.content = Flux.from(content); + } + + /** + * Obtain an absent {@link ReactiveGridFsResource}. + * + * @param filename filename of the absent resource, must not be {@literal null}. + * @return never {@literal null}. + * @since 2.1 + */ + public static ReactiveGridFsResource absent(String filename) { + + Assert.notNull(filename, "Filename must not be null"); + + return new ReactiveGridFsResource(filename, Flux.empty()); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.InputStreamResource#getInputStream() + */ + @Override + public InputStream getInputStream() throws IllegalStateException { + throw new UnsupportedOperationException(); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#contentLength() + */ + @Override + public long contentLength() throws IOException { + + verifyExists(); + return file.getLength(); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#getFilename() + */ + @Override + public String getFilename() throws IllegalStateException { + return filename; + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#exists() + */ + @Override + public boolean exists() { + return file != null; + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#lastModified() + */ + @Override + public long lastModified() throws IOException { + + verifyExists(); + return file.getUploadDate().getTime(); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#getDescription() + */ + @Override + public String getDescription() { + return String.format("GridFs resource [%s]", this.getFilename()); + } + + /** + * Returns the {@link Resource}'s id. + * + * @return never {@literal null}. + * @throws IllegalStateException if the file does not {@link #exists()}. 
+ */ + public Object getId() { + + Assert.state(exists(), () -> String.format("%s does not exist.", getDescription())); + + return file.getId(); + } + + /** + * Retrieve the download stream. + * + * @return + */ + public Flux getDownloadStream() { + + if (!exists()) { + return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription()))); + } + return content; + } + + private void verifyExists() throws FileNotFoundException { + + if (!exists()) { + throw new FileNotFoundException(String.format("%s does not exist.", getDescription())); + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java new file mode 100644 index 0000000000..d88ef1dfdd --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java @@ -0,0 +1,585 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import static org.springframework.data.mongodb.core.query.Query.*; +import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; + +import lombok.RequiredArgsConstructor; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; +import reactor.util.concurrent.Queues; +import reactor.util.context.Context; + +import java.nio.ByteBuffer; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.BiConsumer; +import java.util.function.Function; + +import org.bson.Document; +import org.bson.types.ObjectId; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.client.gridfs.model.GridFSUploadOptions; +import com.mongodb.reactivestreams.client.MongoDatabase; +import com.mongodb.reactivestreams.client.Success; +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; +import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; +import 
com.mongodb.reactivestreams.client.gridfs.GridFSBuckets; +import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadStream; +import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher; + +/** + * {@link ReactiveGridFsOperations} implementation to store content into MongoDB GridFS. Uses by default + * {@link DefaultDataBufferFactory} to create {@link DataBuffer buffers}. + * + * @author Mark Paluch + * @since 2.2 + */ +public class ReactiveGridFsTemplate implements ReactiveGridFsOperations { + + private final DataBufferFactory dataBufferFactory; + private final ReactiveMongoDatabaseFactory dbFactory; + private final @Nullable String bucket; + private final MongoConverter converter; + private final QueryMapper queryMapper; + + /** + * Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and + * {@link MongoConverter}. + * + * @param dbFactory must not be {@literal null}. + * @param converter must not be {@literal null}. + */ + public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter) { + this(dbFactory, converter, null); + } + + /** + * Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and + * {@link MongoConverter}. + * + * @param dbFactory must not be {@literal null}. + * @param converter must not be {@literal null}. + * @param bucket + */ + public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter, + @Nullable String bucket) { + this(new DefaultDataBufferFactory(), dbFactory, converter, bucket); + } + + /** + * Creates a new {@link ReactiveGridFsTemplate} using the given {@link DataBufferFactory}, + * {@link ReactiveMongoDatabaseFactory} and {@link MongoConverter}. + * + * @param dataBufferFactory must not be {@literal null}. + * @param dbFactory must not be {@literal null}. + * @param converter must not be {@literal null}. 
+ * @param bucket + */ + public ReactiveGridFsTemplate(DataBufferFactory dataBufferFactory, ReactiveMongoDatabaseFactory dbFactory, + MongoConverter converter, @Nullable String bucket) { + + Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null!"); + Assert.notNull(dbFactory, "ReactiveMongoDatabaseFactory must not be null!"); + Assert.notNull(converter, "MongoConverter must not be null!"); + + this.dataBufferFactory = dataBufferFactory; + this.dbFactory = dbFactory; + this.converter = converter; + this.bucket = bucket; + + this.queryMapper = new QueryMapper(converter); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(com.mongodb.reactivestreams.client.gridfs.AsyncInputStream, java.lang.String, java.lang.String, java.lang.Object) + */ + @Override + public Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata) { + return store(content, filename, contentType, toDocument(metadata)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(org.reactivestreams.Publisher, java.lang.String, java.lang.String, java.lang.Object) + */ + @Override + public Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata) { + return store(content, filename, contentType, toDocument(metadata)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(com.mongodb.reactivestreams.client.gridfs.AsyncInputStream, java.lang.String, java.lang.String, org.bson.Document) + */ + @Override + public Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata) { + + Assert.notNull(content, "InputStream must not be null!"); + + GridFSUploadOptions options = new GridFSUploadOptions(); + + Document mData = new Document(); + + if (StringUtils.hasText(contentType)) { + mData.put(GridFsResource.CONTENT_TYPE_FIELD, contentType); + } + + if (metadata != null) { + mData.putAll(metadata); + } + + options.metadata(mData); + + return Mono.from(getGridFs().uploadFromStream(filename, content, options)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(org.reactivestreams.Publisher, java.lang.String, java.lang.String, org.bson.Document) + */ + @Override + public Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata) { + + Assert.notNull(content, "Content must not be null!"); + + return Mono.> create(sink -> { + + BinaryPublisherToAsyncInputStreamAdapter adapter = new BinaryPublisherToAsyncInputStreamAdapter(content, + sink.currentContext()); + + Mono store = store(adapter, filename, contentType, metadata); + + sink.success(store); + }).flatMap(Function.identity()); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#find(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Flux find(Query query) { + + GridFSFindPublisher publisherToUse = prepareQuery(query); + + return Flux.from(publisherToUse); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#findOne(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Mono findOne(Query query) { + + GridFSFindPublisher publisherToUse = prepareQuery(query); + + return 
Flux.from(publisherToUse.limit(1)).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#delete(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Mono delete(Query query) { + + GridFSBucket gridFs = getGridFs(); + return find(query).flatMap(it -> gridFs.delete(it.getId())).then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResource(java.lang.String) + */ + @Override + public Mono getResource(String location) { + + Assert.notNull(location, "Filename must not be null!"); + + return findOne(query(whereFilename().is(location))).flatMap(this::getResource) + .defaultIfEmpty(ReactiveGridFsResource.absent(location)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResource(com.mongodb.client.gridfs.model.GridFSFile) + */ + @Override + public Mono getResource(GridFSFile file) { + + Assert.notNull(file, "GridFSFile must not be null!"); + + return Mono.fromSupplier(() -> { + + GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getObjectId()); + + return new ReactiveGridFsResource(file, BinaryStreamUtility.createBinaryStream(dataBufferFactory, stream)); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResources(java.lang.String) + */ + @Override + public Flux getResources(String locationPattern) { + + if (!StringUtils.hasText(locationPattern)) { + return Flux.empty(); + } + + AntPath path = new AntPath(locationPattern); + + if (path.isPattern()) { + + Flux files = find(query(whereFilename().regex(path.toRegex()))); + return files.flatMap(this::getResource); + } + + return getResource(locationPattern).flux(); + } + + protected GridFSFindPublisher prepareQuery(Query query) { + + Assert.notNull(query, "Query must not be null!"); + + Document queryObject = getMappedQuery(query.getQueryObject()); + Document sortObject = getMappedQuery(query.getSortObject()); + + GridFSFindPublisher publisherToUse = getGridFs().find(queryObject).sort(sortObject); + + Integer cursorBatchSize = query.getMeta().getCursorBatchSize(); + if (cursorBatchSize != null) { + publisherToUse = publisherToUse.batchSize(cursorBatchSize); + } + + return publisherToUse; + } + + private Document getMappedQuery(Document query) { + return queryMapper.getMappedObject(query, Optional.empty()); + } + + protected GridFSBucket getGridFs() { + + MongoDatabase db = dbFactory.getMongoDatabase(); + return bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket); + } + + @Nullable + private Document toDocument(@Nullable Object metadata) { + + Document document = null; + + if (metadata != null) { + document = new Document(); + converter.write(metadata, document); + } + return document; + } + + /** + * Adapter accepting a binary stream {@link Publisher} and transforming its contents into a {@link AsyncInputStream}. + *
<p/>
+ * This adapter subscribes to the binary {@link Publisher} as soon as the first chunk gets {@link #read(ByteBuffer) + * requested}. Requests are queued and binary chunks are requested from the {@link Publisher}. As soon as the + * {@link Publisher} emits items, chunks are provided to the read request which completes the number-of-written-bytes + * {@link Publisher}. + *
<p/>
+ * {@link AsyncInputStream} is supposed to work as sequential callback API that is called until reaching EOF. + * {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}. + * + * @author Mark Paluch + */ + @RequiredArgsConstructor + static class BinaryPublisherToAsyncInputStreamAdapter implements AsyncInputStream { + + private static final AtomicLongFieldUpdater UPDATER = AtomicLongFieldUpdater + .newUpdater(BinaryPublisherToAsyncInputStreamAdapter.class, "demand"); + + private final Publisher buffers; + private final Context subscriberContext; + private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + + private final AtomicBoolean subscribed = new AtomicBoolean(); + private volatile Subscription subscription; + private volatile boolean cancelled; + private volatile boolean complete; + private volatile Throwable error; + private final Queue> readRequests = Queues.> small() + .get(); + + // See UPDATER + private volatile long demand; + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer) + */ + @Override + public Publisher read(ByteBuffer dst) { + + return Mono.create(sink -> { + + readRequests.offer((db, bytecount) -> { + + try { + + if (error != null) { + sink.error(error); + return; + } + + if (bytecount == -1) { + sink.success(-1); + return; + } + + ByteBuffer byteBuffer = db.asByteBuffer(); + int toWrite = byteBuffer.remaining(); + dst.put(byteBuffer); + + sink.success(toWrite); + } catch (Exception e) { + sink.error(e); + } finally { + DataBufferUtils.release(db); + } + }); + + request(1); + }); + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close() + */ + @Override + public Publisher close() { + + return Mono.create(sink -> { + cancelled = true; + + if (error != null) { + sink.error(error); + return; + } + + sink.success(Success.SUCCESS); + }); + } + + protected void request(int n) { + + if (complete) { + terminatePendingReads(); + return; + } + + Operators.addCap(UPDATER, this, n); + + if (!subscribed.get()) { + + if (subscribed.compareAndSet(false, true)) { + + buffers.subscribe(new CoreSubscriber() { + + @Override + public Context currentContext() { + return subscriberContext; + } + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + + Operators.addCap(UPDATER, BinaryPublisherToAsyncInputStreamAdapter.this, -1); + s.request(1); + } + + @Override + public void onNext(DataBuffer dataBuffer) { + + if (cancelled || complete) { + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, subscriberContext); + return; + } + + BiConsumer poll = readRequests.poll(); + + if (poll == null) { + + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, subscriberContext); + subscription.cancel(); + return; + } + + poll.accept(dataBuffer, dataBuffer.readableByteCount()); + + requestFromSubscription(subscription); + } + + @Override + public void onError(Throwable t) { + + if (cancelled || complete) { + Operators.onErrorDropped(t, subscriberContext); + return; + } + + error = t; + complete = true; + terminatePendingReads(); + } + + @Override + public void onComplete() { + + complete = true; + terminatePendingReads(); + } + }); + } + } else { + + Subscription subscription = this.subscription; + + if (subscription != null) { + requestFromSubscription(subscription); + } + } + } + + void requestFromSubscription(Subscription subscription) { + + long demand = 
UPDATER.get(BinaryPublisherToAsyncInputStreamAdapter.this); + + if (cancelled) { + subscription.cancel(); + } + + if (demand > 0 && UPDATER.compareAndSet(BinaryPublisherToAsyncInputStreamAdapter.this, demand, demand - 1)) { + subscription.request(1); + } + } + + /** + * Terminates pending reads with empty buffers. + */ + void terminatePendingReads() { + + BiConsumer readers; + + while ((readers = readRequests.poll()) != null) { + readers.accept(factory.wrap(new byte[0]), -1); + } + } + } + + /** + * Utility to adapt a {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}. + */ + static class BinaryStreamUtility { + + /** + * Creates a {@link Publisher} emitting {@link DataBuffer}s. + * + * @param dataBufferFactory must not be {@literal null}. + * @param inputStream must not be {@literal null}. + * @return the resulting {@link Publisher}. + */ + public static Flux createBinaryStream(DataBufferFactory dataBufferFactory, + AsyncInputStream inputStream) { + + AtomicBoolean closed = new AtomicBoolean(); + + return Flux.push((sink) -> { + + sink.onDispose(() -> { + close(inputStream, closed); + }); + + sink.onCancel(() -> { + close(inputStream, closed); + }); + + emitNext(dataBufferFactory, inputStream, sink); + }); + } + + /** + * Emit the next {@link DataBuffer}. + * + * @param dataBufferFactory + * @param inputStream + * @param sink + */ + static void emitNext(DataBufferFactory dataBufferFactory, AsyncInputStream inputStream, FluxSink sink) { + + DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); + ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); + + Mono.from(inputStream.read(intermediate)).subscribe(bytes -> { + + intermediate.flip(); + dataBuffer.write(intermediate); + sink.next(dataBuffer); + + if (bytes == -1) { + sink.complete(); + } else { + emitNext(dataBufferFactory, inputStream, sink); + } + }, sink::error); + } + + static void close(AsyncInputStream inputStream, AtomicBoolean closed) { + if (closed.compareAndSet(false, true)) { + Mono.from(inputStream.close()).subscribe(); + } + } + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java new file mode 100644 index 0000000000..dea663eb51 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java @@ -0,0 +1,140 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb.gridfs; + +import static org.assertj.core.api.Assertions.*; +import static org.springframework.data.mongodb.core.query.Criteria.*; +import static org.springframework.data.mongodb.core.query.Query.*; +import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; + +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.io.IOException; + +import org.bson.BsonObjectId; +import org.bson.Document; +import org.bson.types.ObjectId; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.StreamUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; +import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; + +/** + * @author Mark Paluch + */ +@RunWith(SpringRunner.class) +@ContextConfiguration("classpath:gridfs/reactive-gridfs.xml") +public class ReactiveGridFsTemplateIntegrationTests { + + Resource resource = new ClassPathResource("gridfs/gridfs.xml"); + + @Autowired ReactiveGridFsOperations operations; + + @Before + public void setUp() { + operations.delete(new Query()) // + .as(StepVerifier::create) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void storesAndFindsSimpleDocument() { + + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer first = factory.wrap("first".getBytes()); + DefaultDataBuffer second = factory.wrap("second".getBytes()); + + ObjectId reference = operations.store(Flux.just(first, second), "foo.xml").block(); + + operations.find(query(where("_id").is(reference))) // + .as(StepVerifier::create) // + .assertNext(actual -> { + assertThat(((BsonObjectId) actual.getId()).getValue()).isEqualTo(reference); + }).verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void writesMetadataCorrectly() throws IOException { + + Document metadata = new Document("key", "value"); + + AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + ObjectId reference = operations.store(stream, "foo.xml", "binary/octet-stream", metadata).block(); + + operations.find(query(whereMetaData("key").is("value"))) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + assertThat(actual.getObjectId()).isEqualTo(reference); + })// + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void marshalsComplexMetadata() throws IOException { + + Metadata metadata = new Metadata(); + metadata.version = "1.0"; + + AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + ObjectId reference = operations.store(stream, "foo.xml", "binary/octet-stream", metadata).block(); + + operations.find(query(whereMetaData("version").is("1.0"))) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + assertThat(actual.getObjectId()).isEqualTo(reference); + })// + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void 
getResourceShouldRetrieveContentByIdentity() throws IOException { + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + AsyncInputStream upload = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + ObjectId reference = operations.store(upload, "foo.xml", null, null).block(); + + operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) + .flatMapMany(ReactiveGridFsResource::getDownloadStream) // + .transform(DataBufferUtils::join).as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + byte[] actual = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(actual); + + assertThat(actual).isEqualTo(content); + }).verifyComplete(); + } + + static class Metadata { + String version; + } +} diff --git a/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml b/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml new file mode 100644 index 0000000000..f07ee8b08d --- /dev/null +++ b/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + From 87f1f16772f0c1b02e530d4c56571799debd68da Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 18 Jan 2019 11:07:05 +0100 Subject: [PATCH 3/5] DATAMONGO-1855 - Refactoring. Extract utility methods to BinaryStreamAdapters. Move inner adapter classes to package-protected top-level types. --- .../gridfs/AsyncInputStreamAdapter.java | 248 +++++++++++++++ .../mongodb/gridfs/BinaryStreamAdapters.java | 78 +++++ .../gridfs/DataBufferPublisherAdapter.java | 205 ++++++++++++ .../gridfs/ReactiveGridFsTemplate.java | 292 +----------------- .../gridfs/BinaryStreamAdaptersUnitTests.java | 102 ++++++ 5 files changed, 635 insertions(+), 290 deletions(-) create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java create mode 100644 spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java new file mode 100644 index 0000000000..e29f344cec --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java @@ -0,0 +1,248 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb.gridfs; + +import lombok.RequiredArgsConstructor; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; +import reactor.util.concurrent.Queues; +import reactor.util.context.Context; + +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.BiConsumer; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; + +import com.mongodb.reactivestreams.client.Success; +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Adapter accepting a binary stream {@link Publisher} and emitting its through {@link AsyncInputStream}. + *
<p/>
+ * This adapter subscribes to the binary {@link Publisher} as soon as the first chunk gets {@link #read(ByteBuffer) + * requested}. Requests are queued and binary chunks are requested from the {@link Publisher}. As soon as the + * {@link Publisher} emits items, chunks are provided to the read request which completes the number-of-written-bytes + * {@link Publisher}. + *
<p/>
+ * {@link AsyncInputStream} is supposed to work as sequential callback API that is called until reaching EOF. + * {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}. + * + * @author Mark Paluch + * @since 2.2 + */ +@RequiredArgsConstructor +class AsyncInputStreamAdapter implements AsyncInputStream { + + private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater + .newUpdater(AsyncInputStreamAdapter.class, "demand"); + + private static final AtomicIntegerFieldUpdater SUBSCRIBED = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamAdapter.class, "subscribed"); + + private static final int SUBSCRIPTION_NOT_SUBSCRIBED = 0; + private static final int SUBSCRIPTION_SUBSCRIBED = 1; + + private final Publisher buffers; + private final Context subscriberContext; + private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + + private volatile Subscription subscription; + private volatile boolean cancelled; + private volatile boolean complete; + private volatile Throwable error; + private final Queue> readRequests = Queues.> small() + .get(); + + // see DEMAND + private volatile long demand; + + // see SUBSCRIBED + private volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED; + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer) + */ + @Override + public Publisher read(ByteBuffer dst) { + + return Mono.create(sink -> { + + readRequests.offer((db, bytecount) -> { + + try { + + if (error != null) { + sink.error(error); + return; + } + + if (bytecount == -1) { + sink.success(-1); + return; + } + + ByteBuffer byteBuffer = db.asByteBuffer(); + int toWrite = byteBuffer.remaining(); + dst.put(byteBuffer); + + sink.success(toWrite); + } catch (Exception e) { + sink.error(e); + } finally { + DataBufferUtils.release(db); + } + }); + + request(1); + }); + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close() + */ + @Override + public Publisher close() { + + return Mono.create(sink -> { + cancelled = true; + + if (error != null) { + sink.error(error); + return; + } + + sink.success(Success.SUCCESS); + }); + } + + protected void request(int n) { + + if (complete) { + terminatePendingReads(); + return; + } + + Operators.addCap(DEMAND, this, n); + + if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) { + + if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) { + + buffers.subscribe(new CoreSubscriber() { + + @Override + public Context currentContext() { + return subscriberContext; + } + + @Override + public void onSubscribe(Subscription s) { + subscription = s; + + Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1); + s.request(1); + } + + @Override + public void onNext(DataBuffer dataBuffer) { + + if (cancelled || complete) { + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, subscriberContext); + return; + } + + BiConsumer poll = readRequests.poll(); + + if (poll == null) { + + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, subscriberContext); + subscription.cancel(); + return; + } + + poll.accept(dataBuffer, dataBuffer.readableByteCount()); + + requestFromSubscription(subscription); + } + + @Override + public void onError(Throwable t) { + + if (cancelled || complete) { + Operators.onErrorDropped(t, subscriberContext); + return; + } + + error = t; + complete = true; + terminatePendingReads(); + } + + @Override + public 
void onComplete() { + + complete = true; + terminatePendingReads(); + } + }); + } + } else { + + Subscription subscription = this.subscription; + + if (subscription != null) { + requestFromSubscription(subscription); + } + } + } + + void requestFromSubscription(Subscription subscription) { + + long demand = DEMAND.get(AsyncInputStreamAdapter.this); + + if (cancelled) { + subscription.cancel(); + } + + if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) { + subscription.request(1); + } + } + + /** + * Terminates pending reads with empty buffers. + */ + void terminatePendingReads() { + + BiConsumer readers; + + while ((readers = readRequests.poll()) != null) { + readers.accept(factory.wrap(new byte[0]), -1); + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java new file mode 100644 index 0000000000..ae30151bd2 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java @@ -0,0 +1,78 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Utility methods to create adapters from between {@link Publisher} of {@link DataBuffer} and {@link AsyncInputStream}. + * + * @author Mark Paluch + * @since 2.2 + */ +class BinaryStreamAdapters { + + /** + * Creates a {@link Flux} emitting {@link DataBuffer} by reading binary chunks from {@link AsyncInputStream}. + * Publisher termination (completion, error, cancellation) closes the {@link AsyncInputStream}. + *
<p/>
+ * The resulting {@link org.reactivestreams.Publisher} filters empty binary chunks and uses {@link DataBufferFactory} + * settings to determine the chunk size. + * + * @param inputStream must not be {@literal null}. + * @param dataBufferFactory must not be {@literal null}. + * @return {@link Flux} emitting {@link DataBuffer}s. + * @see DataBufferFactory#allocateBuffer() + */ + static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + + return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) // + .filter(it -> { + + if (it.readableByteCount() == 0) { + DataBufferUtils.release(it); + return false; + } + return true; + }); + } + + /** + * Creates a {@link Mono} emitting an {@link AsyncInputStream} to consume a {@link Publisher} emitting + * {@link DataBuffer} and exposing the binary stream through {@link AsyncInputStream}. {@link DataBuffer}s are + * released by the adapter during consumption. + *
<p/>
+ * This method returns a {@link Mono} to retain the {@link reactor.util.context.Context subscriber context}. + * + * @param dataBuffers must not be {@literal null}. + * @return {@link Mono} emitting {@link AsyncInputStream}. + * @see DataBufferUtils#release(DataBuffer) + */ + static Mono toAsyncInputStream(Publisher dataBuffers) { + + return Mono.create(sink -> { + sink.success(new AsyncInputStreamAdapter(dataBuffers, sink.currentContext())); + }); + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java new file mode 100644 index 0000000000..d55489bc70 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -0,0 +1,205 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import lombok.RequiredArgsConstructor; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; +import reactor.util.context.Context; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Utility to adapt a {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}. + * + * @author Mark Paluch + * @since 2.2 + */ +class DataBufferPublisherAdapter { + + /** + * Creates a {@link Publisher} emitting {@link DataBuffer}s by reading binary chunks from {@link AsyncInputStream}. + * Closes the {@link AsyncInputStream} once the {@link Publisher} terminates. + * + * @param inputStream must not be {@literal null}. + * @param dataBufferFactory must not be {@literal null}. + * @return the resulting {@link Publisher}. + */ + public static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + + State state = new State(inputStream, dataBufferFactory); + + return Flux.usingWhen(Mono.just(inputStream), it -> { + + return Flux. 
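<DataBuffer>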
create((sink) -> { + + sink.onDispose(state::close); + sink.onCancel(state::close); + + sink.onRequest(n -> { + state.request(sink, n); + }); + }); + }, AsyncInputStream::close, AsyncInputStream::close, AsyncInputStream::close) // + .concatMap(Flux::just, 1); + } + + @RequiredArgsConstructor + static class State { + + static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand"); + + static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state"); + + static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read"); + + static final int STATE_OPEN = 0; + static final int STATE_CLOSED = 1; + + static final int READ_NONE = 0; + static final int READ_IN_PROGRESS = 1; + + final AsyncInputStream inputStream; + final DataBufferFactory dataBufferFactory; + + // see DEMAND + volatile long demand; + + // see STATE + volatile int state = STATE_OPEN; + + // see READ_IN_PROGRESS + volatile int read = READ_NONE; + + void request(FluxSink sink, long n) { + + Operators.addCap(DEMAND, this, n); + + if (onShouldRead()) { + emitNext(sink); + } + } + + boolean onShouldRead() { + return !isClosed() && getDemand() > 0 && onWantRead(); + } + + boolean onWantRead() { + return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS); + } + + boolean onReadDone() { + return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); + } + + long getDemand() { + return DEMAND.get(this); + } + + void close() { + STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED); + } + + boolean isClosed() { + return STATE.get(this) == STATE_CLOSED; + } + + /** + * Emit the next {@link DataBuffer}. + * + * @param sink + */ + void emitNext(FluxSink sink) { + + DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); + ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); + + Mono.from(inputStream.read(intermediate)).subscribe(new CoreSubscriber() { + + @Override + public Context currentContext() { + return sink.currentContext(); + } + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(Integer bytes) { + + if (isClosed()) { + + onReadDone(); + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, sink.currentContext()); + return; + } + + intermediate.flip(); + dataBuffer.write(intermediate); + + sink.next(dataBuffer); + + try { + if (bytes == -1) { + sink.complete(); + } + } finally { + onReadDone(); + } + } + + @Override + public void onError(Throwable t) { + + if (isClosed()) { + + Operators.onErrorDropped(t, sink.currentContext()); + return; + } + + onReadDone(); + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, sink.currentContext()); + sink.error(t); + } + + @Override + public void onComplete() { + + if (onShouldRead()) { + emitNext(sink); + } + } + }); + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java index d88ef1dfdd..4f13ded274 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java @@ -18,30 +18,16 @@ import static org.springframework.data.mongodb.core.query.Query.*; import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; -import 
lombok.RequiredArgsConstructor; -import reactor.core.CoreSubscriber; import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink; import reactor.core.publisher.Mono; -import reactor.core.publisher.Operators; -import reactor.util.concurrent.Queues; -import reactor.util.context.Context; -import java.nio.ByteBuffer; import java.util.Optional; -import java.util.Queue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; -import java.util.function.BiConsumer; -import java.util.function.Function; import org.bson.Document; import org.bson.types.ObjectId; import org.reactivestreams.Publisher; -import org.reactivestreams.Subscription; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; -import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.core.io.buffer.DefaultDataBufferFactory; import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.core.convert.MongoConverter; @@ -54,7 +40,6 @@ import com.mongodb.client.gridfs.model.GridFSFile; import com.mongodb.client.gridfs.model.GridFSUploadOptions; import com.mongodb.reactivestreams.client.MongoDatabase; -import com.mongodb.reactivestreams.client.Success; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets; @@ -181,15 +166,7 @@ public Mono store(Publisher content, @Nullable String file Assert.notNull(content, "Content must not be null!"); - return Mono.> create(sink -> { - - BinaryPublisherToAsyncInputStreamAdapter adapter = new BinaryPublisherToAsyncInputStreamAdapter(content, - sink.currentContext()); - - Mono store = store(adapter, filename, contentType, metadata); - - sink.success(store); - }).flatMap(Function.identity()); + return BinaryStreamAdapters.toAsyncInputStream(content).flatMap(it -> store(it, filename, contentType, metadata)); } /* @@ -253,7 +230,7 @@ public Mono getResource(GridFSFile file) { GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getObjectId()); - return new ReactiveGridFsResource(file, BinaryStreamUtility.createBinaryStream(dataBufferFactory, stream)); + return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory)); }); } @@ -317,269 +294,4 @@ private Document toDocument(@Nullable Object metadata) { } return document; } - - /** - * Adapter accepting a binary stream {@link Publisher} and transforming its contents into a {@link AsyncInputStream}. - *

- * This adapter subscribes to the binary {@link Publisher} as soon as the first chunk gets {@link #read(ByteBuffer) - * requested}. Requests are queued and binary chunks are requested from the {@link Publisher}. As soon as the - * {@link Publisher} emits items, chunks are provided to the read request which completes the number-of-written-bytes - * {@link Publisher}. - *

- * {@link AsyncInputStream} is supposed to work as sequential callback API that is called until reaching EOF. - * {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}. - * - * @author Mark Paluch - */ - @RequiredArgsConstructor - static class BinaryPublisherToAsyncInputStreamAdapter implements AsyncInputStream { - - private static final AtomicLongFieldUpdater UPDATER = AtomicLongFieldUpdater - .newUpdater(BinaryPublisherToAsyncInputStreamAdapter.class, "demand"); - - private final Publisher buffers; - private final Context subscriberContext; - private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); - - private final AtomicBoolean subscribed = new AtomicBoolean(); - private volatile Subscription subscription; - private volatile boolean cancelled; - private volatile boolean complete; - private volatile Throwable error; - private final Queue> readRequests = Queues.> small() - .get(); - - // See UPDATER - private volatile long demand; - - /* - * (non-Javadoc) - * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer) - */ - @Override - public Publisher read(ByteBuffer dst) { - - return Mono.create(sink -> { - - readRequests.offer((db, bytecount) -> { - - try { - - if (error != null) { - sink.error(error); - return; - } - - if (bytecount == -1) { - sink.success(-1); - return; - } - - ByteBuffer byteBuffer = db.asByteBuffer(); - int toWrite = byteBuffer.remaining(); - dst.put(byteBuffer); - - sink.success(toWrite); - } catch (Exception e) { - sink.error(e); - } finally { - DataBufferUtils.release(db); - } - }); - - request(1); - }); - } - - /* - * (non-Javadoc) - * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close() - */ - @Override - public Publisher close() { - - return Mono.create(sink -> { - cancelled = true; - - if (error != null) { - sink.error(error); - return; - } - - sink.success(Success.SUCCESS); - }); - } - - protected void request(int n) { - - if (complete) { - terminatePendingReads(); - return; - } - - Operators.addCap(UPDATER, this, n); - - if (!subscribed.get()) { - - if (subscribed.compareAndSet(false, true)) { - - buffers.subscribe(new CoreSubscriber() { - - @Override - public Context currentContext() { - return subscriberContext; - } - - @Override - public void onSubscribe(Subscription s) { - subscription = s; - - Operators.addCap(UPDATER, BinaryPublisherToAsyncInputStreamAdapter.this, -1); - s.request(1); - } - - @Override - public void onNext(DataBuffer dataBuffer) { - - if (cancelled || complete) { - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, subscriberContext); - return; - } - - BiConsumer poll = readRequests.poll(); - - if (poll == null) { - - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, subscriberContext); - subscription.cancel(); - return; - } - - poll.accept(dataBuffer, dataBuffer.readableByteCount()); - - requestFromSubscription(subscription); - } - - @Override - public void onError(Throwable t) { - - if (cancelled || complete) { - Operators.onErrorDropped(t, subscriberContext); - return; - } - - error = t; - complete = true; - terminatePendingReads(); - } - - @Override - public void onComplete() { - - complete = true; - terminatePendingReads(); - } - }); - } - } else { - - Subscription subscription = this.subscription; - - if (subscription != null) { - requestFromSubscription(subscription); - } - } - } - - void requestFromSubscription(Subscription subscription) { - - long demand = 
UPDATER.get(BinaryPublisherToAsyncInputStreamAdapter.this); - - if (cancelled) { - subscription.cancel(); - } - - if (demand > 0 && UPDATER.compareAndSet(BinaryPublisherToAsyncInputStreamAdapter.this, demand, demand - 1)) { - subscription.request(1); - } - } - - /** - * Terminates pending reads with empty buffers. - */ - void terminatePendingReads() { - - BiConsumer readers; - - while ((readers = readRequests.poll()) != null) { - readers.accept(factory.wrap(new byte[0]), -1); - } - } - } - - /** - * Utility to adapt a {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}. - */ - static class BinaryStreamUtility { - - /** - * Creates a {@link Publisher} emitting {@link DataBuffer}s. - * - * @param dataBufferFactory must not be {@literal null}. - * @param inputStream must not be {@literal null}. - * @return the resulting {@link Publisher}. - */ - public static Flux createBinaryStream(DataBufferFactory dataBufferFactory, - AsyncInputStream inputStream) { - - AtomicBoolean closed = new AtomicBoolean(); - - return Flux.push((sink) -> { - - sink.onDispose(() -> { - close(inputStream, closed); - }); - - sink.onCancel(() -> { - close(inputStream, closed); - }); - - emitNext(dataBufferFactory, inputStream, sink); - }); - } - - /** - * Emit the next {@link DataBuffer}. - * - * @param dataBufferFactory - * @param inputStream - * @param sink - */ - static void emitNext(DataBufferFactory dataBufferFactory, AsyncInputStream inputStream, FluxSink sink) { - - DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); - ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); - - Mono.from(inputStream.read(intermediate)).subscribe(bytes -> { - - intermediate.flip(); - dataBuffer.write(intermediate); - sink.next(dataBuffer); - - if (bytes == -1) { - sink.complete(); - } else { - emitNext(dataBufferFactory, inputStream, sink); - } - }, sink::error); - } - - static void close(AsyncInputStream inputStream, AtomicBoolean closed) { - if (closed.compareAndSet(false, true)) { - Mono.from(inputStream.close()).subscribe(); - } - } - } } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java new file mode 100644 index 0000000000..b94de93afb --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java @@ -0,0 +1,102 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.springframework.data.mongodb.gridfs; + +import static org.assertj.core.api.Assertions.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.util.StreamUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; +import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; + +/** + * Unit tests for {@link BinaryStreamAdapters}. + * + * @author Mark Paluch + */ +public class BinaryStreamAdaptersUnitTests { + + @Test // DATAMONGO-1855 + public void shouldAdaptAsyncInputStreamToDataBufferPublisher() throws IOException { + + ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml"); + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + Flux dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory()); + + DataBufferUtils.join(dataBuffers) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + + byte[] actualContent = new byte[actual.readableByteCount()]; + actual.read(actualContent); + assertThat(actualContent).isEqualTo(content); + }) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void shouldAdaptBinaryPublisherToAsyncInputStream() throws IOException { + + ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml"); + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + + Flux dataBuffers = DataBufferUtils.readInputStream(resource::getInputStream, + new DefaultDataBufferFactory(), 10); + + AsyncInputStream inputStream = BinaryStreamAdapters.toAsyncInputStream(dataBuffers).block(); + ByteBuffer complete = readBuffer(inputStream); + + assertThat(complete).isEqualTo(ByteBuffer.wrap(content)); + } + + static ByteBuffer readBuffer(AsyncInputStream inputStream) { + + ByteBuffer complete = ByteBuffer.allocate(1024); + + boolean hasData = true; + while (hasData) { + + ByteBuffer chunk = ByteBuffer.allocate(100); + + Integer bytesRead = Mono.from(inputStream.read(chunk)).block(); + + chunk.flip(); + complete.put(chunk); + + hasData = bytesRead > -1; + } + + complete.flip(); + + return complete; + } +} From 01d8421b5b7d7a50d4ef6d4c19654e96ee425688 Mon Sep 17 00:00:00 2001 From: Mark Paluch Date: Fri, 18 Jan 2019 11:34:09 +0100 Subject: [PATCH 4/5] DATAMONGO-1855 - Documentation. --- src/main/asciidoc/new-features.adoc | 1 + .../asciidoc/reference/reactive-mongodb.adoc | 93 +++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/src/main/asciidoc/new-features.adoc b/src/main/asciidoc/new-features.adoc index 8135ad698f..80ec95981e 100644 --- a/src/main/asciidoc/new-features.adoc +++ b/src/main/asciidoc/new-features.adoc @@ -5,6 +5,7 @@ == What's New in Spring Data MongoDB 2.2 * <> * <> via `ReactiveQuerydslPredicateExecutor`. +* <>. 
 [[new-features.2-1-0]]
 == What's New in Spring Data MongoDB 2.1

diff --git a/src/main/asciidoc/reference/reactive-mongodb.adoc b/src/main/asciidoc/reference/reactive-mongodb.adoc
index bb9ac80f3e..eaa4a51f2c 100644
--- a/src/main/asciidoc/reference/reactive-mongodb.adoc
+++ b/src/main/asciidoc/reference/reactive-mongodb.adoc
@@ -482,3 +482,96 @@ Flux<Boolean> hasIndex = operations.execute("geolocation",
         .flatMap(document -> Mono.just(true))
         .defaultIfEmpty(false));
 ----
+
+[[reactive.gridfs]]
+== GridFS Support
+
+MongoDB supports storing binary files inside its filesystem, GridFS.
+Spring Data MongoDB provides a `ReactiveGridFsOperations` interface as well as the corresponding implementation, `ReactiveGridFsTemplate`, to let you interact with the filesystem.
+You can set up a `ReactiveGridFsTemplate` instance by handing it a `ReactiveMongoDatabaseFactory` as well as a `MongoConverter`, as the following example shows:
+
+.JavaConfig setup for a ReactiveGridFsTemplate
+====
+[source,java]
+----
+class GridFsConfiguration extends AbstractReactiveMongoConfiguration {
+
+	// … further configuration omitted
+
+	@Bean
+	public ReactiveGridFsTemplate reactiveGridFsTemplate() {
+		return new ReactiveGridFsTemplate(reactiveMongoDbFactory(), mappingMongoConverter());
+	}
+}
+----
+====
+
+The template can now be injected and used to perform storage and retrieval operations, as the following example shows:
+
+.Using ReactiveGridFsTemplate to store files
+====
+[source,java]
+----
+class ReactiveGridFsClient {
+
+	@Autowired
+	ReactiveGridFsTemplate operations;
+
+	@Test
+	public Mono<ObjectId> storeFileToGridFs() {
+
+		FileMetadata metadata = new FileMetadata();
+		// populate metadata
+		Publisher<DataBuffer> file = … // lookup File or Resource
+
+		return operations.store(file, "filename.txt", metadata);
+	}
+}
+----
+====
+
+The `store(…)` operations take a `Publisher<DataBuffer>`, a filename, and (optionally) metadata information about the file to store.
+The metadata can be an arbitrary object, which will be marshaled by the `MongoConverter` configured with the `ReactiveGridFsTemplate`.
+Alternatively, you can also provide a `Document`.
+
+NOTE: MongoDB's driver uses `AsyncInputStream` and `AsyncOutputStream` interfaces to exchange binary streams. Spring Data MongoDB adapts these interfaces to `Publisher<DataBuffer>`. Read more about `DataBuffer` in http://docs.spring.io/spring/docs/{springVersion}/spring-framework-reference/core.html#databuffers[Spring's reference documentation].
+
+You can read files from the filesystem through either the `find(…)` or the `getResources(…)` methods.
+Let's have a look at the `find(…)` methods first.
+You can either find a single file or multiple files that match a `Query`.
+You can use the `GridFsCriteria` helper class to define queries.
+It provides static factory methods to encapsulate default metadata fields (such as `whereFilename()` and `whereContentType()`) or a custom one through `whereMetaData()`.
+The following example shows how to use `ReactiveGridFsTemplate` to query for files:
+
+.Using ReactiveGridFsTemplate to query for files
+====
+[source,java]
+----
+class ReactiveGridFsClient {
+
+	@Autowired
+	ReactiveGridFsTemplate operations;
+
+	@Test
+	public Flux<GridFSFile> findFilesInGridFs() {
+		return operations.find(query(whereFilename().is("filename.txt")));
+	}
+}
+----
+====
+
+NOTE: Currently, MongoDB does not support defining sort criteria when retrieving files from GridFS. For this reason, any sort criteria defined on the `Query` instance handed into the `find(…)` method are disregarded.
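+
+Building only on the operations shown above, the following sketch joins the download stream of a single file into one byte array. The filename and the `Mono<byte[]>` return type are illustrative:
+
+.Reading a file's content into memory
+====
+[source,java]
+----
+class ReactiveGridFsClient {
+
+	@Autowired
+	ReactiveGridFsTemplate operations;
+
+	public Mono<byte[]> readFileFromGridFs() {
+
+		Flux<DataBuffer> content = operations.findOne(query(whereFilename().is("filename.txt")))
+				.flatMap(operations::getResource)                        // resolve the GridFSFile to a ReactiveGridFsResource
+				.flatMapMany(ReactiveGridFsResource::getDownloadStream); // binary content as DataBuffer chunks
+
+		return DataBufferUtils.join(content) // merge all chunks into a single DataBuffer
+				.map(buffer -> {
+
+					byte[] bytes = new byte[buffer.readableByteCount()];
+					buffer.read(bytes);
+					DataBufferUtils.release(buffer); // release the pooled buffer once copied
+					return bytes;
+				});
+	}
+}
+----
+====
+
+Because `DataBufferUtils.join(…)` materializes the complete file in memory, this pattern is only appropriate for reasonably small files; larger files should be processed chunk by chunk as the `DataBuffer` instances are emitted.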
+
+The other option to read files from GridFS is to use the methods modeled along the lines of `ResourcePatternResolver`.
+`ReactiveGridFsOperations` uses reactive types to defer execution while `ResourcePatternResolver` uses a synchronous interface.
+These methods accept an Ant-style path pattern and can thus retrieve files matching the given pattern. The following example shows how to use `ReactiveGridFsTemplate` to read files:
+
+.Using ReactiveGridFsTemplate to read files
+====
+[source,java]
+----
+class ReactiveGridFsClient {
+
+	@Autowired
+	ReactiveGridFsOperations operations;
+
+	@Test
+	public void readFilesFromGridFs() {
+		Flux<ReactiveGridFsResource> txtFiles = operations.getResources("*.txt");
+	}
+}
+----
+====

From ce92ef0f1a93d302dfb577e8464783b32d088fa0 Mon Sep 17 00:00:00 2001
From: Christoph Strobl
Date: Fri, 25 Jan 2019 10:17:35 +0100
Subject: [PATCH 5/5] DATAMONGO-1855 - Polishing

Introduce base class to share code between imperative and reactive GridFs.
---
 .../gridfs/AsyncInputStreamAdapter.java       | 135 ++++++++++--------
 .../gridfs/DataBufferPublisherAdapter.java    | 114 ++++++++-------
 .../gridfs/GridFsOperationsSupport.java       | 104 ++++++++++++++
 .../data/mongodb/gridfs/GridFsTemplate.java   |  59 ++------
 .../gridfs/ReactiveGridFsOperations.java      |  87 +++++++----
 .../gridfs/ReactiveGridFsTemplate.java        |  84 ++++-------
 ....java => ReactiveGridFsTemplateTests.java} |  76 +++++++++-
 7 files changed, 412 insertions(+), 247 deletions(-)
 create mode 100644 spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java
 rename spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/{ReactiveGridFsTemplateIntegrationTests.java => ReactiveGridFsTemplateTests.java} (65%)

diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java
index e29f344cec..d57268388a 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java
@@ -49,6 +49,7 @@
  * {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}.
* * @author Mark Paluch + * @author Christoph Strobl * @since 2.2 */ @RequiredArgsConstructor @@ -75,10 +76,10 @@ class AsyncInputStreamAdapter implements AsyncInputStream { .get(); // see DEMAND - private volatile long demand; + volatile long demand; // see SUBSCRIBED - private volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED; + volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED; /* * (non-Javadoc) @@ -94,20 +95,23 @@ public Publisher read(ByteBuffer dst) { try { if (error != null) { + sink.error(error); return; } if (bytecount == -1) { + sink.success(-1); return; } ByteBuffer byteBuffer = db.asByteBuffer(); int toWrite = byteBuffer.remaining(); - dst.put(byteBuffer); + dst.put(byteBuffer); sink.success(toWrite); + } catch (Exception e) { sink.error(e); } finally { @@ -127,6 +131,7 @@ public Publisher read(ByteBuffer dst) { public Publisher close() { return Mono.create(sink -> { + cancelled = true; if (error != null) { @@ -141,6 +146,7 @@ public Publisher close() { protected void request(int n) { if (complete) { + terminatePendingReads(); return; } @@ -150,67 +156,9 @@ protected void request(int n) { if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) { if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) { - - buffers.subscribe(new CoreSubscriber() { - - @Override - public Context currentContext() { - return subscriberContext; - } - - @Override - public void onSubscribe(Subscription s) { - subscription = s; - - Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1); - s.request(1); - } - - @Override - public void onNext(DataBuffer dataBuffer) { - - if (cancelled || complete) { - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, subscriberContext); - return; - } - - BiConsumer poll = readRequests.poll(); - - if (poll == null) { - - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, subscriberContext); - subscription.cancel(); - return; - } - - poll.accept(dataBuffer, dataBuffer.readableByteCount()); - - requestFromSubscription(subscription); - } - - @Override - public void onError(Throwable t) { - - if (cancelled || complete) { - Operators.onErrorDropped(t, subscriberContext); - return; - } - - error = t; - complete = true; - terminatePendingReads(); - } - - @Override - public void onComplete() { - - complete = true; - terminatePendingReads(); - } - }); + buffers.subscribe(new DataBufferCoreSubscriber()); } + } else { Subscription subscription = this.subscription; @@ -245,4 +193,65 @@ void terminatePendingReads() { readers.accept(factory.wrap(new byte[0]), -1); } } + + private class DataBufferCoreSubscriber implements CoreSubscriber { + + @Override + public Context currentContext() { + return AsyncInputStreamAdapter.this.subscriberContext; + } + + @Override + public void onSubscribe(Subscription s) { + + AsyncInputStreamAdapter.this.subscription = s; + + Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1); + s.request(1); + } + + @Override + public void onNext(DataBuffer dataBuffer) { + + if (cancelled || complete) { + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); + return; + } + + BiConsumer poll = AsyncInputStreamAdapter.this.readRequests.poll(); + + if (poll == null) { + + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); + subscription.cancel(); + return; + } + + poll.accept(dataBuffer, dataBuffer.readableByteCount()); + + 
requestFromSubscription(subscription); + } + + @Override + public void onError(Throwable t) { + + if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) { + Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext); + return; + } + + AsyncInputStreamAdapter.this.error = t; + AsyncInputStreamAdapter.this.complete = true; + terminatePendingReads(); + } + + @Override + public void onComplete() { + + AsyncInputStreamAdapter.this.complete = true; + terminatePendingReads(); + } + } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java index d55489bc70..df98508fe9 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -39,6 +39,7 @@ * Utility to adapt a {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}. * * @author Mark Paluch + * @author Christoph Strobl * @since 2.2 */ class DataBufferPublisherAdapter { @@ -51,7 +52,7 @@ class DataBufferPublisherAdapter { * @param dataBufferFactory must not be {@literal null}. * @return the resulting {@link Publisher}. */ - public static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { State state = new State(inputStream, dataBufferFactory); @@ -73,17 +74,17 @@ public static Flux createBinaryStream(AsyncInputStream inputStream, @RequiredArgsConstructor static class State { - static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand"); + private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand"); - static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state"); + private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state"); - static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read"); + private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read"); - static final int STATE_OPEN = 0; - static final int STATE_CLOSED = 1; + private static final int STATE_OPEN = 0; + private static final int STATE_CLOSED = 1; - static final int READ_NONE = 0; - static final int READ_IN_PROGRESS = 1; + private static final int READ_NONE = 0; + private static final int READ_IN_PROGRESS = 1; final AsyncInputStream inputStream; final DataBufferFactory dataBufferFactory; @@ -140,66 +141,79 @@ void emitNext(FluxSink sink) { DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); - Mono.from(inputStream.read(intermediate)).subscribe(new CoreSubscriber() { + Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate)); + } - @Override - public Context currentContext() { - return sink.currentContext(); - } + private class BufferCoreSubscriber implements CoreSubscriber { - @Override - public void onSubscribe(Subscription s) { - s.request(1); - } + private final FluxSink sink; + private final DataBuffer dataBuffer; + private final 
ByteBuffer intermediate; - @Override - public void onNext(Integer bytes) { + BufferCoreSubscriber(FluxSink sink, DataBuffer dataBuffer, ByteBuffer intermediate) { - if (isClosed()) { + this.sink = sink; + this.dataBuffer = dataBuffer; + this.intermediate = intermediate; + } - onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); - return; - } + @Override + public Context currentContext() { + return sink.currentContext(); + } - intermediate.flip(); - dataBuffer.write(intermediate); + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } - sink.next(dataBuffer); + @Override + public void onNext(Integer bytes) { - try { - if (bytes == -1) { - sink.complete(); - } - } finally { - onReadDone(); - } + if (isClosed()) { + + onReadDone(); + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, sink.currentContext()); + return; } - @Override - public void onError(Throwable t) { + intermediate.flip(); + dataBuffer.write(intermediate); - if (isClosed()) { + sink.next(dataBuffer); - Operators.onErrorDropped(t, sink.currentContext()); - return; + try { + if (bytes == -1) { + sink.complete(); } - + } finally { onReadDone(); - DataBufferUtils.release(dataBuffer); - Operators.onNextDropped(dataBuffer, sink.currentContext()); - sink.error(t); } + } - @Override - public void onComplete() { + @Override + public void onError(Throwable t) { - if (onShouldRead()) { - emitNext(sink); - } + if (isClosed()) { + + Operators.onErrorDropped(t, sink.currentContext()); + return; } - }); + + onReadDone(); + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, sink.currentContext()); + sink.error(t); + } + + @Override + public void onComplete() { + + if (onShouldRead()) { + emitNext(sink); + } + } } } } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java new file mode 100644 index 0000000000..8d4bd9f5a3 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java @@ -0,0 +1,104 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import java.util.Optional; + +import org.bson.Document; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +import com.mongodb.client.gridfs.model.GridFSUploadOptions; + +/** + * Base class offering common tasks like query mapping and {@link GridFSUploadOptions} computation to be shared across + * imperative and reactive implementations. 
+ *
+ * @author Christoph Strobl
+ * @since 2.2
+ */
+class GridFsOperationsSupport {
+
+	private final QueryMapper queryMapper;
+	private final MongoConverter converter;
+
+	/**
+	 * @param converter must not be {@literal null}.
+	 */
+	GridFsOperationsSupport(MongoConverter converter) {
+
+		Assert.notNull(converter, "MongoConverter must not be null!");
+
+		this.converter = converter;
+		this.queryMapper = new QueryMapper(converter);
+	}
+
+	/**
+	 * @param query pass the given query through a {@link QueryMapper} to apply type conversion.
+	 * @return never {@literal null}.
+	 */
+	protected Document getMappedQuery(Document query) {
+		return queryMapper.getMappedObject(query, Optional.empty());
+	}
+
+	/**
+	 * Compute the {@link GridFSUploadOptions} to be used from the given {@literal contentType} and {@literal metadata}
+	 * {@link Document}.
+	 *
+	 * @param contentType can be {@literal null}.
+	 * @param metadata can be {@literal null}.
+	 * @return never {@literal null}.
+	 */
+	protected GridFSUploadOptions computeUploadOptionsFor(@Nullable String contentType, @Nullable Document metadata) {
+
+		Document targetMetadata = new Document();
+
+		if (StringUtils.hasText(contentType)) {
+			targetMetadata.put(GridFsResource.CONTENT_TYPE_FIELD, contentType);
+		}
+
+		if (metadata != null) {
+			targetMetadata.putAll(metadata);
+		}
+
+		GridFSUploadOptions options = new GridFSUploadOptions();
+		options.metadata(targetMetadata);
+
+		return options;
+	}
+
+	/**
+	 * Convert a given {@literal value} into a {@link Document}.
+	 *
+	 * @param value can be {@literal null}.
+	 * @return an empty {@link Document} if the source value is {@literal null}.
+	 */
+	protected Document toDocument(@Nullable Object value) {
+
+		if (value instanceof Document) {
+			return (Document) value;
+		}
+
+		Document document = new Document();
+		if (value != null) {
+			converter.write(value, document);
+		}
+		return document;
+	}
+}
diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java
index 3b87d2a78b..6c278ac7c3 100644
--- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java
+++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java
@@ -29,7 +29,6 @@
 import org.springframework.core.io.support.ResourcePatternResolver;
 import org.springframework.data.mongodb.MongoDbFactory;
 import org.springframework.data.mongodb.core.convert.MongoConverter;
-import org.springframework.data.mongodb.core.convert.QueryMapper;
 import org.springframework.data.mongodb.core.query.Query;
 import org.springframework.lang.Nullable;
 import org.springframework.util.Assert;
@@ -40,7 +39,6 @@
 import com.mongodb.client.gridfs.GridFSBuckets;
 import com.mongodb.client.gridfs.GridFSFindIterable;
 import com.mongodb.client.gridfs.model.GridFSFile;
-import com.mongodb.client.gridfs.model.GridFSUploadOptions;
 
 /**
  * {@link GridFsOperations} implementation to store content into MongoDB GridFS.
@@ -54,13 +52,11 @@ * @author Hartmut Lang * @author Niklas Helge Hanft */ -public class GridFsTemplate implements GridFsOperations, ResourcePatternResolver { +public class GridFsTemplate extends GridFsOperationsSupport implements GridFsOperations, ResourcePatternResolver { private final MongoDbFactory dbFactory; private final @Nullable String bucket; - private final MongoConverter converter; - private final QueryMapper queryMapper; /** * Creates a new {@link GridFsTemplate} using the given {@link MongoDbFactory} and {@link MongoConverter}. @@ -81,14 +77,12 @@ public GridFsTemplate(MongoDbFactory dbFactory, MongoConverter converter) { */ public GridFsTemplate(MongoDbFactory dbFactory, MongoConverter converter, @Nullable String bucket) { + super(converter); + Assert.notNull(dbFactory, "MongoDbFactory must not be null!"); - Assert.notNull(converter, "MongoConverter must not be null!"); this.dbFactory = dbFactory; - this.converter = converter; this.bucket = bucket; - - this.queryMapper = new QueryMapper(converter); } /* @@ -137,16 +131,9 @@ public ObjectId store(InputStream content, @Nullable String filename, @Nullable * (non-Javadoc) * @see org.springframework.data.mongodb.gridfs.GridFsOperations#store(java.io.InputStream, java.lang.String, java.lang.String, java.lang.Object) */ - public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Object metadata) { - - Document document = null; - - if (metadata != null) { - document = new Document(); - converter.write(metadata, document); - } - - return store(content, filename, contentType, document); + public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata) { + return store(content, filename, contentType, toDocument(metadata)); } /* @@ -161,25 +148,11 @@ public ObjectId store(InputStream content, @Nullable String filename, @Nullable * (non-Javadoc) * @see org.springframework.data.mongodb.gridfs.GridFsOperations#store(java.io.InputStream, java.lang.String, com.mongodb.Document) */ - public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Document metadata) { + public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata) { Assert.notNull(content, "InputStream must not be null!"); - - GridFSUploadOptions options = new GridFSUploadOptions(); - - Document mData = new Document(); - - if (StringUtils.hasText(contentType)) { - mData.put(GridFsResource.CONTENT_TYPE_FIELD, contentType); - } - - if (metadata != null) { - mData.putAll(metadata); - } - - options.metadata(mData); - - return getGridFs().uploadFromStream(filename, content, options); + return getGridFs().uploadFromStream(filename, content, computeUploadOptionsFor(contentType, metadata)); } /* @@ -210,8 +183,8 @@ public GridFSFile findOne(Query query) { */ public void delete(Query query) { - for (GridFSFile x : find(query)) { - getGridFs().delete(((BsonObjectId) x.getId()).getValue()); + for (GridFSFile gridFSFile : find(query)) { + getGridFs().delete(((BsonObjectId) gridFSFile.getId()).getValue()); } } @@ -246,9 +219,9 @@ public GridFsResource getResource(GridFSFile file) { } /* - * (non-Javadoc) - * @see org.springframework.core.io.support.ResourcePatternResolver#getResources(java.lang.String) - */ + * (non-Javadoc) + * @see org.springframework.core.io.support.ResourcePatternResolver#getResources(java.lang.String) + */ public GridFsResource[] 
getResources(String locationPattern) { if (!StringUtils.hasText(locationPattern)) { @@ -272,10 +245,6 @@ public GridFsResource[] getResources(String locationPattern) { return new GridFsResource[] { getResource(locationPattern) }; } - private Document getMappedQuery(Document query) { - return queryMapper.getMappedObject(query, Optional.empty()); - } - private GridFSBucket getGridFs() { MongoDatabase db = dbFactory.getDb(); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java index c4fc184e60..c7f3a578db 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java @@ -22,12 +22,10 @@ import org.bson.types.ObjectId; import org.reactivestreams.Publisher; import org.springframework.core.io.buffer.DataBuffer; -import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.data.domain.Sort; import org.springframework.data.mongodb.core.query.Query; import org.springframework.lang.Nullable; -import com.mongodb.client.gridfs.GridFSFindIterable; import com.mongodb.client.gridfs.model.GridFSFile; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; @@ -35,6 +33,7 @@ * Collection of operations to store and read files from MongoDB GridFS using reactive infrastructure. * * @author Mark Paluch + * @author Christoph Strobl * @since 2.2 */ public interface ReactiveGridFsOperations { @@ -44,29 +43,32 @@ public interface ReactiveGridFsOperations { * * @param content must not be {@literal null}. * @param filename must not be {@literal null} or empty. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, String filename) { return store(content, filename, (Object) null); } /** - * Stores the given content into a file with the given name. + * Stores the given content into a file applying the given metadata. * * @param content must not be {@literal null}. * @param metadata can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, @Nullable Object metadata) { return store(content, null, metadata); } /** - * Stores the given content into a file with the given name. + * Stores the given content into a file applying the given metadata. * * @param content must not be {@literal null}. * @param metadata can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, @Nullable Document metadata) { return store(content, null, metadata); @@ -78,7 +80,8 @@ default Mono store(Publisher content, @Nullable Document m * @param content must not be {@literal null}. * @param filename must not be {@literal null} or empty. 
* @param contentType can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, @Nullable String filename, @Nullable String contentType) { return store(content, filename, contentType, (Object) null); @@ -91,7 +94,8 @@ default Mono store(Publisher content, @Nullable String fil * @param content must not be {@literal null}. * @param filename can be {@literal null} or empty. * @param metadata can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, @Nullable String filename, @Nullable Object metadata) { return store(content, filename, null, metadata); @@ -105,7 +109,8 @@ default Mono store(Publisher content, @Nullable String fil * @param filename must not be {@literal null} or empty. * @param contentType can be {@literal null}. * @param metadata can be {@literal null} - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Object metadata); @@ -118,7 +123,8 @@ Mono store(AsyncInputStream content, @Nullable String filename, @Nulla * @param filename must not be {@literal null} or empty. * @param contentType can be {@literal null}. * @param metadata can be {@literal null} - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, @Nullable Object metadata); @@ -129,7 +135,8 @@ Mono store(Publisher content, @Nullable String filename, @ * @param content must not be {@literal null}. * @param filename must not be {@literal null} or empty. * @param metadata can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. */ default Mono store(Publisher content, @Nullable String filename, @Nullable Document metadata) { return store(content, filename, null, metadata); @@ -142,63 +149,85 @@ default Mono store(Publisher content, @Nullable String fil * @param filename must not be {@literal null} or empty. * @param contentType can be {@literal null}. * @param metadata can be {@literal null}. - * @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. 
 	 */
 	Mono<ObjectId> store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
 			@Nullable Document metadata);
 
-	Mono<ObjectId> store(Publisher<DataBuffer> content, String filename, String contentType, Document metadata);
+	/**
+	 * Stores the given content into a file with the given name and content type using the given metadata.
+	 *
+	 * @param content must not be {@literal null}.
+	 * @param filename must not be {@literal null} or empty.
+	 * @param contentType can be {@literal null}.
+	 * @param metadata can be {@literal null}.
+	 * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just
+	 *         created.
+	 */
+	Mono<ObjectId> store(Publisher<DataBuffer> content, @Nullable String filename, @Nullable String contentType,
+			@Nullable Document metadata);
 
 	/**
-	 * Returns all files matching the given query. Note, that currently {@link Sort} criterias defined at the
-	 * {@link Query} will not be regarded as MongoDB does not support ordering for GridFS file access.
+	 * Returns a {@link Flux} emitting all files matching the given query.
+	 * Note: Currently, {@link Sort} criteria defined at the {@link Query} will not be regarded as MongoDB
+	 * does not support ordering for GridFS file access.
 	 *
 	 * @see <a href="https://jira.mongodb.org/browse/JAVA-431">MongoDB Jira: JAVA-431</a>
 	 * @param query must not be {@literal null}.
-	 * @return {@link GridFSFindIterable} to obtain results from. Eg. by calling
-	 *         {@link GridFSFindIterable#into(java.util.Collection)}.
+	 * @return {@link Flux#empty()} if no match found.
 	 */
 	Flux<GridFSFile> find(Query query);
 
 	/**
-	 * Returns a single {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given query or {@literal null} in
-	 * case no file matches.
+	 * Returns a {@link Mono} emitting a single {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given
+	 * query or {@link Mono#empty()} in case no file matches.
+	 * NOTE: If more than one file matches the given query, the resulting {@link Mono} emits an error.
+	 * If you want to obtain the first found file, use {@link #findFirst(Query)}.
 	 *
 	 * @param query must not be {@literal null}.
-	 * @return
+	 * @return {@link Mono#empty()} if no match found.
 	 */
 	Mono<GridFSFile> findOne(Query query);
 
+	/**
+	 * Returns a {@link Mono} emitting the first {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given
+	 * query or {@link Mono#empty()} in case no file matches.
+	 *
+	 * @param query must not be {@literal null}.
+	 * @return {@link Mono#empty()} if no match found.
+	 */
+	Mono<GridFSFile> findFirst(Query query);
+
 	/**
 	 * Deletes all files matching the given {@link Query}.
 	 *
 	 * @param query must not be {@literal null}.
+	 * @return a {@link Mono} signalling operation completion.
 	 */
 	Mono<Void> delete(Query query);
 
 	/**
-	 * Returns the {@link GridFsResource} with the given file name.
+	 * Returns a {@link Mono} emitting the {@link ReactiveGridFsResource} with the given file name.
 	 *
 	 * @param filename must not be {@literal null}.
-	 * @return the resource. Use {@link org.springframework.core.io.Resource#exists()} to check if the returned
-	 *         {@link GridFsResource} is actually present.
-	 * @see ResourcePatternResolver#getResource(String)
+	 * @return {@link Mono#empty()} if no match found.
 	 */
 	Mono<ReactiveGridFsResource> getResource(String filename);
 
 	/**
-	 * Returns the {@link GridFsResource} for a {@link GridFSFile}.
+	 * Returns a {@link Mono} emitting the {@link ReactiveGridFsResource} for a {@link GridFSFile}.
 	 *
 	 * @param file must not be {@literal null}.
-	 * @return the resource for the file.
+	 * @return {@link Mono#empty()} if no match found.
 	 */
 	Mono<ReactiveGridFsResource> getResource(GridFSFile file);
 
 	/**
-	 * Returns all {@link GridFsResource}s matching the given file name pattern.
+	 * Returns a {@link Flux} emitting all {@link ReactiveGridFsResource}s matching the given file name pattern.
 	 *
 	 * @param filenamePattern must not be {@literal null}.
-	 * @return
+	 * @return {@link Flux#empty()} if no match found.
*/ Flux getResources(String filenamePattern); } diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java index 4f13ded274..2c182c6871 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java @@ -21,24 +21,22 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -import java.util.Optional; - import org.bson.Document; import org.bson.types.ObjectId; import org.reactivestreams.Publisher; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferFactory; import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.dao.IncorrectResultSizeDataAccessException; import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; import org.springframework.data.mongodb.core.convert.MongoConverter; -import org.springframework.data.mongodb.core.convert.QueryMapper; import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.SerializationUtils; import org.springframework.lang.Nullable; import org.springframework.util.Assert; import org.springframework.util.StringUtils; import com.mongodb.client.gridfs.model.GridFSFile; -import com.mongodb.client.gridfs.model.GridFSUploadOptions; import com.mongodb.reactivestreams.client.MongoDatabase; import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; @@ -53,13 +51,11 @@ * @author Mark Paluch * @since 2.2 */ -public class ReactiveGridFsTemplate implements ReactiveGridFsOperations { +public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements ReactiveGridFsOperations { - private final DataBufferFactory dataBufferFactory; private final ReactiveMongoDatabaseFactory dbFactory; + private final DataBufferFactory dataBufferFactory; private final @Nullable String bucket; - private final MongoConverter converter; - private final QueryMapper queryMapper; /** * Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and @@ -97,16 +93,14 @@ public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConve public ReactiveGridFsTemplate(DataBufferFactory dataBufferFactory, ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter, @Nullable String bucket) { + super(converter); + Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null!"); Assert.notNull(dbFactory, "ReactiveMongoDatabaseFactory must not be null!"); - Assert.notNull(converter, "MongoConverter must not be null!"); this.dataBufferFactory = dataBufferFactory; this.dbFactory = dbFactory; - this.converter = converter; this.bucket = bucket; - - this.queryMapper = new QueryMapper(converter); } /* @@ -138,22 +132,7 @@ public Mono store(AsyncInputStream content, @Nullable String filename, @Nullable Document metadata) { Assert.notNull(content, "InputStream must not be null!"); - - GridFSUploadOptions options = new GridFSUploadOptions(); - - Document mData = new Document(); - - if (StringUtils.hasText(contentType)) { - mData.put(GridFsResource.CONTENT_TYPE_FIELD, contentType); - } - - if (metadata != null) { - mData.putAll(metadata); - } - - options.metadata(mData); - - return 
Mono.from(getGridFs().uploadFromStream(filename, content, options));
+		return Mono.from(getGridFs().uploadFromStream(filename, content, computeUploadOptionsFor(contentType, metadata)));
 	}
 
 	/*
@@ -175,10 +154,7 @@
 	 */
 	@Override
 	public Flux<GridFSFile> find(Query query) {
-
-		GridFSFindPublisher publisherToUse = prepareQuery(query);
-
-		return Flux.from(publisherToUse);
+		return Flux.from(prepareQuery(query));
 	}
 
 	/*
@@ -188,9 +164,29 @@
 	@Override
 	public Mono<GridFSFile> findOne(Query query) {
 
-		GridFSFindPublisher publisherToUse = prepareQuery(query);
+		return Flux.from(prepareQuery(query).limit(2)) //
+				.collectList() //
+				.flatMap(it -> {
+					if (it.isEmpty()) {
+						return Mono.empty();
+					}
 
-		return Flux.from(publisherToUse.limit(1)).next();
+					if (it.size() > 1) {
+						return Mono.error(new IncorrectResultSizeDataAccessException(
+								"Query " + SerializationUtils.serializeToJsonSafely(query) + " returned a non-unique result.", 1));
+					}
+
+					return Mono.just(it.get(0));
+				});
+	}
+
+	/*
+	 * (non-Javadoc)
+	 * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#findFirst(org.springframework.data.mongodb.core.query.Query)
+	 */
+	@Override
+	public Mono<GridFSFile> findFirst(Query query) {
+		return Flux.from(prepareQuery(query).limit(1)).next();
 	}
 
 	/*
@@ -199,9 +195,7 @@
 	 */
 	@Override
 	public Mono<Void> delete(Query query) {
-
-		GridFSBucket gridFs = getGridFs();
-		return find(query).flatMap(it -> gridFs.delete(it.getId())).then();
+		return find(query).flatMap(it -> getGridFs().delete(it.getId())).then();
 	}
 
 	/*
@@ -273,25 +267,9 @@ protected GridFSFindPublisher prepareQuery(Query query) {
 
 		return publisherToUse;
 	}
 
-	private Document getMappedQuery(Document query) {
-		return queryMapper.getMappedObject(query, Optional.empty());
-	}
-
 	protected GridFSBucket getGridFs() {
 
 		MongoDatabase db = dbFactory.getMongoDatabase();
 		return bucket == null ?
GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket); } - - @Nullable - private Document toDocument(@Nullable Object metadata) { - - Document document = null; - - if (metadata != null) { - document = new Document(); - converter.write(metadata, document); - } - return document; - } } diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java similarity index 65% rename from spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java rename to spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java index dea663eb51..879bbdf1a7 100644 --- a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateIntegrationTests.java +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java @@ -16,10 +16,11 @@ package org.springframework.data.mongodb.gridfs; import static org.assertj.core.api.Assertions.*; -import static org.springframework.data.mongodb.core.query.Criteria.*; +import static org.springframework.data.mongodb.core.query.Criteria.where; import static org.springframework.data.mongodb.core.query.Query.*; import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; +import org.springframework.dao.IncorrectResultSizeDataAccessException; import reactor.core.publisher.Flux; import reactor.test.StepVerifier; @@ -46,11 +47,14 @@ import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; /** + * Integration tests for {@link ReactiveGridFsTemplate}. + * * @author Mark Paluch + * @author Christoph Strobl */ @RunWith(SpringRunner.class) @ContextConfiguration("classpath:gridfs/reactive-gridfs.xml") -public class ReactiveGridFsTemplateIntegrationTests { +public class ReactiveGridFsTemplateTests { Resource resource = new ClassPathResource("gridfs/gridfs.xml"); @@ -58,6 +62,7 @@ public class ReactiveGridFsTemplateIntegrationTests { @Before public void setUp() { + operations.delete(new Query()) // .as(StepVerifier::create) // .verifyComplete(); @@ -76,7 +81,8 @@ public void storesAndFindsSimpleDocument() { .as(StepVerifier::create) // .assertNext(actual -> { assertThat(((BsonObjectId) actual.getId()).getValue()).isEqualTo(reference); - }).verifyComplete(); + }) // + .verifyComplete(); } @Test // DATAMONGO-1855 @@ -92,7 +98,7 @@ public void writesMetadataCorrectly() throws IOException { .as(StepVerifier::create) // .consumeNextWith(actual -> { assertThat(actual.getObjectId()).isEqualTo(reference); - })// + }) // .verifyComplete(); } @@ -110,7 +116,8 @@ public void marshalsComplexMetadata() throws IOException { .as(StepVerifier::create) // .consumeNextWith(actual -> { assertThat(actual.getObjectId()).isEqualTo(reference); - })// + assertThat(actual.getMetadata()).containsEntry("version", "1.0"); + }) // .verifyComplete(); } @@ -124,14 +131,69 @@ public void getResourceShouldRetrieveContentByIdentity() throws IOException { operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) .flatMapMany(ReactiveGridFsResource::getDownloadStream) // - .transform(DataBufferUtils::join).as(StepVerifier::create) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + byte[] actual = new byte[dataBuffer.readableByteCount()]; + 
dataBuffer.read(actual); + + assertThat(actual).isEqualTo(content); + }) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void shouldEmitFirstEntryWhenFindFirstRetrievesMoreThanOneResult() throws IOException { + + AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream()); + + operations.store(upload1, "foo.xml", null, null).block(); + operations.store(upload2, "foo2.xml", null, null).block(); + + operations.findFirst(query(where("filename").regex("foo*"))) // + .flatMap(operations::getResource) // + .as(StepVerifier::create) // + .expectNextCount(1) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void shouldEmitErrorWhenFindOneRetrievesMoreThanOneResult() throws IOException { + + AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream()); + + operations.store(upload1, "foo.xml", null, null).block(); + operations.store(upload2, "foo2.xml", null, null).block(); + + operations.findOne(query(where("filename").regex("foo*"))) // + .as(StepVerifier::create) // + .expectError(IncorrectResultSizeDataAccessException.class) // + .verify(); + } + + @Test // DATAMONGO-1855 + public void getResourcesByPattern() throws IOException { + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + AsyncInputStream upload = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + operations.store(upload, "foo.xml", null, null).block(); + + operations.getResources("foo*") // + .flatMap(ReactiveGridFsResource::getDownloadStream) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // .consumeNextWith(dataBuffer -> { byte[] actual = new byte[dataBuffer.readableByteCount()]; dataBuffer.read(actual); assertThat(actual).isEqualTo(content); - }).verifyComplete(); + }) // + .verifyComplete(); } static class Metadata {