Skip to content

Commit f40861b

Browse files
mp911dechristophstrobl
authored andcommitted
DATAMONGO-1855 - Initial reactive GridFS support.
We now support reactive GridFS using MongoDB's reactive GridFS API. Files can be consumed and provided as binary stream. ReactiveGridFsOperations operations = …; Publisher<DataBuffer> buffers = … Mono<ObjectId> id = operations.store(buffers, "foo.xml"); Flux<DataBuffer> download = operations.getResource("foo.xml").flatMap(ReactiveGridFsResource::getDownloadStream); Original Pull Request: #637
1 parent 33e4e38 commit f40861b

File tree

11 files changed

+1578
-0
lines changed

11 files changed

+1578
-0
lines changed
Lines changed: 248 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,248 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.gridfs;
17+
18+
import lombok.RequiredArgsConstructor;
19+
import reactor.core.CoreSubscriber;
20+
import reactor.core.publisher.Mono;
21+
import reactor.core.publisher.Operators;
22+
import reactor.util.concurrent.Queues;
23+
import reactor.util.context.Context;
24+
25+
import java.nio.ByteBuffer;
26+
import java.util.Queue;
27+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
28+
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
29+
import java.util.function.BiConsumer;
30+
31+
import org.reactivestreams.Publisher;
32+
import org.reactivestreams.Subscription;
33+
import org.springframework.core.io.buffer.DataBuffer;
34+
import org.springframework.core.io.buffer.DataBufferUtils;
35+
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
36+
37+
import com.mongodb.reactivestreams.client.Success;
38+
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
39+
40+
/**
41+
* Adapter accepting a binary stream {@link Publisher} and emitting its through {@link AsyncInputStream}.
42+
* <p>
43+
* This adapter subscribes to the binary {@link Publisher} as soon as the first chunk gets {@link #read(ByteBuffer)
44+
* requested}. Requests are queued and binary chunks are requested from the {@link Publisher}. As soon as the
45+
* {@link Publisher} emits items, chunks are provided to the read request which completes the number-of-written-bytes
46+
* {@link Publisher}.
47+
* <p>
48+
* {@link AsyncInputStream} is supposed to work as sequential callback API that is called until reaching EOF.
49+
* {@link #close()} is propagated as cancellation signal to the binary {@link Publisher}.
50+
*
51+
* @author Mark Paluch
52+
* @since 2.2
53+
*/
54+
@RequiredArgsConstructor
55+
class AsyncInputStreamAdapter implements AsyncInputStream {
56+
57+
private static final AtomicLongFieldUpdater<AsyncInputStreamAdapter> DEMAND = AtomicLongFieldUpdater
58+
.newUpdater(AsyncInputStreamAdapter.class, "demand");
59+
60+
private static final AtomicIntegerFieldUpdater<AsyncInputStreamAdapter> SUBSCRIBED = AtomicIntegerFieldUpdater
61+
.newUpdater(AsyncInputStreamAdapter.class, "subscribed");
62+
63+
private static final int SUBSCRIPTION_NOT_SUBSCRIBED = 0;
64+
private static final int SUBSCRIPTION_SUBSCRIBED = 1;
65+
66+
private final Publisher<? extends DataBuffer> buffers;
67+
private final Context subscriberContext;
68+
private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();
69+
70+
private volatile Subscription subscription;
71+
private volatile boolean cancelled;
72+
private volatile boolean complete;
73+
private volatile Throwable error;
74+
private final Queue<BiConsumer<DataBuffer, Integer>> readRequests = Queues.<BiConsumer<DataBuffer, Integer>> small()
75+
.get();
76+
77+
// see DEMAND
78+
private volatile long demand;
79+
80+
// see SUBSCRIBED
81+
private volatile int subscribed = SUBSCRIPTION_NOT_SUBSCRIBED;
82+
83+
/*
84+
* (non-Javadoc)
85+
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#read(java.nio.ByteBuffer)
86+
*/
87+
@Override
88+
public Publisher<Integer> read(ByteBuffer dst) {
89+
90+
return Mono.create(sink -> {
91+
92+
readRequests.offer((db, bytecount) -> {
93+
94+
try {
95+
96+
if (error != null) {
97+
sink.error(error);
98+
return;
99+
}
100+
101+
if (bytecount == -1) {
102+
sink.success(-1);
103+
return;
104+
}
105+
106+
ByteBuffer byteBuffer = db.asByteBuffer();
107+
int toWrite = byteBuffer.remaining();
108+
dst.put(byteBuffer);
109+
110+
sink.success(toWrite);
111+
} catch (Exception e) {
112+
sink.error(e);
113+
} finally {
114+
DataBufferUtils.release(db);
115+
}
116+
});
117+
118+
request(1);
119+
});
120+
}
121+
122+
/*
123+
* (non-Javadoc)
124+
* @see com.mongodb.reactivestreams.client.gridfs.AsyncInputStream#close()
125+
*/
126+
@Override
127+
public Publisher<Success> close() {
128+
129+
return Mono.create(sink -> {
130+
cancelled = true;
131+
132+
if (error != null) {
133+
sink.error(error);
134+
return;
135+
}
136+
137+
sink.success(Success.SUCCESS);
138+
});
139+
}
140+
141+
protected void request(int n) {
142+
143+
if (complete) {
144+
terminatePendingReads();
145+
return;
146+
}
147+
148+
Operators.addCap(DEMAND, this, n);
149+
150+
if (SUBSCRIBED.get(this) == SUBSCRIPTION_NOT_SUBSCRIBED) {
151+
152+
if (SUBSCRIBED.compareAndSet(this, SUBSCRIPTION_NOT_SUBSCRIBED, SUBSCRIPTION_SUBSCRIBED)) {
153+
154+
buffers.subscribe(new CoreSubscriber<DataBuffer>() {
155+
156+
@Override
157+
public Context currentContext() {
158+
return subscriberContext;
159+
}
160+
161+
@Override
162+
public void onSubscribe(Subscription s) {
163+
subscription = s;
164+
165+
Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1);
166+
s.request(1);
167+
}
168+
169+
@Override
170+
public void onNext(DataBuffer dataBuffer) {
171+
172+
if (cancelled || complete) {
173+
DataBufferUtils.release(dataBuffer);
174+
Operators.onNextDropped(dataBuffer, subscriberContext);
175+
return;
176+
}
177+
178+
BiConsumer<DataBuffer, Integer> poll = readRequests.poll();
179+
180+
if (poll == null) {
181+
182+
DataBufferUtils.release(dataBuffer);
183+
Operators.onNextDropped(dataBuffer, subscriberContext);
184+
subscription.cancel();
185+
return;
186+
}
187+
188+
poll.accept(dataBuffer, dataBuffer.readableByteCount());
189+
190+
requestFromSubscription(subscription);
191+
}
192+
193+
@Override
194+
public void onError(Throwable t) {
195+
196+
if (cancelled || complete) {
197+
Operators.onErrorDropped(t, subscriberContext);
198+
return;
199+
}
200+
201+
error = t;
202+
complete = true;
203+
terminatePendingReads();
204+
}
205+
206+
@Override
207+
public void onComplete() {
208+
209+
complete = true;
210+
terminatePendingReads();
211+
}
212+
});
213+
}
214+
} else {
215+
216+
Subscription subscription = this.subscription;
217+
218+
if (subscription != null) {
219+
requestFromSubscription(subscription);
220+
}
221+
}
222+
}
223+
224+
void requestFromSubscription(Subscription subscription) {
225+
226+
long demand = DEMAND.get(AsyncInputStreamAdapter.this);
227+
228+
if (cancelled) {
229+
subscription.cancel();
230+
}
231+
232+
if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) {
233+
subscription.request(1);
234+
}
235+
}
236+
237+
/**
238+
* Terminates pending reads with empty buffers.
239+
*/
240+
void terminatePendingReads() {
241+
242+
BiConsumer<DataBuffer, Integer> readers;
243+
244+
while ((readers = readRequests.poll()) != null) {
245+
readers.accept(factory.wrap(new byte[0]), -1);
246+
}
247+
}
248+
}
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/*
2+
* Copyright 2019 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.data.mongodb.gridfs;
17+
18+
import reactor.core.publisher.Flux;
19+
import reactor.core.publisher.Mono;
20+
21+
import org.reactivestreams.Publisher;
22+
import org.springframework.core.io.buffer.DataBuffer;
23+
import org.springframework.core.io.buffer.DataBufferFactory;
24+
import org.springframework.core.io.buffer.DataBufferUtils;
25+
26+
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
27+
28+
/**
29+
* Utility methods to create adapters from between {@link Publisher} of {@link DataBuffer} and {@link AsyncInputStream}.
30+
*
31+
* @author Mark Paluch
32+
* @since 2.2
33+
*/
34+
class BinaryStreamAdapters {
35+
36+
/**
37+
* Creates a {@link Flux} emitting {@link DataBuffer} by reading binary chunks from {@link AsyncInputStream}.
38+
* Publisher termination (completion, error, cancellation) closes the {@link AsyncInputStream}.
39+
* <p/>
40+
* The resulting {@link org.reactivestreams.Publisher} filters empty binary chunks and uses {@link DataBufferFactory}
41+
* settings to determine the chunk size.
42+
*
43+
* @param inputStream must not be {@literal null}.
44+
* @param dataBufferFactory must not be {@literal null}.
45+
* @return {@link Flux} emitting {@link DataBuffer}s.
46+
* @see DataBufferFactory#allocateBuffer()
47+
*/
48+
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
49+
50+
return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) //
51+
.filter(it -> {
52+
53+
if (it.readableByteCount() == 0) {
54+
DataBufferUtils.release(it);
55+
return false;
56+
}
57+
return true;
58+
});
59+
}
60+
61+
/**
62+
* Creates a {@link Mono} emitting a {@link AsyncInputStream} to consume a {@link Publisher} emitting
63+
* {@link DataBuffer} and exposing the binary stream through {@link AsyncInputStream}. {@link DataBuffer}s are
64+
* released by the adapter during consumption.
65+
* <p/>
66+
* This method returns a {@link Mono} to retain the {@link reactor.util.context.Context subscriber context}.
67+
*
68+
* @param dataBuffers must not be {@literal null}.
69+
* @return {@link Mono} emitting {@link AsyncInputStream}.
70+
* @see DataBufferUtils#release(DataBuffer)
71+
*/
72+
static Mono<AsyncInputStream> toAsyncInputStream(Publisher<? extends DataBuffer> dataBuffers) {
73+
74+
return Mono.create(sink -> {
75+
sink.success(new AsyncInputStreamAdapter(dataBuffers, sink.currentContext()));
76+
});
77+
}
78+
}

0 commit comments

Comments
 (0)