diff --git a/pom.xml b/pom.xml index 657607148e..378af07e5d 100644 --- a/pom.xml +++ b/pom.xml @@ -5,7 +5,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT pom Spring Data MongoDB diff --git a/spring-data-mongodb-benchmarks/pom.xml b/spring-data-mongodb-benchmarks/pom.xml index c2ff37b35c..cd2ab5857a 100644 --- a/spring-data-mongodb-benchmarks/pom.xml +++ b/spring-data-mongodb-benchmarks/pom.xml @@ -7,7 +7,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb-cross-store/pom.xml b/spring-data-mongodb-cross-store/pom.xml index fd36f227c0..3c604ed0d9 100644 --- a/spring-data-mongodb-cross-store/pom.xml +++ b/spring-data-mongodb-cross-store/pom.xml @@ -6,7 +6,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT ../pom.xml @@ -50,7 +50,7 @@ org.springframework.data spring-data-mongodb - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT diff --git a/spring-data-mongodb-distribution/pom.xml b/spring-data-mongodb-distribution/pom.xml index fc8d28a2b6..40cdba2c1a 100644 --- a/spring-data-mongodb-distribution/pom.xml +++ b/spring-data-mongodb-distribution/pom.xml @@ -14,7 +14,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/pom.xml b/spring-data-mongodb/pom.xml index f3c85a046a..80578399a4 100644 --- a/spring-data-mongodb/pom.xml +++ b/spring-data-mongodb/pom.xml @@ -11,7 +11,7 @@ org.springframework.data spring-data-mongodb-parent - 2.2.0.BUILD-SNAPSHOT + 2.2.0.DATAMONGO-1855-SNAPSHOT ../pom.xml diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java new file mode 100644 index 0000000000..d57268388a --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/AsyncInputStreamAdapter.java @@ -0,0 +1,257 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import lombok.RequiredArgsConstructor; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; +import reactor.util.concurrent.Queues; +import reactor.util.context.Context; + +import java.nio.ByteBuffer; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.function.BiConsumer; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; + +import com.mongodb.reactivestreams.client.Success; +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Adapter accepting a binary stream {@link Publisher} and emitting its through {@link AsyncInputStream}. + *

