Skip to content

Commit c1332c3

Browse files
mp911dechristophstrobl
authored andcommitted
DATAMONGO-1855 - Initial reactive GridFS support.
We now support reactive GridFS using MongoDB's reactive GridFS API. Files can be consumed and provided as binary stream. ReactiveGridFsOperations operations = …; Publisher<DataBuffer> buffers = … Mono<ObjectId> id = operations.store(buffers, "foo.xml"); Flux<DataBuffer> download = operations.getResource("foo.xml").flatMap(ReactiveGridFsResource::getDownloadStream);
1 parent 668a4b4 commit c1332c3

File tree

5 files changed

+1139
-0
lines changed

5 files changed

+1139
-0
lines changed
Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.gridfs;
17+
18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.Mono;
20+
21+
import org.bson.Document;
22+
import org.bson.types.ObjectId;
23+
import org.reactivestreams.Publisher;
24+
import org.springframework.core.io.buffer.DataBuffer;
25+
import org.springframework.core.io.support.ResourcePatternResolver;
26+
import org.springframework.data.domain.Sort;
27+
import org.springframework.data.mongodb.core.query.Query;
28+
import org.springframework.lang.Nullable;
29+
30+
import com.mongodb.client.gridfs.GridFSFindIterable;
31+
import com.mongodb.client.gridfs.model.GridFSFile;
32+
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
33+
34+
/**
35+
* Collection of operations to store and read files from MongoDB GridFS using reactive infrastructure.
36+
*
37+
* @author Mark Paluch
38+
* @since 2.2
39+
*/
40+
public interface ReactiveGridFsOperations {
41+
42+
/**
43+
* Stores the given content into a file with the given name.
44+
*
45+
* @param content must not be {@literal null}.
46+
* @param filename must not be {@literal null} or empty.
47+
* @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created.
48+
*/
49+
default Mono<ObjectId> store(Publisher<DataBuffer> content, String filename) {
50+
return store(content, filename, (Object) null);
51+
}
52+
53+
/**
54+
* Stores the given content into a file with the given name.
55+
*
56+
* @param content must not be {@literal null}.
57+
* @param metadata can be {@literal null}.
58+
* @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created.
59+
*/
60+
default Mono<ObjectId> store(Publisher<DataBuffer> content, @Nullable Object metadata) {
61+
return store(content, null, metadata);
62+
}
63+
64+
/**
65+
* Stores the given content into a file with the given name.
66+
*
67+
* @param content must not be {@literal null}.
68+
* @param metadata can be {@literal null}.
69+
* @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created.
70+
*/
71+
default Mono<ObjectId> store(Publisher<DataBuffer> content, @Nullable Document metadata) {
72+
return store(content, null, metadata);
73+
}
74+
75+
/**
76+
* Stores the given content into a file with the given name and content type.
77+
*
78+
* @param content must not be {@literal null}.
79+
* @param filename must not be {@literal null} or empty.
80+
* @param contentType can be {@literal null}.
81+
* @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created.
82+
*/
83+
default Mono<ObjectId> store(Publisher<DataBuffer> content, @Nullable String filename, @Nullable String contentType) {
84+
return store(content, filename, contentType, (Object) null);
85+
}
86+
87+
/**
88+
* Stores the given content into a file with the given name using the given metadata. The metadata object will be
89+
* marshalled before writing.
90+
*
91+
* @param content must not be {@literal null}.
92+
* @param filename can be {@literal null} or empty.
93+
* @param metadata can be {@literal null}.
94+
* @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created.
95+
*/
96+
default Mono<ObjectId> store(Publisher<DataBuffer> content, @Nullable String filename, @Nullable Object metadata) {
97+
return store(content, filename, null, metadata);
98+
}
99+
100+
/**
101+
* Stores the given content into a file with the given name and content type using the given metadata. The metadata
102+
* object will be marshalled before writing.
103+
*
104+
* @param content must not be {@literal null}.
105+
* @param filename must not be {@literal null} or empty.
106+
* @param contentType can be {@literal null}.
107+
* @param metadata can be {@literal null}
108+
* @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created.
109+
*/
110+
Mono<ObjectId> store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
111+
@Nullable Object metadata);
112+
113+
/**
114+
* Stores the given content into a file with the given name and content type using the given metadata. The metadata
115+
* object will be marshalled before writing.
116+
*
117+
* @param content must not be {@literal null}.
118+
* @param filename must not be {@literal null} or empty.
119+
* @param contentType can be {@literal null}.
120+
* @param metadata can be {@literal null}
121+
* @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created.
122+
*/
123+
Mono<ObjectId> store(Publisher<DataBuffer> content, @Nullable String filename, @Nullable String contentType,
124+
@Nullable Object metadata);
125+
126+
/**
127+
* Stores the given content into a file with the given name using the given metadata.
128+
*
129+
* @param content must not be {@literal null}.
130+
* @param filename must not be {@literal null} or empty.
131+
* @param metadata can be {@literal null}.
132+
* @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created.
133+
*/
134+
default Mono<ObjectId> store(Publisher<DataBuffer> content, @Nullable String filename, @Nullable Document metadata) {
135+
return store(content, filename, null, metadata);
136+
}
137+
138+
/**
139+
* Stores the given content into a file with the given name and content type using the given metadata.
140+
*
141+
* @param content must not be {@literal null}.
142+
* @param filename must not be {@literal null} or empty.
143+
* @param contentType can be {@literal null}.
144+
* @param metadata can be {@literal null}.
145+
* @return the {@link ObjectId} of the {@link com.mongodb.client.gridfs.model.GridFSFile} just created.
146+
*/
147+
Mono<ObjectId> store(AsyncInputStream content, @Nullable String filename, @Nullable String contentType,
148+
@Nullable Document metadata);
149+
150+
Mono<ObjectId> store(Publisher<DataBuffer> content, String filename, String contentType, Document metadata);
151+
152+
/**
153+
* Returns all files matching the given query. Note, that currently {@link Sort} criterias defined at the
154+
* {@link Query} will not be regarded as MongoDB does not support ordering for GridFS file access.
155+
*
156+
* @see <a href="https://jira.mongodb.org/browse/JAVA-431">MongoDB Jira: JAVA-431</a>
157+
* @param query must not be {@literal null}.
158+
* @return {@link GridFSFindIterable} to obtain results from. Eg. by calling
159+
* {@link GridFSFindIterable#into(java.util.Collection)}.
160+
*/
161+
Flux<GridFSFile> find(Query query);
162+
163+
/**
164+
* Returns a single {@link com.mongodb.client.gridfs.model.GridFSFile} matching the given query or {@literal null} in
165+
* case no file matches.
166+
*
167+
* @param query must not be {@literal null}.
168+
* @return
169+
*/
170+
Mono<GridFSFile> findOne(Query query);
171+
172+
/**
173+
* Deletes all files matching the given {@link Query}.
174+
*
175+
* @param query must not be {@literal null}.
176+
*/
177+
Mono<Void> delete(Query query);
178+
179+
/**
180+
* Returns the {@link GridFsResource} with the given file name.
181+
*
182+
* @param filename must not be {@literal null}.
183+
* @return the resource. Use {@link org.springframework.core.io.Resource#exists()} to check if the returned
184+
* {@link GridFsResource} is actually present.
185+
* @see ResourcePatternResolver#getResource(String)
186+
*/
187+
Mono<ReactiveGridFsResource> getResource(String filename);
188+
189+
/**
190+
* Returns the {@link GridFsResource} for a {@link GridFSFile}.
191+
*
192+
* @param file must not be {@literal null}.
193+
* @return the resource for the file.
194+
*/
195+
Mono<ReactiveGridFsResource> getResource(GridFSFile file);
196+
197+
/**
198+
* Returns all {@link GridFsResource}s matching the given file name pattern.
199+
*
200+
* @param filenamePattern must not be {@literal null}.
201+
* @return
202+
*/
203+
Flux<ReactiveGridFsResource> getResources(String filenamePattern);
204+
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.gridfs;
17+
18+
import reactor.core.publisher.Flux;
19+
20+
import java.io.ByteArrayInputStream;
21+
import java.io.FileNotFoundException;
22+
import java.io.IOException;
23+
import java.io.InputStream;
24+
25+
import org.reactivestreams.Publisher;
26+
import org.springframework.core.io.AbstractResource;
27+
import org.springframework.core.io.Resource;
28+
import org.springframework.core.io.buffer.DataBuffer;
29+
import org.springframework.lang.Nullable;
30+
import org.springframework.util.Assert;
31+
32+
import com.mongodb.client.gridfs.model.GridFSFile;
33+
34+
/**
35+
* Reactive {@link GridFSFile} based {@link Resource} implementation.
36+
*
37+
* @author Mark Paluch
38+
* @since 2.2
39+
*/
40+
public class ReactiveGridFsResource extends AbstractResource {
41+
42+
static final String CONTENT_TYPE_FIELD = "_contentType";
43+
private static final ByteArrayInputStream EMPTY_INPUT_STREAM = new ByteArrayInputStream(new byte[0]);
44+
45+
private final @Nullable GridFSFile file;
46+
private final String filename;
47+
private final Flux<DataBuffer> content;
48+
49+
/**
50+
* Creates a new, absent {@link ReactiveGridFsResource}.
51+
*
52+
* @param filename filename of the absent resource.
53+
* @param content
54+
* @since 2.1
55+
*/
56+
private ReactiveGridFsResource(String filename, Publisher<DataBuffer> content) {
57+
58+
this.file = null;
59+
this.filename = filename;
60+
this.content = Flux.from(content);
61+
}
62+
63+
/**
64+
* Creates a new {@link ReactiveGridFsResource} from the given {@link GridFSFile}.
65+
*
66+
* @param file must not be {@literal null}.
67+
* @param content
68+
*/
69+
public ReactiveGridFsResource(GridFSFile file, Publisher<DataBuffer> content) {
70+
71+
this.file = file;
72+
this.filename = file.getFilename();
73+
this.content = Flux.from(content);
74+
}
75+
76+
/**
77+
* Obtain an absent {@link ReactiveGridFsResource}.
78+
*
79+
* @param filename filename of the absent resource, must not be {@literal null}.
80+
* @return never {@literal null}.
81+
* @since 2.1
82+
*/
83+
public static ReactiveGridFsResource absent(String filename) {
84+
85+
Assert.notNull(filename, "Filename must not be null");
86+
87+
return new ReactiveGridFsResource(filename, Flux.empty());
88+
}
89+
90+
/*
91+
* (non-Javadoc)
92+
* @see org.springframework.core.io.InputStreamResource#getInputStream()
93+
*/
94+
@Override
95+
public InputStream getInputStream() throws IllegalStateException {
96+
throw new UnsupportedOperationException();
97+
}
98+
99+
/*
100+
* (non-Javadoc)
101+
* @see org.springframework.core.io.AbstractResource#contentLength()
102+
*/
103+
@Override
104+
public long contentLength() throws IOException {
105+
106+
verifyExists();
107+
return file.getLength();
108+
}
109+
110+
/*
111+
* (non-Javadoc)
112+
* @see org.springframework.core.io.AbstractResource#getFilename()
113+
*/
114+
@Override
115+
public String getFilename() throws IllegalStateException {
116+
return filename;
117+
}
118+
119+
/*
120+
* (non-Javadoc)
121+
* @see org.springframework.core.io.AbstractResource#exists()
122+
*/
123+
@Override
124+
public boolean exists() {
125+
return file != null;
126+
}
127+
128+
/*
129+
* (non-Javadoc)
130+
* @see org.springframework.core.io.AbstractResource#lastModified()
131+
*/
132+
@Override
133+
public long lastModified() throws IOException {
134+
135+
verifyExists();
136+
return file.getUploadDate().getTime();
137+
}
138+
139+
/*
140+
* (non-Javadoc)
141+
* @see org.springframework.core.io.AbstractResource#getDescription()
142+
*/
143+
@Override
144+
public String getDescription() {
145+
return String.format("GridFs resource [%s]", this.getFilename());
146+
}
147+
148+
/**
149+
* Returns the {@link Resource}'s id.
150+
*
151+
* @return never {@literal null}.
152+
* @throws IllegalStateException if the file does not {@link #exists()}.
153+
*/
154+
public Object getId() {
155+
156+
Assert.state(exists(), () -> String.format("%s does not exist.", getDescription()));
157+
158+
return file.getId();
159+
}
160+
161+
/**
162+
* Retrieve the download stream.
163+
*
164+
* @return
165+
*/
166+
public Flux<DataBuffer> getDownloadStream() {
167+
168+
if (!exists()) {
169+
return Flux.error(new FileNotFoundException(String.format("%s does not exist.", getDescription())));
170+
}
171+
return content;
172+
}
173+
174+
private void verifyExists() throws FileNotFoundException {
175+
176+
if (!exists()) {
177+
throw new FileNotFoundException(String.format("%s does not exist.", getDescription()));
178+
}
179+
}
180+
}

0 commit comments

Comments
 (0)