Skip to content

DATAMONGO-2393 - Fix GridFS upload and download adapters #799

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.3.0.BUILD-SNAPSHOT</version>
<version>2.3.0.DATAMONGO-2393-SNAPSHOT</version>
<packaging>pom</packaging>

<name>Spring Data MongoDB</name>
Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.3.0.BUILD-SNAPSHOT</version>
<version>2.3.0.DATAMONGO-2393-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb-distribution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.3.0.BUILD-SNAPSHOT</version>
<version>2.3.0.DATAMONGO-2393-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
2 changes: 1 addition & 1 deletion spring-data-mongodb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
<parent>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-mongodb-parent</artifactId>
<version>2.3.0.BUILD-SNAPSHOT</version>
<version>2.3.0.DATAMONGO-2393-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import lombok.RequiredArgsConstructor;
import reactor.core.CoreSubscriber;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.concurrent.Queues;
Expand All @@ -26,13 +28,12 @@
import java.util.Queue;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiConsumer;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;

import com.mongodb.reactivestreams.client.Success;
import com.mongodb.reactivestreams.client.gridfs.AsyncInputStream;
Expand Down Expand Up @@ -66,14 +67,14 @@ class AsyncInputStreamAdapter implements AsyncInputStream {

private final Publisher<? extends DataBuffer> buffers;
private final Context subscriberContext;
private final DefaultDataBufferFactory factory = new DefaultDataBufferFactory();

private volatile Subscription subscription;
private volatile boolean cancelled;
private volatile boolean complete;
private volatile boolean allDataBuffersReceived;
private volatile Throwable error;
private final Queue<BiConsumer<DataBuffer, Integer>> readRequests = Queues.<BiConsumer<DataBuffer, Integer>> small()
.get();
private final Queue<ReadRequest> readRequests = Queues.<ReadRequest> small().get();

private final Queue<DataBuffer> bufferQueue = Queues.<DataBuffer> small().get();

// see DEMAND
volatile long demand;
Expand All @@ -88,39 +89,28 @@ class AsyncInputStreamAdapter implements AsyncInputStream {
@Override
public Publisher<Integer> read(ByteBuffer dst) {

return Mono.create(sink -> {

readRequests.offer((db, bytecount) -> {

try {
return Flux.create(sink -> {

if (error != null) {
readRequests.offer(new ReadRequest(sink, dst));

sink.error(error);
return;
}

if (bytecount == -1) {

sink.success(-1);
return;
}
sink.onCancel(this::terminatePendingReads);
sink.onDispose(this::terminatePendingReads);
sink.onRequest(this::request);
});
}

ByteBuffer byteBuffer = db.asByteBuffer();
int toWrite = byteBuffer.remaining();
void onError(FluxSink<Integer> sink, Throwable e) {

dst.put(byteBuffer);
sink.success(toWrite);
readRequests.poll();
sink.error(e);
}

} catch (Exception e) {
sink.error(e);
} finally {
DataBufferUtils.release(db);
}
});
void onComplete(FluxSink<Integer> sink, int writtenBytes) {

request(1);
});
readRequests.poll();
DEMAND.decrementAndGet(this);
sink.next(writtenBytes);
sink.complete();
}

/*
Expand All @@ -144,17 +134,19 @@ public Publisher<Success> close() {
cancelled = true;

if (error != null) {
terminatePendingReads();
sink.error(error);
return;
}

terminatePendingReads();
sink.success(Success.SUCCESS);
});
}

protected void request(int n) {
protected void request(long n) {

if (complete) {
if (allDataBuffersReceived && bufferQueue.isEmpty()) {

terminatePendingReads();
return;
Expand All @@ -176,18 +168,51 @@ protected void request(int n) {
requestFromSubscription(subscription);
}
}

}

void requestFromSubscription(Subscription subscription) {

long demand = DEMAND.get(AsyncInputStreamAdapter.this);

if (cancelled) {
subscription.cancel();
}

if (demand > 0 && DEMAND.compareAndSet(AsyncInputStreamAdapter.this, demand, demand - 1)) {
subscription.request(1);
drainLoop();
}

void drainLoop() {

while (DEMAND.get(AsyncInputStreamAdapter.this) > 0) {

DataBuffer wip = bufferQueue.peek();

if (wip == null) {
break;
}

if (wip.readableByteCount() == 0) {
bufferQueue.poll();
continue;
}

ReadRequest consumer = AsyncInputStreamAdapter.this.readRequests.peek();
if (consumer == null) {
break;
}

consumer.transferBytes(wip, wip.readableByteCount());
}

if (bufferQueue.isEmpty()) {

if (allDataBuffersReceived) {
terminatePendingReads();
return;
}

if (demand > 0) {
subscription.request(1);
}
}
}

Expand All @@ -196,10 +221,10 @@ void requestFromSubscription(Subscription subscription) {
*/
void terminatePendingReads() {

BiConsumer<DataBuffer, Integer> readers;
ReadRequest readers;

while ((readers = readRequests.poll()) != null) {
readers.accept(factory.wrap(new byte[0]), -1);
readers.onComplete();
}
}

Expand All @@ -214,53 +239,125 @@ public Context currentContext() {
public void onSubscribe(Subscription s) {

AsyncInputStreamAdapter.this.subscription = s;

Operators.addCap(DEMAND, AsyncInputStreamAdapter.this, -1);
s.request(1);
}

@Override
public void onNext(DataBuffer dataBuffer) {

if (cancelled || complete) {
if (cancelled || allDataBuffersReceived) {
DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
return;
}

BiConsumer<DataBuffer, Integer> poll = AsyncInputStreamAdapter.this.readRequests.poll();
ReadRequest readRequest = AsyncInputStreamAdapter.this.readRequests.peek();

if (poll == null) {
if (readRequest == null) {

DataBufferUtils.release(dataBuffer);
Operators.onNextDropped(dataBuffer, AsyncInputStreamAdapter.this.subscriberContext);
subscription.cancel();
return;
}

poll.accept(dataBuffer, dataBuffer.readableByteCount());
bufferQueue.offer(dataBuffer);

requestFromSubscription(subscription);
drainLoop();
}

@Override
public void onError(Throwable t) {

if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.complete) {
if (AsyncInputStreamAdapter.this.cancelled || AsyncInputStreamAdapter.this.allDataBuffersReceived) {
Operators.onErrorDropped(t, AsyncInputStreamAdapter.this.subscriberContext);
return;
}

AsyncInputStreamAdapter.this.error = t;
AsyncInputStreamAdapter.this.complete = true;
AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
terminatePendingReads();
}

@Override
public void onComplete() {

AsyncInputStreamAdapter.this.complete = true;
terminatePendingReads();
AsyncInputStreamAdapter.this.allDataBuffersReceived = true;
if (bufferQueue.isEmpty()) {
terminatePendingReads();
}
}
}

/**
* Request to read bytes and transfer these to the associated {@link ByteBuffer}.
*/
class ReadRequest {

private final FluxSink<Integer> sink;
private final ByteBuffer dst;

private int writtenBytes;

ReadRequest(FluxSink<Integer> sink, ByteBuffer dst) {
this.sink = sink;
this.dst = dst;
this.writtenBytes = -1;
}

public void onComplete() {

if (error != null) {
AsyncInputStreamAdapter.this.onError(sink, error);
return;
}

AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
}

public void transferBytes(DataBuffer db, int bytes) {

try {

if (error != null) {
AsyncInputStreamAdapter.this.onError(sink, error);
return;
}

ByteBuffer byteBuffer = db.asByteBuffer();
int remaining = byteBuffer.remaining();
int writeCapacity = Math.min(dst.remaining(), remaining);
int limit = Math.min(byteBuffer.position() + writeCapacity, byteBuffer.capacity());
int toWrite = limit - byteBuffer.position();

if (toWrite == 0) {

AsyncInputStreamAdapter.this.onComplete(sink, writtenBytes);
return;
}

int oldPosition = byteBuffer.position();

byteBuffer.limit(toWrite);
dst.put(byteBuffer);
byteBuffer.limit(byteBuffer.capacity());
byteBuffer.position(oldPosition);
db.readPosition(db.readPosition() + toWrite);

if (writtenBytes == -1) {
writtenBytes = bytes;
} else {
writtenBytes += bytes;
}

} catch (Exception e) {
AsyncInputStreamAdapter.this.onError(sink, e);
} finally {

if (db.readableByteCount() == 0) {
DataBufferUtils.release(db);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,14 @@ class BinaryStreamAdapters {
*
* @param inputStream must not be {@literal null}.
* @param dataBufferFactory must not be {@literal null}.
* @param bufferSize read {@code n} bytes per iteration.
* @return {@link Flux} emitting {@link DataBuffer}s.
* @see DataBufferFactory#allocateBuffer()
*/
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory) {
static Flux<DataBuffer> toPublisher(AsyncInputStream inputStream, DataBufferFactory dataBufferFactory,
int bufferSize) {

return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory) //
return DataBufferPublisherAdapter.createBinaryStream(inputStream, dataBufferFactory, bufferSize) //
.filter(it -> {

if (it.readableByteCount() == 0) {
Expand Down
Loading