+ * This adapter subscribes to the binary {@link Publisher} as soon as the first chunk gets {@link #read(ByteBuffer) + * requested}. Requests are queued and binary chunks are requested from the {@link Publisher}. As soon as the + * {@link Publisher} emits items, chunks are provided to the read request which completes the number-of-written-bytes + * {@link Publisher}. + *

+ * {@link AsyncInputStream} is supposed to work as sequential callback API that is called until reaching EOF. + * {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}. + * + * @author Mark Paluch + * @author Christoph Strobl + * @since 2.2 + */ +@RequiredArgsConstructor +class AsyncInputStreamAdapter implements AsyncInputStream { + + private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater + .newUpdater(AsyncInputStreamAdapter.class, "demand"); + + private static final AtomicIntegerFieldUpdater SUBSCRIBED = AtomicIntegerFieldUpdater + .newUpdater(AsyncInputStreamAdapter.class, "subscribed"); + + private static final int SUBSCRIPTION_NOT_SUBSCRIBED = 0; + private static final int SUBSCRIPTION_SUBSCRIBED = 1; + + private final Publisher buffers; + private final Context subscriberContext; + private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + + private volatile Subscription subscription; + private volatile boolean cancelled; + private volatile boolean complete; + private volatile Throwable error; + private final Queue> readRequests = Queues.> small() + .get(); + + // see DEMAND + volatile long demand; + + // see SUBSCRIBED + volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED; + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer) + */ + @Override + public Publisher read(ByteBuffer dst) { + + return Mono.create(sink -> { + + readRequests.offer((db, bytecount) -> { + + try { + + if (error != null) { + + sink.error(error); + return; + } + + if (bytecount == -1) { + + sink.success(-1); + return; + } + + ByteBuffer byteBuffer = db.asByteBuffer(); + int toWrite = byteBuffer.remaining(); + + dst.put(byteBuffer); + sink.success(toWrite); + + } catch (Exception e) { + sink.error(e); + } finally { + DataBufferUtils.release(db); + } + }); + + request(1); + }); + } + + /* + * (non-Javadoc) + * @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close() + */ + @Override + public Publisher close() { + + return Mono.create(sink -> { + + cancelled = true; + + if (error != null) { + sink.error(error); + return; + } + + sink.success(Success.SUCCESS); + }); + } + + protected void request(int n) { + + if (complete) { + + terminatePendingReads(); + return; + } + + Operators.addCap(DEMAND, this, n); + + if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) { + + if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) { + buffers.subscribe(new DataBufferCoreSubscriber()); + } + + } else { + + Subscription subscription = this.subscription; + + if (subscription != null) { + requestFromSubscription(subscription); + } + } + } + + void requestFromSubscription(Subscription subscription) { + + long demand = DEMAND.get(AsyncInputStreamAdapter.this); + + if (cancelled) { + subscription.cancel(); + } + + if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) { + subscription.request(1); + } + } + + /** + * Terminates pending reads with empty buffers. + */ + void terminatePendingReads() { + + BiConsumer readers; + + while ((readers = readRequests.poll()) != null) { + readers.accept(factory.wrap(new byte[0]), -1); + } + } + + private class DataBufferCoreSubscriber implements CoreSubscriber { + + @Override + public Context currentContext() { + return AsyncInputStreamAdapter.this.subscriberContext; + } + + @Override + public void onSubscribe(Subscription s) { + + AsyncInputStreamAdapter.this.subscription = s; + + Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1); + s.request(1); + } + + @Override + public void onNext(DataBuffer dataBuffer) { + + if (cancelled || complete) { + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); + return; + } + + BiConsumer poll = AsyncInputStreamAdapter.this.readRequests.poll(); + + if (poll == null) { + + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext); + subscription.cancel(); + return; + } + + poll.accept(dataBuffer, dataBuffer.readableByteCount()); + + requestFromSubscription(subscription); + } + + @Override + public void onError(Throwable t) { + + if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) { + Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext); + return; + } + + AsyncInputStreamAdapter.this.error = t; + AsyncInputStreamAdapter.this.complete = true; + terminatePendingReads(); + } + + @Override + public void onComplete() { + + AsyncInputStreamAdapter.this.complete = true; + terminatePendingReads(); + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java new file mode 100644 index 0000000000..ae30151bd2 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdapters.java @@ -0,0 +1,78 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Utility methods to create adapters from between {@link Publisher} of {@link DataBuffer} and {@link AsyncInputStream}. + * + * @author Mark Paluch + * @since 2.2 + */ +class BinaryStreamAdapters { + + /** + * Creates a {@link Flux} emitting {@link DataBuffer} by reading binary chunks from {@link AsyncInputStream}. + * Publisher termination (completion, error, cancellation) closes the {@link AsyncInputStream}. + *

+ * The resulting {@link org.reactivestreams.Publisher} filters empty binary chunks and uses {@link DataBufferFactory} + * settings to determine the chunk size. + * + * @param inputStream must not be {@literal null}. + * @param dataBufferFactory must not be {@literal null}. + * @return {@link Flux} emitting {@link DataBuffer}s. + * @see DataBufferFactory#allocateBuffer() + */ + static Flux toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + + return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) // + .filter(it -> { + + if (it.readableByteCount() == 0) { + DataBufferUtils.release(it); + return false; + } + return true; + }); + } + + /** + * Creates a {@link Mono} emitting a {@link AsyncInputStream} to consume a {@link Publisher} emitting + * {@link DataBuffer} and exposing the binary stream through {@link AsyncInputStream}. {@link DataBuffer}s are + * released by the adapter during consumption. + *

+ * This method returns a {@link Mono} to retain the {@link reactor.util.context.Context subscriber context}. + * + * @param dataBuffers must not be {@literal null}. + * @return {@link Mono} emitting {@link AsyncInputStream}. + * @see DataBufferUtils#release(DataBuffer) + */ + static Mono toAsyncInputStream(Publisher dataBuffers) { + + return Mono.create(sink -> { + sink.success(new AsyncInputStreamAdapter(dataBuffers, sink.currentContext())); + }); + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java new file mode 100644 index 0000000000..df98508fe9 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/DataBufferPublisherAdapter.java @@ -0,0 +1,219 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import lombok.RequiredArgsConstructor; +import reactor.core.CoreSubscriber; +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink; +import reactor.core.publisher.Mono; +import reactor.core.publisher.Operators; +import reactor.util.context.Context; + +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicLongFieldUpdater; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscription; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DataBufferUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Utility to adapt a {@link AsyncInputStream} to a {@link Publisher} emitting {@link DataBuffer}. + * + * @author Mark Paluch + * @author Christoph Strobl + * @since 2.2 + */ +class DataBufferPublisherAdapter { + + /** + * Creates a {@link Publisher} emitting {@link DataBuffer}s by reading binary chunks from {@link AsyncInputStream}. + * Closes the {@link AsyncInputStream} once the {@link Publisher} terminates. + * + * @param inputStream must not be {@literal null}. + * @param dataBufferFactory must not be {@literal null}. + * @return the resulting {@link Publisher}. + */ + static Flux createBinaryStream(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) { + + State state = new State(inputStream, dataBufferFactory); + + return Flux.usingWhen(Mono.just(inputStream), it -> { + + return Flux. create((sink) -> { + + sink.onDispose(state::close); + sink.onCancel(state::close); + + sink.onRequest(n -> { + state.request(sink, n); + }); + }); + }, AsyncInputStream::close, AsyncInputStream::close, AsyncInputStream::close) // + .concatMap(Flux::just, 1); + } + + @RequiredArgsConstructor + static class State { + + private static final AtomicLongFieldUpdater DEMAND = AtomicLongFieldUpdater.newUpdater(State.class, "demand"); + + private static final AtomicIntegerFieldUpdater STATE = AtomicIntegerFieldUpdater.newUpdater(State.class, "state"); + + private static final AtomicIntegerFieldUpdater READ = AtomicIntegerFieldUpdater.newUpdater(State.class, "read"); + + private static final int STATE_OPEN = 0; + private static final int STATE_CLOSED = 1; + + private static final int READ_NONE = 0; + private static final int READ_IN_PROGRESS = 1; + + final AsyncInputStream inputStream; + final DataBufferFactory dataBufferFactory; + + // see DEMAND + volatile long demand; + + // see STATE + volatile int state = STATE_OPEN; + + // see READ_IN_PROGRESS + volatile int read = READ_NONE; + + void request(FluxSink sink, long n) { + + Operators.addCap(DEMAND, this, n); + + if (onShouldRead()) { + emitNext(sink); + } + } + + boolean onShouldRead() { + return !isClosed() && getDemand() > 0 && onWantRead(); + } + + boolean onWantRead() { + return READ.compareAndSet(this, READ_NONE, READ_IN_PROGRESS); + } + + boolean onReadDone() { + return READ.compareAndSet(this, READ_IN_PROGRESS, READ_NONE); + } + + long getDemand() { + return DEMAND.get(this); + } + + void close() { + STATE.compareAndSet(this, STATE_OPEN, STATE_CLOSED); + } + + boolean isClosed() { + return STATE.get(this) == STATE_CLOSED; + } + + /** + * Emit the next {@link DataBuffer}. + * + * @param sink + */ + void emitNext(FluxSink sink) { + + DataBuffer dataBuffer = dataBufferFactory.allocateBuffer(); + ByteBuffer intermediate = ByteBuffer.allocate(dataBuffer.capacity()); + + Mono.from(inputStream.read(intermediate)).subscribe(new BufferCoreSubscriber(sink, dataBuffer, intermediate)); + } + + private class BufferCoreSubscriber implements CoreSubscriber { + + private final FluxSink sink; + private final DataBuffer dataBuffer; + private final ByteBuffer intermediate; + + BufferCoreSubscriber(FluxSink sink, DataBuffer dataBuffer, ByteBuffer intermediate) { + + this.sink = sink; + this.dataBuffer = dataBuffer; + this.intermediate = intermediate; + } + + @Override + public Context currentContext() { + return sink.currentContext(); + } + + @Override + public void onSubscribe(Subscription s) { + s.request(1); + } + + @Override + public void onNext(Integer bytes) { + + if (isClosed()) { + + onReadDone(); + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, sink.currentContext()); + return; + } + + intermediate.flip(); + dataBuffer.write(intermediate); + + sink.next(dataBuffer); + + try { + if (bytes == -1) { + sink.complete(); + } + } finally { + onReadDone(); + } + } + + @Override + public void onError(Throwable t) { + + if (isClosed()) { + + Operators.onErrorDropped(t, sink.currentContext()); + return; + } + + onReadDone(); + DataBufferUtils.release(dataBuffer); + Operators.onNextDropped(dataBuffer, sink.currentContext()); + sink.error(t); + } + + @Override + public void onComplete() { + + if (onShouldRead()) { + emitNext(sink); + } + } + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java new file mode 100644 index 0000000000..8d4bd9f5a3 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsOperationsSupport.java @@ -0,0 +1,104 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import java.util.Optional; + +import org.bson.Document; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.convert.QueryMapper; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +import com.mongodb.client.gridfs.model.GridFSUploadOptions; + +/** + * Base class offering common tasks like query mapping and {@link GridFSUploadOptions} computation to be shared across + * imperative and reactive implementations. + * + * @author Christoph Strobl + * @since 2.2 + */ +class GridFsOperationsSupport { + + private final QueryMapper queryMapper; + private final MongoConverter converter; + + /** + * @param converter must not be {@literal null}. + */ + GridFsOperationsSupport(MongoConverter converter) { + + Assert.notNull(converter, "MongoConverter must not be null!"); + + this.converter = converter; + this.queryMapper = new QueryMapper(converter); + } + + /** + * @param query pass the given query though a {@link QueryMapper} to apply type conversion. + * @return never {@literal null}. + */ + protected Document getMappedQuery(Document query) { + return queryMapper.getMappedObject(query, Optional.empty()); + } + + /** + * Compute the {@link GridFSUploadOptions} to be used from the given {@literal contentType} and {@literal metadata} + * {@link Document}. + * + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null} + * @return never {@literal null}. + */ + protected GridFSUploadOptions computeUploadOptionsFor(@Nullable String contentType, @Nullable Document metadata) { + + Document targetMetadata = new Document(); + + if (StringUtils.hasText(contentType)) { + targetMetadata.put(GridFsResource.CONTENT_TYPE_FIELD, contentType); + } + + if (metadata != null) { + targetMetadata.putAll(metadata); + } + + GridFSUploadOptions options = new GridFSUploadOptions(); + options.metadata(targetMetadata); + + return options; + } + + /** + * Convert a given {@literal value} into a {@link Document}. + * + * @param value can be {@literal null}. + * @return an empty {@link Document} if the source value is {@literal null}. + */ + protected Document toDocument(@Nullable Object value) { + + if (value instanceof Document) { + return (Document) value; + } + + Document document = new Document(); + if (value != null) { + converter.write(value, document); + } + return document; + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java index 3b87d2a78b..6c278ac7c3 100644 --- a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/GridFsTemplate.java @@ -29,7 +29,6 @@ import org.springframework.core.io.support.ResourcePatternResolver; import org.springframework.data.mongodb.MongoDbFactory; import org.springframework.data.mongodb.core.convert.MongoConverter; -import org.springframework.data.mongodb.core.convert.QueryMapper; import org.springframework.data.mongodb.core.query.Query; import org.springframework.lang.Nullable; import org.springframework.util.Assert; @@ -40,7 +39,6 @@ import com.mongodb.client.gridfs.GridFSBuckets; import com.mongodb.client.gridfs.GridFSFindIterable; import com.mongodb.client.gridfs.model.GridFSFile; -import com.mongodb.client.gridfs.model.GridFSUploadOptions; /** * {@link GridFsOperations} implementation to store content into MongoDB GridFS. @@ -54,13 +52,11 @@ * @author Hartmut Lang * @author Niklas Helge Hanft */ -public class GridFsTemplate implements GridFsOperations, ResourcePatternResolver { +public class GridFsTemplate extends GridFsOperationsSupport implements GridFsOperations, ResourcePatternResolver { private final MongoDbFactory dbFactory; private final @Nullable String bucket; - private final MongoConverter converter; - private final QueryMapper queryMapper; /** * Creates a new {@link GridFsTemplate} using the given {@link MongoDbFactory} and {@link MongoConverter}. @@ -81,14 +77,12 @@ public GridFsTemplate(MongoDbFactory dbFactory, MongoConverter converter) { */ public GridFsTemplate(MongoDbFactory dbFactory, MongoConverter converter, @Nullable String bucket) { + super(converter); + Assert.notNull(dbFactory, "MongoDbFactory must not be null!"); - Assert.notNull(converter, "MongoConverter must not be null!"); this.dbFactory = dbFactory; - this.converter = converter; this.bucket = bucket; - - this.queryMapper = new QueryMapper(converter); } /* @@ -137,16 +131,9 @@ public ObjectId store(InputStream content, @Nullable String filename, @Nullable * (non-Javadoc) * @see org.springframework.data.mongodb.gridfs.GridFsOperations#store(java.io.InputStream, java.lang.String, java.lang.String, java.lang.Object) */ - public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Object metadata) { - - Document document = null; - - if (metadata != null) { - document = new Document(); - converter.write(metadata, document); - } - - return store(content, filename, contentType, document); + public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata) { + return store(content, filename, contentType, toDocument(metadata)); } /* @@ -161,25 +148,11 @@ public ObjectId store(InputStream content, @Nullable String filename, @Nullable * (non-Javadoc) * @see org.springframework.data.mongodb.gridfs.GridFsOperations#store(java.io.InputStream, java.lang.String, com.mongodb.Document) */ - public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, @Nullable Document metadata) { + public ObjectId store(InputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata) { Assert.notNull(content, "InputStream must not be null!"); - - GridFSUploadOptions options = new GridFSUploadOptions(); - - Document mData = new Document(); - - if (StringUtils.hasText(contentType)) { - mData.put(GridFsResource.CONTENT_TYPE_FIELD, contentType); - } - - if (metadata != null) { - mData.putAll(metadata); - } - - options.metadata(mData); - - return getGridFs().uploadFromStream(filename, content, options); + return getGridFs().uploadFromStream(filename, content, computeUploadOptionsFor(contentType, metadata)); } /* @@ -210,8 +183,8 @@ public GridFSFile findOne(Query query) { */ public void delete(Query query) { - for (GridFSFile x : find(query)) { - getGridFs().delete(((BsonObjectId) x.getId()).getValue()); + for (GridFSFile gridFSFile : find(query)) { + getGridFs().delete(((BsonObjectId) gridFSFile.getId()).getValue()); } } @@ -246,9 +219,9 @@ public GridFsResource getResource(GridFSFile file) { } /* - * (non-Javadoc) - * @see org.springframework.core.io.support.ResourcePatternResolver#getResources(java.lang.String) - */ + * (non-Javadoc) + * @see org.springframework.core.io.support.ResourcePatternResolver#getResources(java.lang.String) + */ public GridFsResource[] getResources(String locationPattern) { if (!StringUtils.hasText(locationPattern)) { @@ -272,10 +245,6 @@ public GridFsResource[] getResources(String locationPattern) { return new GridFsResource[] { getResource(locationPattern) }; } - private Document getMappedQuery(Document query) { - return queryMapper.getMappedObject(query, Optional.empty()); - } - private GridFSBucket getGridFs() { MongoDatabase db = dbFactory.getDb(); diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java new file mode 100644 index 0000000000..c7f3a578db --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsOperations.java @@ -0,0 +1,233 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.bson.Document; +import org.bson.types.ObjectId; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.data.domain.Sort; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.lang.Nullable; + +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; + +/** + * Collection of operations to store and read files from MongoDB GridFS using reactive infrastructure. + * + * @author Mark Paluch + * @author Christoph Strobl + * @since 2.2 + */ +public interface ReactiveGridFsOperations { + + /** + * Stores the given content into a file with the given name. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + default Mono store(Publisher content, String filename) { + return store(content, filename, (Object) null); + } + + /** + * Stores the given content into a file applying the given metadata. + * + * @param content must not be {@literal null}. + * @param metadata can be {@literal null}. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + default Mono store(Publisher content, @Nullable Object metadata) { + return store(content, null, metadata); + } + + /** + * Stores the given content into a file applying the given metadata. + * + * @param content must not be {@literal null}. + * @param metadata can be {@literal null}. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + default Mono store(Publisher content, @Nullable Document metadata) { + return store(content, null, metadata); + } + + /** + * Stores the given content into a file with the given name and content type. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + default Mono store(Publisher content, @Nullable String filename, @Nullable String contentType) { + return store(content, filename, contentType, (Object) null); + } + + /** + * Stores the given content into a file with the given name using the given metadata. The metadata object will be + * marshalled before writing. + * + * @param content must not be {@literal null}. + * @param filename can be {@literal null} or empty. + * @param metadata can be {@literal null}. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + default Mono store(Publisher content, @Nullable String filename, @Nullable Object metadata) { + return store(content, filename, null, metadata); + } + + /** + * Stores the given content into a file with the given name and content type using the given metadata. The metadata + * object will be marshalled before writing. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null} + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata); + + /** + * Stores the given content into a file with the given name and content type using the given metadata. The metadata + * object will be marshalled before writing. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null} + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata); + + /** + * Stores the given content into a file with the given name using the given metadata. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param metadata can be {@literal null}. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + default Mono store(Publisher content, @Nullable String filename, @Nullable Document metadata) { + return store(content, filename, null, metadata); + } + + /** + * Stores the given content into a file with the given name and content type using the given metadata. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null}. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata); + + /** + * Stores the given content into a file with the given name and content type using the given metadata. + * + * @param content must not be {@literal null}. + * @param filename must not be {@literal null} or empty. + * @param contentType can be {@literal null}. + * @param metadata can be {@literal null}. + * @return a {@link Mono} emitting the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just + * created. + */ + Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata); + + /** + * Returns a {@link Flux} emitting all files matching the given query.
+ * Note: Currently {@link Sort} criteria defined at the {@link Query} will not be regarded as MongoDB + * does not support ordering for GridFS file access. + * + * @see MongoDB Jira: JAVA-431 + * @param query must not be {@literal null}. + * @return {@link Flux#empty()} if no mach found. + */ + Flux find(Query query); + + /** + * Returns a {@link Mono} emitting a single {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given + * query or {@link Mono#empty()} in case no file matches.
+ * NOTE If more than one file matches the given query the resulting {@link Mono} emits an error. If + * you want to obtain the first found file use {@link #findFirst(Query)}. + * + * @param query must not be {@literal null}. + * @return {@link Mono#empty()} if not match found. + */ + Mono findOne(Query query); + + /** + * Returns a {@link Mono} emitting the frist {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given + * query or {@link Mono#empty()} in case no file matches. + * + * @param query must not be {@literal null}. + * @return {@link Mono#empty()} if not match found. + */ + Mono findFirst(Query query); + + /** + * Deletes all files matching the given {@link Query}. + * + * @param query must not be {@literal null}. + * @return a {@link Mono} signalling operation completion. + */ + Mono delete(Query query); + + /** + * Returns a {@link Mono} emitting the {@link ReactiveGridFsResource} with the given file name. + * + * @param filename must not be {@literal null}. + * @return {@link Mono#empty()} if no match found. + */ + Mono getResource(String filename); + + /** + * Returns a {@link Mono} emitting the {@link ReactiveGridFsResource} for a {@link GridFSFile}. + * + * @param file must not be {@literal null}. + * @return {@link Mono#empty()} if no match found. + */ + Mono getResource(GridFSFile file); + + /** + * Returns a {@link Flux} emitting all {@link ReactiveGridFsResource}s matching the given file name pattern. + * + * @param filenamePattern must not be {@literal null}. + * @return {@link Flux#empty()} if no match found. + */ + Flux getResources(String filenamePattern); +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java new file mode 100644 index 0000000000..eb787cbd50 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsResource.java @@ -0,0 +1,180 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import reactor.core.publisher.Flux; + +import java.io.ByteArrayInputStream; +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.InputStream; + +import org.reactivestreams.Publisher; +import org.springframework.core.io.AbstractResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; + +import com.mongodb.client.gridfs.model.GridFSFile; + +/** + * Reactive {@link GridFSFile} based {@link Resource} implementation. + * + * @author Mark Paluch + * @since 2.2 + */ +public class ReactiveGridFsResource extends AbstractResource { + + static final String CONTENT_TYPE_FIELD = "_contentType"; + private static final ByteArrayInputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]); + + private final @Nullable GridFSFile file; + private final String filename; + private final Flux content; + + /** + * Creates a new, absent {@link ReactiveGridFsResource}. + * + * @param filename filename of the absent resource. + * @param content + * @since 2.1 + */ + private ReactiveGridFsResource(String filename, Publisher content) { + + this.file = null; + this.filename = filename; + this.content = Flux.from(content); + } + + /** + * Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}. + * + * @param file must not be {@literal null}. + * @param content + */ + public ReactiveGridFsResource(GridFSFile file, Publisher content) { + + this.file = file; + this.filename = file.getFilename(); + this.content = Flux.from(content); + } + + /** + * Obtain an absent {@link ReactiveGridFsResource}. + * + * @param filename filename of the absent resource, must not be {@literal null}. + * @return never {@literal null}. + * @since 2.1 + */ + public static ReactiveGridFsResource absent(String filename) { + + Assert.notNull(filename, "Filename must not be null"); + + return new ReactiveGridFsResource(filename, Flux.empty()); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.InputStreamResource#getInputStream() + */ + @Override + public InputStream getInputStream() throws IllegalStateException { + throw new UnsupportedOperationException(); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#contentLength() + */ + @Override + public long contentLength() throws IOException { + + verifyExists(); + return file.getLength(); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#getFilename() + */ + @Override + public String getFilename() throws IllegalStateException { + return filename; + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#exists() + */ + @Override + public boolean exists() { + return file != null; + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#lastModified() + */ + @Override + public long lastModified() throws IOException { + + verifyExists(); + return file.getUploadDate().getTime(); + } + + /* + * (non-Javadoc) + * @see org.springframework.core.io.AbstractResource#getDescription() + */ + @Override + public String getDescription() { + return String.format("GridFs resource [%s]", this.getFilename()); + } + + /** + * Returns the {@link Resource}'s id. + * + * @return never {@literal null}. + * @throws IllegalStateException if the file does not {@link #exists()}. + */ + public Object getId() { + + Assert.state(exists(), () -> String.format("%s does not exist.", getDescription())); + + return file.getId(); + } + + /** + * Retrieve the download stream. + * + * @return + */ + public Flux getDownloadStream() { + + if (!exists()) { + return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription()))); + } + return content; + } + + private void verifyExists() throws FileNotFoundException { + + if (!exists()) { + throw new FileNotFoundException(String.format("%s does not exist.", getDescription())); + } + } +} diff --git a/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java new file mode 100644 index 0000000000..2c182c6871 --- /dev/null +++ b/spring-data-mongodb/src/main/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplate.java @@ -0,0 +1,275 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import static org.springframework.data.mongodb.core.query.Query.*; +import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.bson.Document; +import org.bson.types.ObjectId; +import org.reactivestreams.Publisher; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferFactory; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.dao.IncorrectResultSizeDataAccessException; +import org.springframework.data.mongodb.ReactiveMongoDatabaseFactory; +import org.springframework.data.mongodb.core.convert.MongoConverter; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.SerializationUtils; +import org.springframework.lang.Nullable; +import org.springframework.util.Assert; +import org.springframework.util.StringUtils; + +import com.mongodb.client.gridfs.model.GridFSFile; +import com.mongodb.reactivestreams.client.MongoDatabase; +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; +import com.mongodb.reactivestreams.client.gridfs.GridFSBucket; +import com.mongodb.reactivestreams.client.gridfs.GridFSBuckets; +import com.mongodb.reactivestreams.client.gridfs.GridFSDownloadStream; +import com.mongodb.reactivestreams.client.gridfs.GridFSFindPublisher; + +/** + * {@link ReactiveGridFsOperations} implementation to store content into MongoDB GridFS. Uses by default + * {@link DefaultDataBufferFactory} to create {@link DataBuffer buffers}. + * + * @author Mark Paluch + * @since 2.2 + */ +public class ReactiveGridFsTemplate extends GridFsOperationsSupport implements ReactiveGridFsOperations { + + private final ReactiveMongoDatabaseFactory dbFactory; + private final DataBufferFactory dataBufferFactory; + private final @Nullable String bucket; + + /** + * Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and + * {@link MongoConverter}. + * + * @param dbFactory must not be {@literal null}. + * @param converter must not be {@literal null}. + */ + public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter) { + this(dbFactory, converter, null); + } + + /** + * Creates a new {@link ReactiveGridFsTemplate} using the given {@link ReactiveMongoDatabaseFactory} and + * {@link MongoConverter}. + * + * @param dbFactory must not be {@literal null}. + * @param converter must not be {@literal null}. + * @param bucket + */ + public ReactiveGridFsTemplate(ReactiveMongoDatabaseFactory dbFactory, MongoConverter converter, + @Nullable String bucket) { + this(new DefaultDataBufferFactory(), dbFactory, converter, bucket); + } + + /** + * Creates a new {@link ReactiveGridFsTemplate} using the given {@link DataBufferFactory}, + * {@link ReactiveMongoDatabaseFactory} and {@link MongoConverter}. + * + * @param dataBufferFactory must not be {@literal null}. + * @param dbFactory must not be {@literal null}. + * @param converter must not be {@literal null}. + * @param bucket + */ + public ReactiveGridFsTemplate(DataBufferFactory dataBufferFactory, ReactiveMongoDatabaseFactory dbFactory, + MongoConverter converter, @Nullable String bucket) { + + super(converter); + + Assert.notNull(dataBufferFactory, "DataBufferFactory must not be null!"); + Assert.notNull(dbFactory, "ReactiveMongoDatabaseFactory must not be null!"); + + this.dataBufferFactory = dataBufferFactory; + this.dbFactory = dbFactory; + this.bucket = bucket; + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(com.mongodb.reactivestreams.client.gridfs.AsyncInputStream, java.lang.String, java.lang.String, java.lang.Object) + */ + @Override + public Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata) { + return store(content, filename, contentType, toDocument(metadata)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(org.reactivestreams.Publisher, java.lang.String, java.lang.String, java.lang.Object) + */ + @Override + public Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Object metadata) { + return store(content, filename, contentType, toDocument(metadata)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(com.mongodb.reactivestreams.client.gridfs.AsyncInputStream, java.lang.String, java.lang.String, org.bson.Document) + */ + @Override + public Mono store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata) { + + Assert.notNull(content, "InputStream must not be null!"); + return Mono.from(getGridFs().uploadFromStream(filename, content, computeUploadOptionsFor(contentType, metadata))); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#store(org.reactivestreams.Publisher, java.lang.String, java.lang.String, org.bson.Document) + */ + @Override + public Mono store(Publisher content, @Nullable String filename, @Nullable String contentType, + @Nullable Document metadata) { + + Assert.notNull(content, "Content must not be null!"); + + return BinaryStreamAdapters.toAsyncInputStream(content).flatMap(it -> store(it, filename, contentType, metadata)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#find(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Flux find(Query query) { + return Flux.from(prepareQuery(query)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#findOne(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Mono findOne(Query query) { + + return Flux.from(prepareQuery(query).limit(2)) // + .collectList() // + .flatMap(it -> { + if (it.isEmpty()) { + return Mono.empty(); + } + + if (it.size() > 1) { + return Mono.error(new IncorrectResultSizeDataAccessException( + "Query " + SerializationUtils.serializeToJsonSafely(query) + " returned non unique result.", 1)); + } + + return Mono.just(it.get(0)); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#findFirst(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Mono findFirst(Query query) { + return Flux.from(prepareQuery(query).limit(1)).next(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#delete(org.springframework.data.mongodb.core.query.Query) + */ + @Override + public Mono delete(Query query) { + return find(query).flatMap(it -> getGridFs().delete(it.getId())).then(); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResource(java.lang.String) + */ + @Override + public Mono getResource(String location) { + + Assert.notNull(location, "Filename must not be null!"); + + return findOne(query(whereFilename().is(location))).flatMap(this::getResource) + .defaultIfEmpty(ReactiveGridFsResource.absent(location)); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResource(com.mongodb.client.gridfs.model.GridFSFile) + */ + @Override + public Mono getResource(GridFSFile file) { + + Assert.notNull(file, "GridFSFile must not be null!"); + + return Mono.fromSupplier(() -> { + + GridFSDownloadStream stream = getGridFs().openDownloadStream(file.getObjectId()); + + return new ReactiveGridFsResource(file, BinaryStreamAdapters.toPublisher(stream, dataBufferFactory)); + }); + } + + /* + * (non-Javadoc) + * @see org.springframework.data.mongodb.gridfs.ReactiveGridFsOperations#getResources(java.lang.String) + */ + @Override + public Flux getResources(String locationPattern) { + + if (!StringUtils.hasText(locationPattern)) { + return Flux.empty(); + } + + AntPath path = new AntPath(locationPattern); + + if (path.isPattern()) { + + Flux files = find(query(whereFilename().regex(path.toRegex()))); + return files.flatMap(this::getResource); + } + + return getResource(locationPattern).flux(); + } + + protected GridFSFindPublisher prepareQuery(Query query) { + + Assert.notNull(query, "Query must not be null!"); + + Document queryObject = getMappedQuery(query.getQueryObject()); + Document sortObject = getMappedQuery(query.getSortObject()); + + GridFSFindPublisher publisherToUse = getGridFs().find(queryObject).sort(sortObject); + + Integer cursorBatchSize = query.getMeta().getCursorBatchSize(); + if (cursorBatchSize != null) { + publisherToUse = publisherToUse.batchSize(cursorBatchSize); + } + + return publisherToUse; + } + + protected GridFSBucket getGridFs() { + + MongoDatabase db = dbFactory.getMongoDatabase(); + return bucket == null ? GridFSBuckets.create(db) : GridFSBuckets.create(db, bucket); + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java new file mode 100644 index 0000000000..b94de93afb --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/BinaryStreamAdaptersUnitTests.java @@ -0,0 +1,102 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import static org.assertj.core.api.Assertions.*; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.junit.Test; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.util.StreamUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; +import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; + +/** + * Unit tests for {@link BinaryStreamAdapters}. + * + * @author Mark Paluch + */ +public class BinaryStreamAdaptersUnitTests { + + @Test // DATAMONGO-1855 + public void shouldAdaptAsyncInputStreamToDataBufferPublisher() throws IOException { + + ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml"); + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + AsyncInputStream inputStream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + Flux dataBuffers = BinaryStreamAdapters.toPublisher(inputStream, new DefaultDataBufferFactory()); + + DataBufferUtils.join(dataBuffers) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + + byte[] actualContent = new byte[actual.readableByteCount()]; + actual.read(actualContent); + assertThat(actualContent).isEqualTo(content); + }) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void shouldAdaptBinaryPublisherToAsyncInputStream() throws IOException { + + ClassPathResource resource = new ClassPathResource("gridfs/gridfs.xml"); + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + + Flux dataBuffers = DataBufferUtils.readInputStream(resource::getInputStream, + new DefaultDataBufferFactory(), 10); + + AsyncInputStream inputStream = BinaryStreamAdapters.toAsyncInputStream(dataBuffers).block(); + ByteBuffer complete = readBuffer(inputStream); + + assertThat(complete).isEqualTo(ByteBuffer.wrap(content)); + } + + static ByteBuffer readBuffer(AsyncInputStream inputStream) { + + ByteBuffer complete = ByteBuffer.allocate(1024); + + boolean hasData = true; + while (hasData) { + + ByteBuffer chunk = ByteBuffer.allocate(100); + + Integer bytesRead = Mono.from(inputStream.read(chunk)).block(); + + chunk.flip(); + complete.put(chunk); + + hasData = bytesRead > -1; + } + + complete.flip(); + + return complete; + } +} diff --git a/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java new file mode 100644 index 0000000000..879bbdf1a7 --- /dev/null +++ b/spring-data-mongodb/src/test/java/org/springframework/data/mongodb/gridfs/ReactiveGridFsTemplateTests.java @@ -0,0 +1,202 @@ +/* + * Copyright 2019 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.springframework.data.mongodb.gridfs; + +import static org.assertj.core.api.Assertions.*; +import static org.springframework.data.mongodb.core.query.Criteria.where; +import static org.springframework.data.mongodb.core.query.Query.*; +import static org.springframework.data.mongodb.gridfs.GridFsCriteria.*; + +import org.springframework.dao.IncorrectResultSizeDataAccessException; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.io.IOException; + +import org.bson.BsonObjectId; +import org.bson.Document; +import org.bson.types.ObjectId; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.io.ClassPathResource; +import org.springframework.core.io.Resource; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.core.io.buffer.DefaultDataBuffer; +import org.springframework.core.io.buffer.DefaultDataBufferFactory; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.util.StreamUtils; + +import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream; +import com.mongodb.reactivestreams.client.gridfs.helpers.AsyncStreamHelper; + +/** + * Integration tests for {@link ReactiveGridFsTemplate}. + * + * @author Mark Paluch + * @author Christoph Strobl + */ +@RunWith(SpringRunner.class) +@ContextConfiguration("classpath:gridfs/reactive-gridfs.xml") +public class ReactiveGridFsTemplateTests { + + Resource resource = new ClassPathResource("gridfs/gridfs.xml"); + + @Autowired ReactiveGridFsOperations operations; + + @Before + public void setUp() { + + operations.delete(new Query()) // + .as(StepVerifier::create) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void storesAndFindsSimpleDocument() { + + DefaultDataBufferFactory factory = new DefaultDataBufferFactory(); + DefaultDataBuffer first = factory.wrap("first".getBytes()); + DefaultDataBuffer second = factory.wrap("second".getBytes()); + + ObjectId reference = operations.store(Flux.just(first, second), "foo.xml").block(); + + operations.find(query(where("_id").is(reference))) // + .as(StepVerifier::create) // + .assertNext(actual -> { + assertThat(((BsonObjectId) actual.getId()).getValue()).isEqualTo(reference); + }) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void writesMetadataCorrectly() throws IOException { + + Document metadata = new Document("key", "value"); + + AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + ObjectId reference = operations.store(stream, "foo.xml", "binary/octet-stream", metadata).block(); + + operations.find(query(whereMetaData("key").is("value"))) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + assertThat(actual.getObjectId()).isEqualTo(reference); + }) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void marshalsComplexMetadata() throws IOException { + + Metadata metadata = new Metadata(); + metadata.version = "1.0"; + + AsyncInputStream stream = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + ObjectId reference = operations.store(stream, "foo.xml", "binary/octet-stream", metadata).block(); + + operations.find(query(whereMetaData("version").is("1.0"))) // + .as(StepVerifier::create) // + .consumeNextWith(actual -> { + assertThat(actual.getObjectId()).isEqualTo(reference); + assertThat(actual.getMetadata()).containsEntry("version", "1.0"); + }) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void getResourceShouldRetrieveContentByIdentity() throws IOException { + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + AsyncInputStream upload = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + ObjectId reference = operations.store(upload, "foo.xml", null, null).block(); + + operations.findOne(query(where("_id").is(reference))).flatMap(operations::getResource) + .flatMapMany(ReactiveGridFsResource::getDownloadStream) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + byte[] actual = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(actual); + + assertThat(actual).isEqualTo(content); + }) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void shouldEmitFirstEntryWhenFindFirstRetrievesMoreThanOneResult() throws IOException { + + AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream()); + + operations.store(upload1, "foo.xml", null, null).block(); + operations.store(upload2, "foo2.xml", null, null).block(); + + operations.findFirst(query(where("filename").regex("foo*"))) // + .flatMap(operations::getResource) // + .as(StepVerifier::create) // + .expectNextCount(1) // + .verifyComplete(); + } + + @Test // DATAMONGO-1855 + public void shouldEmitErrorWhenFindOneRetrievesMoreThanOneResult() throws IOException { + + AsyncInputStream upload1 = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + AsyncInputStream upload2 = AsyncStreamHelper.toAsyncInputStream(new ClassPathResource("gridfs/another-resource.xml").getInputStream()); + + operations.store(upload1, "foo.xml", null, null).block(); + operations.store(upload2, "foo2.xml", null, null).block(); + + operations.findOne(query(where("filename").regex("foo*"))) // + .as(StepVerifier::create) // + .expectError(IncorrectResultSizeDataAccessException.class) // + .verify(); + } + + @Test // DATAMONGO-1855 + public void getResourcesByPattern() throws IOException { + + byte[] content = StreamUtils.copyToByteArray(resource.getInputStream()); + AsyncInputStream upload = AsyncStreamHelper.toAsyncInputStream(resource.getInputStream()); + + operations.store(upload, "foo.xml", null, null).block(); + + operations.getResources("foo*") // + .flatMap(ReactiveGridFsResource::getDownloadStream) // + .transform(DataBufferUtils::join) // + .as(StepVerifier::create) // + .consumeNextWith(dataBuffer -> { + + byte[] actual = new byte[dataBuffer.readableByteCount()]; + dataBuffer.read(actual); + + assertThat(actual).isEqualTo(content); + }) // + .verifyComplete(); + } + + static class Metadata { + String version; + } +} diff --git a/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml b/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml new file mode 100644 index 0000000000..f07ee8b08d --- /dev/null +++ b/spring-data-mongodb/src/test/resources/gridfs/reactive-gridfs.xml @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/asciidoc/new-features.adoc b/src/main/asciidoc/new-features.adoc index 8135ad698f..80ec95981e 100644 --- a/src/main/asciidoc/new-features.adoc +++ b/src/main/asciidoc/new-features.adoc @@ -5,6 +5,7 @@ == What's New in Spring Data MongoDB 2.2 * <> * <> via `ReactiveQuerydslPredicateExecutor`. +* <>. [[new-features.2-1-0]] == What's New in Spring Data MongoDB 2.1 diff --git a/src/main/asciidoc/reference/reactive-mongodb.adoc b/src/main/asciidoc/reference/reactive-mongodb.adoc index bb9ac80f3e..eaa4a51f2c 100644 --- a/src/main/asciidoc/reference/reactive-mongodb.adoc +++ b/src/main/asciidoc/reference/reactive-mongodb.adoc @@ -482,3 +482,96 @@ Flux hasIndex = operations.execute("geolocation", .flatMap(document -> Mono.just(true)) .defaultIfEmpty(false)); ---- + +[[reactive.gridfs]] +== GridFS Support + +MongoDB supports storing binary files inside its filesystem, GridFS. +Spring Data MongoDB provides a `ReactiveGridFsOperations` interface as well as the corresponding implementation, `ReactiveGridFsTemplate`, to let you interact with the filesystem. +You can set up a `ReactiveGridFsTemplate` instance by handing it a `ReactiveMongoDatabaseFactory` as well as a `MongoConverter`, as the following example shows: + +.JavaConfig setup for a ReactiveGridFsTemplate +==== +[source,java] +---- +class GridFsConfiguration extends AbstractReactiveMongoConfiguration { + + // … further configuration omitted + + @Bean + public ReactiveGridFsTemplate reactiveGridFsTemplate() { + return new ReactiveGridFsTemplate(reactiveMongoDbFactory(), mappingMongoConverter()); + } +} +---- +==== + +The template can now be injected and used to perform storage and retrieval operations, as the following example shows: + +.Using ReactiveGridFsTemplate to store files +==== +[source,java] +---- +class ReactiveGridFsClient { + + @Autowired + ReactiveGridFsTemplate operations; + + @Test + public Mono storeFileToGridFs() { + + FileMetadata metadata = new FileMetadata(); + // populate metadata + Publisher file = … // lookup File or Resource + + return operations.store(file, "filename.txt", metadata); + } +} +---- +==== + +The `store(…)` operations take an `Publisher`, a filename, and (optionally) metadata information about the file to store. The metadata can be an arbitrary object, which will be marshaled by the `MongoConverter` configured with the `ReactiveGridFsTemplate`. Alternatively, you can also provide a `Document`. + +NOTE: MongoDB's driver uses `AsyncInputStream` and `AsyncOutputStream` interfaces to exchange binary streams. Spring Data MongoDB adapts these interfaces to `Publisher`. Read more about `DataBuffer` in http://docs.spring.io/spring/docs/{springVersion}/spring-framework-reference/core.html#databuffers[Spring's reference documentation]. + +You can read files from the filesystem through either the `find(…)` or the `getResources(…)` methods. Let's have a look at the `find(…)` methods first. You can either find a single file or multiple files that match a `Query`. You can use the `GridFsCriteria` helper class to define queries. It provides static factory methods to encapsulate default metadata fields (such as `whereFilename()` and `whereContentType()`) or a custom one through `whereMetaData()`. The following example shows how to use `ReactiveGridFsTemplate` to query for files: + +.Using ReactiveGridFsTemplate to query for files +==== +[source,java] +---- +class ReactiveGridFsClient { + + @Autowired + ReactiveGridFsTemplate operations; + + @Test + public Flux findFilesInGridFs() { + return operations.find(query(whereFilename().is("filename.txt"))) + } +} +---- +==== + +NOTE: Currently, MongoDB does not support defining sort criteria when retrieving files from GridFS. For this reason, any sort criteria defined on the `Query` instance handed into the `find(…)` method are disregarded. + +The other option to read files from the GridFs is to use the methods modeled along the lines of `ResourcePatternResolver`. +`ReactiveGridFsOperations` uses reactive types to defer execution while `ResourcePatternResolver` uses a synchronous interface. +These methods allow handing an Ant path into the method and can thus retrieve files matching the given pattern. The following example shows how to use `ReactiveGridFsTemplate` to read files: + +.Using ReactiveGridFsTemplate to read files +==== +[source,java] +---- +class ReactiveGridFsClient { + + @Autowired + ReactiveGridFsOperations operations; + + @Test + public void readFilesFromGridFs() { + Flux txtFiles = operations.getResources("*.txt"); + } +} +---- +====